Skip to content

Commit 6f49ec6

Browse files
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on http (#5816)
--------- Co-authored-by: 80597928 <673421862@qq.com>
1 parent 8d9a0b7 commit 6f49ec6

File tree

30 files changed

+391
-172
lines changed

30 files changed

+391
-172
lines changed

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
public class HttpConfig {
2626
public static final String BASIC = "Basic";
27+
public static final String CONNECTOR_IDENTITY = "Http";
28+
2729
public static final int DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS = 100;
2830
public static final int DEFAULT_RETRY_BACKOFF_MAX_MS = 10000;
2931
public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

22-
import org.apache.seatunnel.api.common.PrepareFailException;
2322
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2523
import org.apache.seatunnel.api.sink.SinkWriter;
2624
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2725
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -34,25 +32,16 @@
3432
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
3533
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
3634

37-
import com.google.auto.service.AutoService;
38-
3935
import java.io.IOException;
4036
import java.util.Map;
4137
import java.util.stream.Collectors;
4238

43-
@AutoService(SeaTunnelSink.class)
4439
public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
4540
protected final HttpParameter httpParameter = new HttpParameter();
4641
protected SeaTunnelRowType seaTunnelRowType;
4742
protected Config pluginConfig;
4843

49-
@Override
50-
public String getPluginName() {
51-
return "Http";
52-
}
53-
54-
@Override
55-
public void prepare(Config pluginConfig) throws PrepareFailException {
44+
public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) {
5645
this.pluginConfig = pluginConfig;
5746
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key());
5847
if (!result.isSuccess()) {
@@ -81,11 +70,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
8170
entry -> String.valueOf(entry.getValue().unwrapped()),
8271
(v1, v2) -> v2)));
8372
}
73+
this.seaTunnelRowType = rowType;
8474
}
8575

8676
@Override
87-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
88-
this.seaTunnelRowType = seaTunnelRowType;
77+
public String getPluginName() {
78+
return "Http";
8979
}
9080

9181
@Override

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.seatunnel.connectors.seatunnel.http.sink;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.api.table.connector.TableSink;
2123
import org.apache.seatunnel.api.table.factory.Factory;
2224
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
25+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
2326
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
2427

2528
import com.google.auto.service.AutoService;
@@ -31,6 +34,13 @@ public String factoryIdentifier() {
3134
return "Http";
3235
}
3336

37+
@Override
38+
public TableSink createSink(TableSinkFactoryContext context) {
39+
CatalogTable catalogTable = context.getCatalogTable();
40+
return () ->
41+
new HttpSink(context.getOptions().toConfig(), catalogTable.getSeaTunnelRowType());
42+
}
43+
3444
@Override
3545
public OptionRule optionRule() {
3646
return OptionRule.builder()

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
2222

2323
import org.apache.seatunnel.api.common.JobContext;
24-
import org.apache.seatunnel.api.common.PrepareFailException;
2524
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2625
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2726
import org.apache.seatunnel.api.source.Boundedness;
28-
import org.apache.seatunnel.api.source.SeaTunnelSource;
27+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2928
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
29+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
30+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
31+
import org.apache.seatunnel.api.table.catalog.TableSchema;
3032
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
33+
import org.apache.seatunnel.api.table.type.BasicType;
3134
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3235
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3336
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -47,34 +50,23 @@
4750
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
4851
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
4952

50-
import com.google.auto.service.AutoService;
53+
import com.google.common.collect.Lists;
5154

55+
import java.util.Collections;
56+
import java.util.List;
5257
import java.util.Locale;
5358

54-
@AutoService(SeaTunnelSource.class)
5559
public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
5660
protected final HttpParameter httpParameter = new HttpParameter();
5761
protected PageInfo pageInfo;
58-
protected SeaTunnelRowType rowType;
5962
protected JsonField jsonField;
6063
protected String contentField;
6164
protected JobContext jobContext;
6265
protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
6366

64-
@Override
65-
public String getPluginName() {
66-
return "Http";
67-
}
67+
protected CatalogTable catalogTable;
6868

69-
@Override
70-
public Boundedness getBoundedness() {
71-
return JobMode.BATCH.equals(jobContext.getJobMode())
72-
? Boundedness.BOUNDED
73-
: Boundedness.UNBOUNDED;
74-
}
75-
76-
@Override
77-
public void prepare(Config pluginConfig) throws PrepareFailException {
69+
public HttpSource(Config pluginConfig) {
7870
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key());
7971
if (!result.isSuccess()) {
8072
throw new HttpConnectorException(
@@ -88,6 +80,18 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
8880
buildPagingWithConfig(pluginConfig);
8981
}
9082

83+
@Override
84+
public String getPluginName() {
85+
return HttpConfig.CONNECTOR_IDENTITY;
86+
}
87+
88+
@Override
89+
public Boundedness getBoundedness() {
90+
return JobMode.BATCH.equals(jobContext.getJobMode())
91+
? Boundedness.BOUNDED
92+
: Boundedness.UNBOUNDED;
93+
}
94+
9195
private void buildPagingWithConfig(Config pluginConfig) {
9296
if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
9397
pageInfo = new PageInfo();
@@ -114,7 +118,7 @@ private void buildPagingWithConfig(Config pluginConfig) {
114118

115119
protected void buildSchemaWithConfig(Config pluginConfig) {
116120
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
117-
this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
121+
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
118122
// default use json format
119123
HttpConfig.ResponseFormat format = HttpConfig.FORMAT.defaultValue();
120124
if (pluginConfig.hasPath(HttpConfig.FORMAT.key())) {
@@ -127,7 +131,8 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
127131
switch (format) {
128132
case JSON:
129133
this.deserializationSchema =
130-
new JsonDeserializationSchema(false, false, rowType);
134+
new JsonDeserializationSchema(
135+
false, false, catalogTable.getSeaTunnelRowType());
131136
if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) {
132137
jsonField =
133138
getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key()));
@@ -145,8 +150,33 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
145150
format));
146151
}
147152
} else {
148-
this.rowType = CatalogTableUtil.buildSimpleTextSchema();
149-
this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType);
153+
TableIdentifier tableIdentifier =
154+
TableIdentifier.of(HttpConfig.CONNECTOR_IDENTITY, null, null);
155+
TableSchema tableSchema =
156+
TableSchema.builder()
157+
.column(
158+
PhysicalColumn.of(
159+
"content",
160+
new SeaTunnelRowType(
161+
new String[] {"content"},
162+
new SeaTunnelDataType<?>[] {
163+
BasicType.STRING_TYPE
164+
}),
165+
0,
166+
false,
167+
null,
168+
null))
169+
.build();
170+
171+
this.catalogTable =
172+
CatalogTable.of(
173+
tableIdentifier,
174+
tableSchema,
175+
Collections.emptyMap(),
176+
Collections.emptyList(),
177+
null);
178+
this.deserializationSchema =
179+
new SimpleTextDeserializationSchema(catalogTable.getSeaTunnelRowType());
150180
}
151181
}
152182

@@ -156,8 +186,8 @@ public void setJobContext(JobContext jobContext) {
156186
}
157187

158188
@Override
159-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
160-
return this.rowType;
189+
public List<CatalogTable> getProducedCatalogTables() {
190+
return Lists.newArrayList(catalogTable);
161191
}
162192

163193
@Override

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,18 @@
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.source.SeaTunnelSource;
22+
import org.apache.seatunnel.api.source.SourceSplit;
2223
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
24+
import org.apache.seatunnel.api.table.connector.TableSource;
2325
import org.apache.seatunnel.api.table.factory.Factory;
2426
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
27+
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
2528
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
26-
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
2729

2830
import com.google.auto.service.AutoService;
2931

32+
import java.io.Serializable;
33+
3034
@AutoService(Factory.class)
3135
public class HttpSourceFactory implements TableSourceFactory {
3236

@@ -40,16 +44,24 @@ public OptionRule optionRule() {
4044
return getHttpBuilder().build();
4145
}
4246

47+
@Override
48+
public <T, SplitT extends SourceSplit, StateT extends Serializable>
49+
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
50+
return () ->
51+
(SeaTunnelSource<T, SplitT, StateT>)
52+
new HttpSource(context.getOptions().toConfig());
53+
}
54+
4355
public OptionRule.Builder getHttpBuilder() {
4456
return OptionRule.builder()
4557
.required(HttpConfig.URL)
4658
.optional(HttpConfig.METHOD)
4759
.optional(HttpConfig.HEADERS)
4860
.optional(HttpConfig.PARAMS)
4961
.optional(HttpConfig.FORMAT)
62+
.optional(HttpConfig.BODY)
5063
.optional(HttpConfig.JSON_FIELD)
5164
.optional(HttpConfig.CONTENT_FIELD)
52-
.conditional(HttpConfig.METHOD, HttpRequestMethod.POST, HttpConfig.BODY)
5365
.conditional(
5466
HttpConfig.FORMAT,
5567
HttpConfig.ResponseFormat.JSON,

seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.feishu.sink;
1919

20-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
21-
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2221

23-
import com.google.auto.service.AutoService;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
2424

25-
@AutoService(SeaTunnelSink.class)
2625
public class FeishuSink extends HttpSink {
26+
public FeishuSink(Config pluginConfig, SeaTunnelRowType rowType) {
27+
super(pluginConfig, rowType);
28+
}
29+
2730
@Override
2831
public String getPluginName() {
2932
return "Feishu";

seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,24 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.feishu.sink;
1919

20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.connector.TableSink;
2022
import org.apache.seatunnel.api.table.factory.Factory;
23+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
2124
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkFactory;
2225

2326
import com.google.auto.service.AutoService;
2427

2528
@AutoService(Factory.class)
2629
public class FeishuSinkFactory extends HttpSinkFactory {
30+
31+
@Override
32+
public TableSink createSink(TableSinkFactoryContext context) {
33+
CatalogTable catalogTable = context.getCatalogTable();
34+
return () ->
35+
new FeishuSink(context.getOptions().toConfig(), catalogTable.getSeaTunnelRowType());
36+
}
37+
2738
@Override
2839
public String factoryIdentifier() {
2940
return "Feishu";

seatunnel-connectors-v2/connector-http/connector-http-github/src/main/java/org/apache/seatunnel/connectors/seatunnel/github/source/GithubSource.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

22-
import org.apache.seatunnel.api.common.PrepareFailException;
2322
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24-
import org.apache.seatunnel.api.source.SeaTunnelSource;
2523
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2624
import org.apache.seatunnel.common.config.CheckConfigUtil;
2725
import org.apache.seatunnel.common.config.CheckResult;
@@ -34,24 +32,17 @@
3432
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
3533
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
3634

37-
import com.google.auto.service.AutoService;
3835
import lombok.extern.slf4j.Slf4j;
3936

4037
@Slf4j
41-
@AutoService(SeaTunnelSource.class)
4238
public class GithubSource extends HttpSource {
4339

4440
public static final String PLUGIN_NAME = "Github";
4541

4642
private final GithubSourceParameter githubSourceParam = new GithubSourceParameter();
4743

48-
@Override
49-
public String getPluginName() {
50-
return PLUGIN_NAME;
51-
}
52-
53-
@Override
54-
public void prepare(Config pluginConfig) throws PrepareFailException {
44+
public GithubSource(Config pluginConfig) {
45+
super(pluginConfig);
5546
CheckResult result =
5647
CheckConfigUtil.checkAllExists(pluginConfig, GithubSourceConfig.URL.key());
5748
if (!result.isSuccess()) {
@@ -62,7 +53,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
6253
getPluginName(), PluginType.SOURCE, result.getMsg()));
6354
}
6455
githubSourceParam.buildWithConfig(pluginConfig);
65-
buildSchemaWithConfig(pluginConfig);
56+
}
57+
58+
@Override
59+
public String getPluginName() {
60+
return PLUGIN_NAME;
6661
}
6762

6863
@Override

0 commit comments

Comments
 (0)