Skip to content

Commit

Permalink
[FLINK-35479][e2e] Add end-to-end test for materialized table
Browse files Browse the repository at this point in the history
  • Loading branch information
hackergin authored and lsyldliu committed Jun 24, 2024
1 parent 7856190 commit 662cfd2
Show file tree
Hide file tree
Showing 4 changed files with 393 additions and 6 deletions.
8 changes: 7 additions & 1 deletion flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ under the License.
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.test.util.FileUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.test.util.SQLJobClientMode;
Expand Down Expand Up @@ -65,8 +68,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -463,7 +469,7 @@ public void copyLogsTo(Path targetDirectory) throws IOException {
}

/** This rest client is used to submit SQL strings to Rest Endpoint of Sql Gateway. */
private static class TestSqlGatewayRestClient {
public static class TestSqlGatewayRestClient {

private final String host;
private final int port;
Expand All @@ -475,12 +481,31 @@ public TestSqlGatewayRestClient(String host, int port, String version) throws Ex
this.host = host;
this.port = port;
this.version = version;
this.sessionHandle = openSession();
this.sessionHandle = openSession(Collections.emptyMap());
}

private String openSession() throws Exception {
FormBody.Builder builder = new FormBody.Builder();
FormBody requestBody = builder.build();
public TestSqlGatewayRestClient(
String host, int port, String version, Map<String, String> properties)
throws Exception {
this.host = host;
this.port = port;
this.version = version;
this.sessionHandle = openSession(properties);
}

private String openSession(Map<String, String> properties) throws Exception {
RequestBody requestBody;
if (properties == null || properties.isEmpty()) {
requestBody = new FormBody.Builder().build();
} else {
Map<String, Object> requestMap = new HashMap<>();
requestMap.put("properties", properties);
requestBody =
RequestBody.create(
MediaType.parse("application/json; charset=utf-8"),
OBJECT_MAPPER.writeValueAsString(requestMap));
}

final Request request =
new Request.Builder()
.post(requestBody)
Expand Down Expand Up @@ -529,6 +554,45 @@ public void waitUntilOperationTerminate(String operationHandle) throws Exception
} while (!Objects.equals(status, "FINISHED") && !Objects.equals(status, "ERROR"));
}

public List<RowData> getOperationResult(String operationHandle) throws Exception {
List<RowData> result = new ArrayList<>();
String resultUri =
String.format(
"/%s/sessions/%s/operations/%s/result/0",
version, sessionHandle, operationHandle);
while (resultUri != null) {
final Request request =
new Request.Builder()
.get()
.url(String.format("http://%s:%s%s", host, port, resultUri))
.build();

String response = sendRequest(request);

FetchResultsResponseBody fetchResultsResponseBody =
OBJECT_MAPPER.readValue(response, FetchResultsResponseBody.class);
ResultKind resultKind = fetchResultsResponseBody.getResultKind();

if (Objects.equals(resultKind, ResultKind.SUCCESS_WITH_CONTENT)) {
result.addAll(fetchResultsResponseBody.getResults().getData());
}
resultUri = fetchResultsResponseBody.getNextResultUri();
Thread.sleep(1000);
}

return result;
}

public List<RowData> executeStatementWithResult(String sql) {
try {
String operationHandle = executeStatement(sql);
waitUntilOperationTerminate(operationHandle);
return getOperationResult(operationHandle);
} catch (Exception e) {
throw new RuntimeException("Execute statement failed", e);
}
}

private String sendRequest(Request request) throws Exception {
String responseString;
try (Response response = client.newCall(request).execute()) {
Expand Down
8 changes: 8 additions & 0 deletions flink-end-to-end-tests/flink-sql-gateway-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ under the License.
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-filesystem-test-utils</artifactId>
<version>${project.version}</version>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand Down
Loading

0 comments on commit 662cfd2

Please sign in to comment.