Skip to content

Commit

Permalink
[#3897] fix(python-test): Check Gravitino server status before runnin…
Browse files Browse the repository at this point in the history
…g python test in embedded mode. (#3898)

### What changes were proposed in this pull request?

Add check logic to ensure that the Gravitino server is ready to serve. 

### Why are the changes needed?

To make the python integration-test more stable. 

Fix: #3897
Fix: #3832
Fix: #3934
### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

CI and test locally.
  • Loading branch information
yuqi1129 committed Jun 21, 2024
1 parent c5f9505 commit cc5e67b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 83 deletions.
43 changes: 43 additions & 0 deletions clients/client-python/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* This software is licensed under the Apache License version 2.
*/
import io.github.piyushroshan.python.VenvTask
import java.net.HttpURLConnection
import java.net.URL

plugins {
id("io.github.piyushroshan.python-gradle-miniforge-plugin") version "1.0.0"
Expand All @@ -21,11 +23,52 @@ fun deleteCacheDir(targetDir: String) {
}
}


fun waitForServerIsReady(host: String = "http://localhost", port: Int = 8090, timeout: Long = 30000) {
val startTime = System.currentTimeMillis()
var exception: java.lang.Exception?
val urlString = "$host:$port/metrics"
val successPattern = Regex("\"version\"\\s*:")

while (true) {
try {
val url = URL(urlString)
val connection = url.openConnection() as HttpURLConnection
connection.requestMethod = "GET"
connection.connectTimeout = 1000
connection.readTimeout = 1000

val responseCode = connection.responseCode
if (responseCode == 200) {
val response = connection.inputStream.bufferedReader().use { it.readText() }
if (successPattern.containsMatchIn(response)) {
return // If this succeeds, the API is up and running
} else {
exception = RuntimeException("API returned unexpected response: $response")
}
} else {
exception = RuntimeException("Received non-200 response code: $responseCode")
}
} catch (e: Exception) {
// API is not available yet, continue to wait
exception = e
}

if (System.currentTimeMillis() - startTime > timeout) {
throw RuntimeException("Timed out waiting for API to be available", exception)
}
Thread.sleep(500) // Wait for 0.5 second before checking again
}
}

fun gravitinoServer(operation: String) {
val process = ProcessBuilder("${project.rootDir.path}/distribution/package/bin/gravitino.sh", operation).start()
val exitCode = process.waitFor()
if (exitCode == 0) {
val currentContext = process.inputStream.bufferedReader().readText()
if (operation == "start") {
waitForServerIsReady()
}
println("Gravitino server status: $currentContext")
} else {
println("Gravitino server execution failed with exit code $exitCode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.auth.AuthenticatorType;
import com.datastrato.gravitino.auxiliary.AuxiliaryServiceManager;
import com.datastrato.gravitino.client.ErrorHandlers;
import com.datastrato.gravitino.client.GravitinoAdminClient;
import com.datastrato.gravitino.client.HTTPClient;
import com.datastrato.gravitino.client.KerberosTokenProvider;
import com.datastrato.gravitino.client.RESTClient;
import com.datastrato.gravitino.dto.responses.VersionResponse;
import com.datastrato.gravitino.exceptions.RESTException;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.util.ITUtils;
import com.datastrato.gravitino.integration.test.util.KerberosProviderHelper;
import com.datastrato.gravitino.integration.test.util.OAuthMockDataProvider;
Expand All @@ -28,13 +24,10 @@
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -152,8 +145,10 @@ public void start() throws Exception {
});
long beginTime = System.currentTimeMillis();
boolean started = false;

String url = URI + "/metrics";
while (System.currentTimeMillis() - beginTime < 1000 * 60 * 3) {
started = checkIfServerIsRunning();
started = AbstractIT.isHttpServerUp(url);
if (started || future.isDone()) {
break;
}
Expand All @@ -180,9 +175,11 @@ public void stop() throws IOException, InterruptedException {

long beginTime = System.currentTimeMillis();
boolean started = true;

String url = String.format("http://%s:%d/metrics", host, port);
while (System.currentTimeMillis() - beginTime < 1000 * 60 * 3) {
sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
started = checkIfServerIsRunning();
started = AbstractIT.isHttpServerUp(url);
if (!started) {
break;
}
Expand Down Expand Up @@ -245,63 +242,4 @@ private void customizeConfigFile(String configTempFileName, String configFileNam

ITUtils.rewriteConfigFile(configTempFileName, configFileName, configMap);
}

public static boolean isPortOpen(String host, int port, int timeout) {
try (Socket socket = new Socket()) {
socket.connect(new java.net.InetSocketAddress(host, port), timeout);
return true;
} catch (Exception e) {
return false;
}
}

private boolean checkIfServerIsRunning() {
String URI = String.format("http://%s:%d", host, port);
LOG.info("checkIfServerIsRunning() URI: {}", URI);

// Use kerberos, we need to use sdk to check
if (Objects.equals(serverConfig.get(Configs.AUTHENTICATOR), "kerberos")) {
try {
KerberosTokenProvider provider =
KerberosTokenProvider.builder()
.withClientPrincipal((String) properties.get("client.kerberos.principal"))
.withKeyTabFile(new File((String) properties.get("client.kerberos.keytab")))
.build();
GravitinoAdminClient adminClient =
GravitinoAdminClient.builder(URI).withKerberosAuth(provider).build();

adminClient.listMetalakes();
return true;
} catch (Exception e) {
if (isPortOpen(host, port, 1000)) {
return true;
}

LOG.warn(
"Kerberos checkIfServerIsRunning() fails, GravitinoServer is not running {}",
e.getMessage());
return false;
}
}

// Not auth, we can use the rest client to check
VersionResponse response = null;
try {
response =
restClient.get(
"api/version",
VersionResponse.class,
Collections.emptyMap(),
ErrorHandlers.restErrorHandler());
} catch (RESTException e) {
LOG.warn("checkIfServerIsRunning() fails, GravitinoServer is not running {}", e.getMessage());
return false;
}
if (response != null && response.getCode() == 0) {
return true;
} else {
LOG.warn("checkIfServerIsRunning() fails, GravitinoServer is not running");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.datastrato.gravitino.server.web.JettyServerConfig;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -35,6 +34,10 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -218,10 +221,18 @@ public static void startIntegrationTest() throws Exception {

GravitinoITUtils.startGravitinoServer();

JettyServerConfig jettyServerConfig =
JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX);
String checkServerUrl =
"http://"
+ jettyServerConfig.getHost()
+ ":"
+ jettyServerConfig.getHttpPort()
+ "/metrics";
Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(() -> isGravitinoServerUp());
.until(() -> isHttpServerUp(checkServerUrl));
}

JettyServerConfig jettyServerConfig =
Expand Down Expand Up @@ -285,19 +296,22 @@ protected String readGitCommitIdFromGitFile() {
}
}

private static boolean isGravitinoServerUp() {
JettyServerConfig jettyServerConfig =
JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX);
String host = jettyServerConfig.getHost();
int port = jettyServerConfig.getHttpPort();
int timeout = 3000; // 3 second timeout

try (Socket socket = new Socket()) {
socket.connect(new java.net.InetSocketAddress(host, port), timeout);
LOG.info("Gravitino Server is up and running.");
return true;
/**
* Check if the http server is up, If http response status code is 200, then we're assuming the
* server is up. Or else we assume the server is not ready.
*
* <p>Note: The method will ignore the response body and only check the status code.
*
* @param testUrl A url that we want to test ignore the response body.
* @return true if the server is up, false otherwise.
*/
public static boolean isHttpServerUp(String testUrl) {
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpGet request = new HttpGet(testUrl);
ClassicHttpResponse response = httpClient.execute(request, a -> a);
return response.getCode() == 200;
} catch (Exception e) {
LOG.warn("Gravitino Server is not accessible.");
LOG.warn("Check Gravitino server failed: ", e);
return false;
}
}
Expand Down

0 comments on commit cc5e67b

Please sign in to comment.