
Commit a31fdee

[SeaTunnel API] [Sink] remove useless context field (#2124)
1 parent ffcf3f5 commit a31fdee


12 files changed: +18 −79 lines changed

docs/en/connector/source/Http.md

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ HTTP request header, json format.
 
 ### request_params[string]
 
-HTTP request parameters, json format.
+HTTP request parameters, json format. Use string with escapes to save json
 
 ### sync_path[string]
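
As an illustration of the note added above (hypothetical values, not part of this commit), the JSON is embedded as a single escaped string in the connector config:

    request_params = "{\"method\":\"GET\",\"id\":\"1\"}"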

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java

Lines changed: 1 addition & 17 deletions
@@ -17,35 +17,19 @@
 
 package org.apache.seatunnel.api.sink;
 
-import java.util.Map;
-
 /**
  * The default {@link SinkWriter.Context} implement class.
  */
 public class DefaultSinkWriterContext implements SinkWriter.Context {
-
-    private final Map<String, String> configuration;
     private final int subtask;
-    private final int parallelism;
 
-    public DefaultSinkWriterContext(Map<String, String> configuration, int subtask, int parallelism) {
-        this.configuration = configuration;
+    public DefaultSinkWriterContext(int subtask) {
         this.subtask = subtask;
-        this.parallelism = parallelism;
-    }
-
-    @Override
-    public Map<String, String> getConfiguration() {
-        return configuration;
     }
 
     @Override
     public int getIndexOfSubtask() {
         return subtask;
     }
-
-    @Override
-    public int getNumberOfParallelSubtasks() {
-        return parallelism;
-    }
 }
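
For reference, assembling the kept lines of the hunk above (license header omitted), the class after this commit reduces to:

    package org.apache.seatunnel.api.sink;

    /**
     * The default {@link SinkWriter.Context} implement class.
     */
    public class DefaultSinkWriterContext implements SinkWriter.Context {
        private final int subtask;

        public DefaultSinkWriterContext(int subtask) {
            this.subtask = subtask;
        }

        @Override
        public int getIndexOfSubtask() {
            return subtask;
        }
    }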

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java

Lines changed: 0 additions & 11 deletions
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -76,20 +75,10 @@ default List<StateT> snapshotState(long checkpointId) throws IOException {
 
     interface Context extends Serializable{
 
-        /**
-         * Gets the configuration with which Job was started.
-         */
-        Map<String, String> getConfiguration();
-
         /**
          * @return The index of this subtask.
         */
        int getIndexOfSubtask();
 
-        /**
-         * @return The number of parallel Sink tasks.
-         */
-        int getNumberOfParallelSubtasks();
-
    }
 }
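
After this change the nested Context exposes only the subtask index, so a writer that used to read the job configuration or the parallelism from its context must obtain them another way. Assembled from the hunk above, the trimmed interface is:

    interface Context extends Serializable {

        /**
         * @return The index of this subtask.
         */
        int getIndexOfSubtask();
    }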

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java

Lines changed: 1 addition & 2 deletions
@@ -80,8 +80,7 @@ public boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Co
             return false;
         }
         int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JOB_ID_BYTES, Integer.BYTES);
-        if (subtaskIndex != sinkContext.getIndexOfSubtask()
-            && subtaskIndex <= sinkContext.getNumberOfParallelSubtasks() - 1) {
+        if (subtaskIndex != sinkContext.getIndexOfSubtask()) {
             return false;
         }
         byte[] jobIdBytes = new byte[JOB_ID_BYTES];
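
Read without the diff noise, the ownership check now relies on exact subtask-index equality alone; the upper-bound test against the removed parallelism accessor is gone:

    int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JOB_ID_BYTES, Integer.BYTES);
    // A XID belongs to this writer only if it carries exactly this subtask's index.
    if (subtaskIndex != sinkContext.getIndexOfSubtask()) {
        return false;
    }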

seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java

Lines changed: 1 addition & 2 deletions
@@ -36,7 +36,6 @@
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -76,7 +75,7 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
-            stream.sinkTo(new FlinkSink<>(seaTunnelSink, Collections.emptyMap()));
+            stream.sinkTo(new FlinkSink<>(seaTunnelSink));
         }
         // the sink is the last stream
         return null;

seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java

Lines changed: 1 addition & 4 deletions
@@ -20,7 +20,6 @@
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -36,7 +35,6 @@
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -74,8 +72,7 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
             Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
             // TODO modify checkpoint location
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
-            SparkSinkInjector.inject(dataset.write(), seaTunnelSink, new HashMap<>(Common.COLLECTION_SIZE)).option(
-                "checkpointLocation", "/tmp").save();
+            SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
         }
         // the sink is the last stream
         return null;

seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java

Lines changed: 2 additions & 9 deletions
@@ -32,28 +32,21 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-@SuppressWarnings("unchecked")
 public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommitWrapper<CommT>,
     FlinkWriterState<WriterStateT>, GlobalCommT> {
 
     private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
-    private final Map<String, String> configuration;
 
-    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
-                     Map<String, String> configuration) {
+    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink) {
         this.sink = sink;
-        this.configuration = configuration;
     }
 
     @Override
     public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
-        // TODO add subtask and parallelism.
-        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-            new DefaultSinkWriterContext(configuration, 0, 0);
+        org.apache.seatunnel.api.sink.SinkWriter.Context stContext = new DefaultSinkWriterContext(context.getSubtaskId());
 
         if (states == null || states.isEmpty()) {
             return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, sink.getConsumedType());
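
Assembled from the hunk above, the constructor and the start of createWriter now read as follows; the subtask index comes from Flink's own Sink.InitContext rather than a hard-coded placeholder:

    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink) {
        this.sink = sink;
    }

    @Override
    public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(
            org.apache.flink.api.connector.sink.Sink.InitContext context,
            List<FlinkWriterState<WriterStateT>> states) throws IOException {
        // context.getSubtaskId() replaces the former (configuration, 0, 0) triple.
        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
            new DefaultSinkWriterContext(context.getSubtaskId());
        // ... (writer creation continues as in the hunk above)
    }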

seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java

Lines changed: 3 additions & 6 deletions
@@ -33,27 +33,24 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class SparkDataSourceWriter<StateT, CommitInfoT, AggregatedCommitInfoT> implements DataSourceWriter {
 
     protected final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
-    protected final Map<String, String> configuration;
     @Nullable
     protected final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter;
 
-    SparkDataSourceWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
-                          Map<String, String> configuration) throws IOException {
+    SparkDataSourceWriter(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink)
+        throws IOException {
         this.sink = sink;
-        this.configuration = configuration;
         this.sinkAggregatedCommitter = sink.createAggregatedCommitter().orElse(null);
     }
 
     @Override
     public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new SparkDataWriterFactory<>(sink, configuration);
+        return new SparkDataWriterFactory<>(sink);
     }
 
     @Override

seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java

Lines changed: 2 additions & 8 deletions
@@ -28,23 +28,17 @@
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 
 import java.io.IOException;
-import java.util.Map;
 
 public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFactory<InternalRow> {
 
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
-    private final Map<String, String> configuration;
-    SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink, Map<String, String> configuration) {
+    SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink) {
         this.sink = sink;
-        this.configuration = configuration;
     }
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-        // TODO use partitionID, taskId information.
-        // TODO add subtask and parallelism.
-        org.apache.seatunnel.api.sink.SinkWriter.Context context =
-            new DefaultSinkWriterContext(configuration, (int) taskId, 0);
+        org.apache.seatunnel.api.sink.SinkWriter.Context context = new DefaultSinkWriterContext((int) taskId);
         SinkWriter<SeaTunnelRow, CommitInfoT, StateT> writer;
         SinkCommitter<CommitInfoT> committer;
         try {

seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java

Lines changed: 2 additions & 7 deletions
@@ -32,23 +32,18 @@
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Optional;
 
 public class SparkSink<StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
     StreamWriteSupport, DataSourceV2 {
 
     private volatile SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
-    private Map<String, String> configuration;
 
     private void init(DataSourceOptions options) {
         if (sink == null) {
             this.sink = SerializationUtils.stringToObject(
                 options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
                     "class string in DataSourceOptions")));
-            this.configuration = SerializationUtils.stringToObject(
-                options.get("configuration").orElseThrow(() -> new IllegalArgumentException("can not " +
-                    "find configuration class string in DataSourceOptions")));
         }
     }
 
@@ -57,7 +52,7 @@ public StreamWriter createStreamWriter(String queryId, StructType schema, Output
         init(options);
 
         try {
-            return new SparkStreamWriter<>(sink, configuration);
+            return new SparkStreamWriter<>(sink);
         } catch (IOException e) {
             throw new RuntimeException("find error when createStreamWriter", e);
         }
@@ -68,7 +63,7 @@ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType sche
         init(options);
 
         try {
-            return Optional.of(new SparkDataSourceWriter<>(sink, configuration));
+            return Optional.of(new SparkDataSourceWriter<>(sink));
         } catch (IOException e) {
             throw new RuntimeException("find error when createStreamWriter", e);
         }
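
With the configuration map gone, init(...) only has to deserialize the sink itself. Assembled from the first hunk above, the method now reads:

    private void init(DataSourceOptions options) {
        if (sink == null) {
            this.sink = SerializationUtils.stringToObject(
                options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
                    "class string in DataSourceOptions")));
        }
    }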
