Skip to content

Commit a0ffa7b

Browse files
CosmosNiHisoka-X
andauthored
[Fix][Connector-V2] Fix http source can not read streaming (#7703)
Co-authored-by: Jia Fan <fanjiaeminem@qq.com>
1 parent 05cf84f commit a0ffa7b

File tree

4 files changed

+227
-2
lines changed

4 files changed

+227
-2
lines changed

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,13 @@ private void updateRequestParam(PageInfo pageInfo) {
153153
.put(pageInfo.getPageField(), pageInfo.getPageIndex().toString());
154154
}
155155

156+
@Override
157+
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
158+
synchronized (output.getCheckpointLock()) {
159+
internalPollNext(output);
160+
}
161+
}
162+
156163
@Override
157164
public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
158165
try {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@
4444
<version>${project.version}</version>
4545
<scope>test</scope>
4646
</dependency>
47+
<dependency>
48+
<groupId>org.testcontainers</groupId>
49+
<artifactId>postgresql</artifactId>
50+
<version>${testcontainer.version}</version>
51+
<scope>test</scope>
52+
</dependency>
53+
<dependency>
54+
<groupId>org.apache.seatunnel</groupId>
55+
<artifactId>connector-jdbc-e2e-common</artifactId>
56+
<version>${project.version}</version>
57+
<type>test-jar</type>
58+
<scope>test</scope>
59+
</dependency>
60+
<dependency>
61+
<!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
62+
<groupId>org.postgresql</groupId>
63+
<artifactId>postgresql</artifactId>
64+
<version>42.5.1</version>
65+
</dependency>
4766
<dependency>
4867
<groupId>org.apache.seatunnel</groupId>
4968
<artifactId>connector-http-lemlist</artifactId>
@@ -104,7 +123,6 @@
104123
<version>5.14.0</version>
105124
<scope>test</scope>
106125
</dependency>
107-
108126
</dependencies>
109127

110128
</project>

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import org.apache.seatunnel.e2e.common.TestResource;
2525
import org.apache.seatunnel.e2e.common.TestSuiteBase;
26+
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
2627
import org.apache.seatunnel.e2e.common.container.TestContainer;
28+
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
2729

2830
import org.junit.jupiter.api.AfterAll;
2931
import org.junit.jupiter.api.Assertions;
@@ -34,6 +36,7 @@
3436
import org.mockserver.model.Format;
3537
import org.testcontainers.containers.Container;
3638
import org.testcontainers.containers.GenericContainer;
39+
import org.testcontainers.containers.PostgreSQLContainer;
3740
import org.testcontainers.containers.output.Slf4jLogConsumer;
3841
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
3942
import org.testcontainers.lifecycle.Startables;
@@ -45,19 +48,30 @@
4548
import lombok.EqualsAndHashCode;
4649
import lombok.Getter;
4750
import lombok.Setter;
51+
import lombok.extern.slf4j.Slf4j;
4852

4953
import java.io.File;
5054
import java.io.IOException;
5155
import java.math.BigDecimal;
5256
import java.net.URL;
57+
import java.sql.Connection;
58+
import java.sql.DriverManager;
59+
import java.sql.ResultSet;
60+
import java.sql.SQLException;
61+
import java.sql.Statement;
5362
import java.util.ArrayList;
5463
import java.util.List;
5564
import java.util.Optional;
65+
import java.util.concurrent.CompletableFuture;
66+
import java.util.concurrent.TimeUnit;
5667
import java.util.stream.Collectors;
5768
import java.util.stream.Stream;
5869

70+
import static org.awaitility.Awaitility.await;
71+
import static org.awaitility.Awaitility.given;
5972
import static org.mockserver.model.HttpRequest.request;
6073

74+
@Slf4j
6175
public class HttpIT extends TestSuiteBase implements TestResource {
6276

6377
private static final String TMP_DIR = "/tmp";
@@ -70,9 +84,31 @@ public class HttpIT extends TestSuiteBase implements TestResource {
7084

7185
private MockServerClient mockServerClient;
7286

87+
private static final String POSTGRESQL_SCHEMA = "public";
88+
private static final String SINK_TABLE_1 = "sink";
89+
private static final Integer MAX_COUNT = 15;
90+
private static final String COUNT_QUERY = "select count(*) from sink";
91+
92+
private static final String PG_IMAGE = "postgres:14-alpine";
93+
private static final String PG_DRIVER_JAR =
94+
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
95+
private PostgreSQLContainer<?> postgreSQLContainer;
96+
97+
@TestContainerExtension
98+
private final ContainerExtendedFactory extendedFactory =
99+
container -> {
100+
Container.ExecResult extraCommands =
101+
container.execInContainer(
102+
"bash",
103+
"-c",
104+
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
105+
+ PG_DRIVER_JAR);
106+
Assertions.assertEquals(0, extraCommands.getExitCode());
107+
};
108+
73109
@BeforeAll
74110
@Override
75-
public void startUp() {
111+
public void startUp() throws ClassNotFoundException {
76112
Optional<URL> resource =
77113
Optional.ofNullable(HttpIT.class.getResource(getMockServerConfig()));
78114
this.mockserverContainer =
@@ -100,6 +136,22 @@ public void startUp() {
100136
Startables.deepStart(Stream.of(mockserverContainer)).join();
101137
mockServerClient = new MockServerClient("127.0.0.1", 1080);
102138
fillMockRecords();
139+
140+
postgreSQLContainer =
141+
new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
142+
.withNetwork(TestSuiteBase.NETWORK)
143+
.withNetworkAliases("postgresql")
144+
.withLogConsumer(
145+
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
146+
Startables.deepStart(Stream.of(postgreSQLContainer)).join();
147+
log.info("PostgreSQL container started");
148+
Class.forName(postgreSQLContainer.getDriverClassName());
149+
given().ignoreExceptions()
150+
.await()
151+
.atLeast(100, TimeUnit.MILLISECONDS)
152+
.pollInterval(500, TimeUnit.MILLISECONDS)
153+
.atMost(2, TimeUnit.MINUTES)
154+
.untilAsserted(this::initializeJdbcTable);
103155
}
104156

105157
private static void fillMockRecords() {
@@ -149,6 +201,71 @@ public void tearDown() {
149201
if (mockServerClient != null) {
150202
mockServerClient.close();
151203
}
204+
if (postgreSQLContainer != null) {
205+
postgreSQLContainer.stop();
206+
}
207+
}
208+
209+
@TestTemplate
210+
public void testStreamingSourceToPostgresqlSink(TestContainer container) {
211+
try {
212+
CompletableFuture.supplyAsync(
213+
() -> {
214+
try {
215+
Container.ExecResult execResult1 =
216+
container.executeJob("/http_streaming_json_to_postgresql.conf");
217+
} catch (Exception e) {
218+
log.error("Commit task exception :" + e.getMessage());
219+
throw new RuntimeException(e);
220+
}
221+
return null;
222+
});
223+
await().atMost(60000, TimeUnit.MILLISECONDS)
224+
.untilAsserted(
225+
() -> {
226+
Long count = queryCount(COUNT_QUERY);
227+
Assertions.assertTrue(
228+
count >= MAX_COUNT,
229+
"Actual value should be greater than expected value");
230+
});
231+
} finally {
232+
log.info("clear schema:{}", SINK_TABLE_1);
233+
clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_1);
234+
}
235+
}
236+
237+
private Long queryCount(String sql) {
238+
try (Connection connection = getJdbcConnection()) {
239+
ResultSet resultSet = connection.createStatement().executeQuery(sql);
240+
if (resultSet.next()) {
241+
242+
return resultSet.getLong(1);
243+
}
244+
return 0L;
245+
} catch (SQLException e) {
246+
throw new RuntimeException(e);
247+
}
248+
}
249+
250+
private Connection getJdbcConnection() throws SQLException {
251+
return DriverManager.getConnection(
252+
postgreSQLContainer.getJdbcUrl(),
253+
postgreSQLContainer.getUsername(),
254+
postgreSQLContainer.getPassword());
255+
}
256+
257+
private void executeSql(String sql) {
258+
try (Connection connection = getJdbcConnection();
259+
Statement statement = connection.createStatement()) {
260+
statement.execute("SET search_path TO inventory;");
261+
statement.execute(sql);
262+
} catch (SQLException e) {
263+
throw new RuntimeException(e);
264+
}
265+
}
266+
267+
private void clearTable(String database, String tableName) {
268+
executeSql("truncate table " + database + "." + tableName);
152269
}
153270

154271
@TestTemplate
@@ -259,6 +376,24 @@ public void testMultiTableHttp(TestContainer container)
259376
Assertions.assertIterableEquals(records, recordResponse);
260377
}
261378

379+
private void initializeJdbcTable() {
380+
try (Connection connection =
381+
DriverManager.getConnection(
382+
postgreSQLContainer.getJdbcUrl(),
383+
postgreSQLContainer.getUsername(),
384+
postgreSQLContainer.getPassword())) {
385+
Statement statement = connection.createStatement();
386+
String sink =
387+
"create table sink(\n"
388+
+ "c_String varchar(255) NOT NULL PRIMARY KEY,\n"
389+
+ "c_int INT\n"
390+
+ ")";
391+
statement.execute(sink);
392+
} catch (SQLException e) {
393+
throw new RuntimeException("Initializing PostgreSql table failed!", e);
394+
}
395+
}
396+
262397
@Getter
263398
@Setter
264399
@EqualsAndHashCode
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "STREAMING"
21+
checkpoint.interval = 5000
22+
}
23+
24+
source {
25+
Http {
26+
result_table_name = "fake"
27+
url = "http://mockserver:1080/example/http"
28+
method = "GET"
29+
format = "json"
30+
date_format="yyyy-MM-dd"
31+
datetime_format="yyyy-MM-dd'T'HH:mm:ss"
32+
time_format="HH:mm:ss"
33+
poll_interval_millis = 5000
34+
schema = {
35+
fields {
36+
c_string = string
37+
c_int = int
38+
}
39+
}
40+
}
41+
}
42+
43+
transform {
44+
Sql {
45+
source_table_name = "fake"
46+
result_table_name = "fake1"
47+
query = "select CONCAT(c_string, CAST(RAND() AS STRING)) as c_string, c_int from fake"
48+
}
49+
}
50+
51+
sink {
52+
Jdbc {
53+
source_table_name = "fake1"
54+
driver = org.postgresql.Driver
55+
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
56+
user = test
57+
password = test
58+
generate_sink_sql = true
59+
database = test
60+
table = "public.sink"
61+
primary_keys = ["c_string"]
62+
support_upsert_by_query_primary_key_exist = true
63+
batch_size = 1
64+
}
65+
}

0 commit comments

Comments
 (0)