Skip to content

Commit

Permalink
[FLINK-17361] Refactor JdbcTableSourceITCase to use TableResult inste…
Browse files Browse the repository at this point in the history
…ad of StreamITCase

Using the static sink approach of StreamITCase is potentially
problematic with concurrency, plus the code is just plain nicer like
this.
  • Loading branch information
aljoscha committed May 19, 2020
1 parent 7bed30e commit 8c15ff1
Showing 1 changed file with 48 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.StreamITCase;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

Expand All @@ -34,8 +34,15 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;


/**
Expand Down Expand Up @@ -107,20 +114,19 @@ public void testJdbcSource() throws Exception {
")"
);

StreamITCase.clear();
tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE), Row.class)
.addSink(new StreamITCase.StringSink<>());
env.execute();
TableResult tableResult = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE);

List<String> results = manifestResults(tableResult);

List<String> expected =
Arrays.asList(
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
StreamITCase.compareWithList(expected);
assertThat(
results,
containsInAnyOrder(
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234"));
}

@Test
public void testProjectableJdbcSource() throws Exception {
public void testProjectableJdbcSource() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
Expand All @@ -143,20 +149,19 @@ public void testProjectableJdbcSource() throws Exception {
")"
);

StreamITCase.clear();
tEnv.toAppendStream(tEnv.sqlQuery("SELECT timestamp6_col, decimal_col FROM " + INPUT_TABLE), Row.class)
.addSink(new StreamITCase.StringSink<>());
env.execute();
TableResult tableResult = tEnv.executeSql("SELECT timestamp6_col, decimal_col FROM " + INPUT_TABLE);

List<String> results = manifestResults(tableResult);

List<String> expected =
Arrays.asList(
"2020-01-01T15:35:00.123456,100.1234",
"2020-01-01T15:36:01.123456,101.1234");
StreamITCase.compareWithList(expected);
assertThat(
results,
containsInAnyOrder(
"2020-01-01T15:35:00.123456,100.1234",
"2020-01-01T15:36:01.123456,101.1234"));
}

@Test
public void testScanQueryJDBCSource() throws Exception {
public void testScanQueryJDBCSource() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
Expand All @@ -165,23 +170,29 @@ public void testScanQueryJDBCSource() throws Exception {
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);

final String testQuery = "SELECT id FROM " + INPUT_TABLE;
tEnv.sqlUpdate(
"CREATE TABLE test(" +
"id BIGINT" +
") WITH (" +
" 'connector.type'='jdbc'," +
" 'connector.url'='" + DB_URL + "'," +
" 'connector.table'='whatever'," +
" 'connector.read.query'='" + testQuery + "'" +
")"
tEnv.executeSql(
"CREATE TABLE test(" +
"id BIGINT" +
") WITH (" +
" 'connector.type'='jdbc'," +
" 'connector.url'='" + DB_URL + "'," +
" 'connector.table'='whatever'," +
" 'connector.read.query'='" + testQuery + "'" +
")"
);

StreamITCase.clear();
tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), Row.class)
.addSink(new StreamITCase.StringSink<>());
env.execute();
TableResult tableResult = tEnv.executeSql("SELECT id FROM test");

List<String> results = manifestResults(tableResult);

assertThat(results, containsInAnyOrder("1", "2"));
}

List<String> expected = Arrays.asList("1", "2");
StreamITCase.compareWithList(expected);
private static List<String> manifestResults(TableResult result) {
Iterator<Row> resultIterator = result.collect();
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
.map(Row::toString)
.collect(Collectors.toList());
}
}

0 comments on commit 8c15ff1

Please sign in to comment.