Skip to content

Commit cbf82f7

Browse files
authored
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706)
* remove singleton pattern for SeaTunnelContext * rename SeaTunnelContext to JobContext and fix checkstyle * fix checkstyle * put the constructor on top
1 parent b38321e commit cbf82f7

File tree

49 files changed

+156
-213
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+156
-213
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java renamed to seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,21 @@
2828
import java.util.concurrent.ConcurrentHashMap;
2929

3030
/**
31-
* This class is used to store the context of the application. e.g. the table schema, catalog...etc.
31+
* This class is used to store the context of the job. e.g. the table schema, catalog...etc.
3232
*/
33-
public final class SeaTunnelContext implements Serializable {
33+
public final class JobContext implements Serializable {
3434

3535
private static final long serialVersionUID = -1L;
3636

37-
private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
38-
3937
// tableName -> tableSchema
4038
private final Map<String, TableSchema> tableSchemaMap = new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
4139

4240
private JobMode jobMode;
4341

4442
private final String jobId;
4543

46-
public static SeaTunnelContext getContext() {
47-
return INSTANCE;
44+
public JobContext() {
45+
this.jobId = UUID.randomUUID().toString().replace("-", "");
4846
}
4947

5048
/**
@@ -67,7 +65,7 @@ public Optional<TableSchema> getSchema(String tableName) {
6765
return Optional.ofNullable(tableSchemaMap.get(tableName));
6866
}
6967

70-
public SeaTunnelContext setJobMode(JobMode jobMode) {
68+
public JobContext setJobMode(JobMode jobMode) {
7169
this.jobMode = jobMode;
7270
return this;
7371
}
@@ -80,8 +78,4 @@ public String getJobId() {
8078
return this.jobId;
8179
}
8280

83-
private SeaTunnelContext() {
84-
this.jobId = UUID.randomUUID().toString().replace("-", "");
85-
}
86-
8781
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
2121
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
2222
import org.apache.seatunnel.api.serialization.Serializer;
23-
import org.apache.seatunnel.api.source.SeaTunnelContextAware;
23+
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
2424
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2525
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2626

@@ -43,7 +43,7 @@
4343
* {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
4444
*/
4545
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
46-
extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
46+
extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware {
4747

4848
/**
4949
* Set the row type info of sink row data. This method will be automatically called by translation.

seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java renamed to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelJobAware.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
package org.apache.seatunnel.api.source;
1919

20-
import org.apache.seatunnel.api.common.SeaTunnelContext;
20+
import org.apache.seatunnel.api.common.JobContext;
2121

2222
/**
23-
* This interface defines the runtime environment of the SeaTunnel application.
23+
* This interface defines the runtime environment of the SeaTunnel job.
2424
*/
25-
public interface SeaTunnelContextAware {
25+
public interface SeaTunnelJobAware {
2626

27-
default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
27+
default void setJobContext(JobContext jobContext){
2828
// nothing
2929
}
3030
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* @param <StateT> The type of checkpoint states.
3535
*/
3636
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends Serializable>
37-
extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
37+
extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelJobAware {
3838

3939
/**
4040
* Get the boundedness of this source.

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
1919

20-
import org.apache.seatunnel.api.common.SeaTunnelContext;
2120
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2221
import org.apache.seatunnel.api.sink.SinkWriter;
2322
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,7 +40,6 @@
4140
@AutoService(SeaTunnelSink.class)
4241
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
4342
private static final String RULES = "rules";
44-
private SeaTunnelContext seaTunnelContext;
4543
private SeaTunnelRowType seaTunnelRowType;
4644
private List<AssertFieldRule> assertFieldRules;
4745

@@ -73,11 +71,6 @@ public void prepare(Config pluginConfig) {
7371
assertFieldRules = new AssertRuleParser().parseRules(configList);
7472
}
7573

76-
@Override
77-
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
78-
this.seaTunnelContext = seaTunnelContext;
79-
}
80-
8174
@Override
8275
public String getPluginName() {
8376
return "Assert";

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
3030

3131
import org.apache.seatunnel.api.common.PrepareFailException;
32-
import org.apache.seatunnel.api.common.SeaTunnelContext;
3332
import org.apache.seatunnel.api.serialization.DefaultSerializer;
3433
import org.apache.seatunnel.api.serialization.Serializer;
3534
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -67,7 +66,6 @@
6766
@AutoService(SeaTunnelSink.class)
6867
public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
6968

70-
private SeaTunnelContext seaTunnelContext;
7169
private ReaderOption option;
7270

7371
@Override
@@ -194,8 +192,4 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
194192
return this.option.getSeaTunnelRowType();
195193
}
196194

197-
@Override
198-
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
199-
this.seaTunnelContext = seaTunnelContext;
200-
}
201195
}

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
3131

3232
import org.apache.seatunnel.api.common.PrepareFailException;
33-
import org.apache.seatunnel.api.common.SeaTunnelContext;
3433
import org.apache.seatunnel.api.sink.SeaTunnelSink;
3534
import org.apache.seatunnel.api.sink.SinkWriter;
3635
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -65,7 +64,6 @@
6564
@AutoService(SeaTunnelSink.class)
6665
public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
6766

68-
private SeaTunnelContext seaTunnelContext;
6967
private FileReaderOption readerOption;
7068

7169
@Override
@@ -140,9 +138,4 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
140138
public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
141139
return new ClickhouseFileSinkWriter(readerOption, context);
142140
}
143-
144-
@Override
145-
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
146-
this.seaTunnelContext = seaTunnelContext;
147-
}
148141
}

seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
21+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
22+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
23+
2024
import com.fasterxml.jackson.databind.JsonNode;
2125
import com.fasterxml.jackson.databind.ObjectMapper;
2226
import org.apache.commons.lang3.StringUtils;
@@ -27,9 +31,6 @@
2731
import org.apache.http.client.CredentialsProvider;
2832
import org.apache.http.impl.client.BasicCredentialsProvider;
2933
import org.apache.http.util.EntityUtils;
30-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
31-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
32-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
3334
import org.elasticsearch.client.Request;
3435
import org.elasticsearch.client.Response;
3536
import org.elasticsearch.client.RestClient;
@@ -40,13 +41,14 @@
4041

4142
public class EsRestClient {
4243

43-
private static EsRestClient esRestClient;
44-
private static RestClient restClient;
44+
private static EsRestClient ES_REST_CLIENT;
45+
private static RestClient REST_CLIENT;
4546

4647
private EsRestClient() {
4748

4849
}
4950

51+
@SuppressWarnings("checkstyle:MagicNumber")
5052
private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
5153
HttpHost[] httpHosts = new HttpHost[hosts.size()];
5254
for (int i = 0; i < hosts.size(); i++) {
@@ -68,19 +70,19 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String
6870
}
6971

7072
public static EsRestClient getInstance(List<String> hosts, String username, String password) {
71-
if (restClient == null) {
73+
if (REST_CLIENT == null) {
7274
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
73-
restClient = restClientBuilder.build();
74-
esRestClient = new EsRestClient();
75+
REST_CLIENT = restClientBuilder.build();
76+
ES_REST_CLIENT = new EsRestClient();
7577
}
76-
return esRestClient;
78+
return ES_REST_CLIENT;
7779
}
7880

7981
public BulkResponse bulk(String requestBody) {
8082
Request request = new Request("POST", "_bulk");
8183
request.setJsonEntity(requestBody);
8284
try {
83-
Response response = restClient.performRequest(request);
85+
Response response = REST_CLIENT.performRequest(request);
8486
if (response == null) {
8587
throw new BulkElasticsearchException("bulk es Response is null");
8688
}
@@ -105,7 +107,7 @@ public BulkResponse bulk(String requestBody) {
105107
public static String getClusterVersion() {
106108
Request request = new Request("GET", "/");
107109
try {
108-
Response response = restClient.performRequest(request);
110+
Response response = REST_CLIENT.performRequest(request);
109111
String result = EntityUtils.toString(response.getEntity());
110112
ObjectMapper objectMapper = new ObjectMapper();
111113
JsonNode jsonNode = objectMapper.readTree(result);
@@ -117,7 +119,7 @@ public static String getClusterVersion() {
117119
}
118120

119121
public void close() throws IOException {
120-
restClient.close();
122+
REST_CLIENT.close();
121123
}
122124

123125
}

seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ public class SinkConfig {
3535

3636
public static final String MAX_RETRY_SIZE = "max_retry_size";
3737

38-
public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig){
39-
if(pluginConfig.hasPath(MAX_BATCH_SIZE)){
38+
public static void setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
39+
if (pluginConfig.hasPath(MAX_BATCH_SIZE)) {
4040
BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
4141
}
42-
if(pluginConfig.hasPath(MAX_RETRY_SIZE)){
42+
if (pluginConfig.hasPath(MAX_RETRY_SIZE)) {
4343
BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
4444
}
4545
}

seatunnel-connectors-v2/connector-elasticsearch/src/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ public class BulkConfig {
2727
* once bulk es include max document size
2828
* {@link SinkConfig#MAX_BATCH_SIZE}
2929
*/
30+
@SuppressWarnings("checkstyle:MagicNumber")
3031
public static int MAX_BATCH_SIZE = 10;
3132

3233
/**
3334
* the max retry size of bulk es
3435
* {@link SinkConfig#MAX_RETRY_SIZE}
3536
*/
37+
@SuppressWarnings("checkstyle:MagicNumber")
3638
public static int MAX_RETRY_SIZE = 3;
3739
}

0 commit comments

Comments
 (0)