diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 3dbc6af8992..b5f4ef300e5 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
+| partition_key | string | no | - |
| partition | int | no | - |
| assign_partitions | list | no | - |
| transaction_prefix | string | no | - |
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
+### partition_key [string]
+
+Configure which field is used as the key of the Kafka message.
+
+For example, if you want to use the value of a field from the upstream data as the key, set this option to that field's name.
+
+Suppose the upstream data is the following:
+
+| name | age | data |
+| ---- | ---- | ------------- |
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
+
+If `name` is set as the key, then the hash value of the `name` column will determine which partition the message is sent to.
+
+If the configured field name does not exist in the upstream data, the configured value itself will be used as the key.
+
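+A sink configured to key messages by the `name` field might look like the following sketch (the `bootstrap.servers` and `topic` values are placeholders):
+
+```bash
+sink {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "test_topic"
+    partition_key = "name"
+  }
+}
+```
+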
### partition [int]
We can specify the partition, all messages will be sent to this partition.
@@ -93,7 +111,9 @@ sink {
### change log
#### next version
-
+
- Add kafka sink doc
- New feature : Kafka specified partition to send
- - New feature : Determine the partition that kafka send based on the message content
+ - New feature : Determine the partition that Kafka sends messages to based on the message content
+ - New feature : Configure which field is used as the key of the Kafka message
+
diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md
index ac4a2e4288f..53c623086a5 100644
--- a/docs/en/connector-v2/sink/common-options.md
+++ b/docs/en/connector-v2/sink/common-options.md
@@ -5,6 +5,8 @@
| name | type | required | default value |
| ----------------- | ------ | -------- | ------------- |
| source_table_name | string | no | - |
+| parallelism | int | no | - |
+
### source_table_name [string]
@@ -12,11 +14,18 @@ When `source_table_name` is not specified, the current plug-in processes the dat
When `source_table_name` is specified, the current plug-in is processing the data set corresponding to this parameter.
+### parallelism [int]
+
+When `parallelism` is not specified, the `parallelism` in env is used by default.
+
+When `parallelism` is specified, it will override the `parallelism` in env.
+
## Examples
```bash
source {
FakeSourceStream {
+ parallelism = 2
result_table_name = "fake"
field_name = "name,age"
}
@@ -37,6 +46,7 @@ transform {
sink {
console {
+ parallelism = 3
source_table_name = "fake_name"
}
}
diff --git a/docs/en/connector-v2/source/common-options.md b/docs/en/connector-v2/source/common-options.md
index 7254c44e565..7fc32c505ee 100644
--- a/docs/en/connector-v2/source/common-options.md
+++ b/docs/en/connector-v2/source/common-options.md
@@ -5,6 +5,7 @@
| name | type | required | default value |
| ----------------- | ------ | -------- | ------------- |
| result_table_name | string | no | - |
+| parallelism | int | no | - |
### result_table_name [string]
@@ -12,6 +13,12 @@ When `result_table_name` is not specified, the data processed by this plugin wil
When `result_table_name` is specified, the data processed by this plugin will be registered as a data set `(dataStream/dataset)` that can be directly accessed by other plugins, or called a temporary table `(table)` . The data set `(dataStream/dataset)` registered here can be directly accessed by other plugins by specifying `source_table_name` .
+### parallelism [int]
+
+When `parallelism` is not specified, the `parallelism` in env is used by default.
+
+When `parallelism` is specified, it will override the `parallelism` in env.
+
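+A minimal sketch of a source that overrides the env-level parallelism (the `FakeSource` options are placeholders):
+
+```bash
+source {
+  FakeSource {
+    parallelism = 2
+    result_table_name = "fake"
+  }
+}
+```
+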
## Example
```bash
diff --git a/docs/en/intro/about.md b/docs/en/intro/about.md
index 9b7a76811c3..5f38f22a9a2 100644
--- a/docs/en/intro/about.md
+++ b/docs/en/intro/about.md
@@ -64,7 +64,7 @@ SeaTunnel have lots of users which you can find more information in [users](http
-SeaTunnel enriches the CNCF CLOUD NATIVE Landscape.
+SeaTunnel enriches the CNCF CLOUD NATIVE Landscape.
## What's More
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index ad9f8e222d1..c5a24d22897 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,8 +31,6 @@ public final class Constants {
public static final String SOURCE_SERIALIZATION = "source.serialization";
- public static final String SOURCE_PARALLELISM = "parallelism";
-
public static final String HDFS_ROOT = "hdfs.root";
public static final String HDFS_USER = "hdfs.user";
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index 9354130bb9e..652f7e6ac98 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -25,9 +25,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -39,7 +39,7 @@ public class FileSystemUtils {
public static Configuration CONF;
public static FileSystem getFileSystem(@NonNull String path) throws IOException {
- FileSystem fileSystem = FileSystem.get(new File(path).toURI(), CONF);
+ FileSystem fileSystem = FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF);
fileSystem.setWriteChecksum(false);
return fileSystem;
}
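
Why the change: `new File(path).toURI()` treats the whole string as a local path, so a scheme-qualified location such as `hdfs://...` is mangled into a `file:` URI, and Windows backslashes are not valid URI separators. A standalone sketch (not part of the patch; paths are hypothetical) illustrating the difference:

```java
import java.io.File;
import java.net.URI;

public class UriSketch {
    public static void main(String[] args) {
        // File.toURI() resolves against the local filesystem and prefixes file:
        System.out.println(new File("hdfs://namenode:9000/data/out").toURI());
        // URI.create() keeps the hdfs scheme intact
        System.out.println(URI.create("hdfs://namenode:9000/data/out"));
        // backslashes must be normalized before URI parsing, as the patch does
        System.out.println(URI.create("C:\\tmp\\out".replaceAll("\\\\", "/")));
    }
}
```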
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index fdfbc9756fe..bdcd2e02961 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -39,6 +39,9 @@
import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
@@ -47,6 +50,18 @@
@Slf4j
public abstract class AbstractReadStrategy implements ReadStrategy {
+ protected static final String[] TYPE_ARRAY_STRING = new String[0];
+ protected static final Boolean[] TYPE_ARRAY_BOOLEAN = new Boolean[0];
+ protected static final Byte[] TYPE_ARRAY_BYTE = new Byte[0];
+ protected static final Short[] TYPE_ARRAY_SHORT = new Short[0];
+ protected static final Integer[] TYPE_ARRAY_INTEGER = new Integer[0];
+ protected static final Long[] TYPE_ARRAY_LONG = new Long[0];
+ protected static final Float[] TYPE_ARRAY_FLOAT = new Float[0];
+ protected static final Double[] TYPE_ARRAY_DOUBLE = new Double[0];
+ protected static final BigDecimal[] TYPE_ARRAY_BIG_DECIMAL = new BigDecimal[0];
+ protected static final LocalDate[] TYPE_ARRAY_LOCAL_DATE = new LocalDate[0];
+ protected static final LocalDateTime[] TYPE_ARRAY_LOCAL_DATETIME = new LocalDateTime[0];
+
protected HadoopConf hadoopConf;
protected SeaTunnelRowType seaTunnelRowType;
protected SeaTunnelRowType seaTunnelRowTypeWithPartition;
@@ -142,7 +157,7 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
return seaTunnelRowType;
}
// get all names of partitions fields
- String[] partitionNames = partitionsMap.keySet().toArray(new String[0]);
+ String[] partitionNames = partitionsMap.keySet().toArray(TYPE_ARRAY_STRING);
// initialize data type for partition fields
SeaTunnelDataType<?>[] partitionTypes = new SeaTunnelDataType<?>[partitionNames.length];
Arrays.fill(partitionTypes, BasicType.STRING_TYPE);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 9b81950de9a..52b2abdd2ed 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -58,7 +58,6 @@
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDate;
-import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -236,7 +235,7 @@ private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(TypeDescription typeD
return new MapType<>(orcDataType2SeaTunnelDataType(keyType), orcDataType2SeaTunnelDataType(valueType));
case STRUCT:
List<TypeDescription> children = typeDescription.getChildren();
- String[] fieldNames = typeDescription.getFieldNames().toArray(new String[0]);
+ String[] fieldNames = typeDescription.getFieldNames().toArray(TYPE_ARRAY_STRING);
SeaTunnelDataType<?>[] fieldTypes = children.stream().map(this::orcDataType2SeaTunnelDataType).toArray(SeaTunnelDataType<?>[]::new);
return new SeaTunnelRowType(fieldNames, fieldTypes);
default:
@@ -533,15 +532,15 @@ private Object[] readLongListVector(LongColumnVector longVector, TypeDescription
}
}
if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
- return longList.toArray(new Boolean[0]);
+ return longList.toArray(TYPE_ARRAY_BOOLEAN);
} else if (childType.getCategory() == TypeDescription.Category.INT) {
- return longList.toArray(new Integer[0]);
+ return longList.toArray(TYPE_ARRAY_INTEGER);
} else if (childType.getCategory() == TypeDescription.Category.BYTE) {
- return longList.toArray(new Byte[0]);
+ return longList.toArray(TYPE_ARRAY_BYTE);
} else if (childType.getCategory() == TypeDescription.Category.SHORT) {
- return longList.toArray(new Short[0]);
+ return longList.toArray(TYPE_ARRAY_SHORT);
} else {
- return longList.toArray(new Long[0]);
+ return longList.toArray(TYPE_ARRAY_LONG);
}
}
@@ -567,9 +566,9 @@ private Object[] readDoubleListVector(DoubleColumnVector doubleVec, TypeDescript
}
}
if (colType.getCategory() == TypeDescription.Category.FLOAT) {
- return doubleList.toArray(new Float[0]);
+ return doubleList.toArray(TYPE_ARRAY_FLOAT);
} else {
- return doubleList.toArray(new Double[0]);
+ return doubleList.toArray(TYPE_ARRAY_DOUBLE);
}
}
@@ -599,7 +598,7 @@ private Object[] readBytesListVector(BytesColumnVector bytesVec, TypeDescription
}
}
if (childType.getCategory() == TypeDescription.Category.STRING) {
- return bytesValList.toArray(new String[0]);
+ return bytesValList.toArray(TYPE_ARRAY_STRING);
} else {
return bytesValList.toArray();
}
@@ -622,7 +621,7 @@ private Object[] readDecimalListVector(DecimalColumnVector decimalVector, int of
decimalList.add(null);
}
}
- return decimalList.toArray(new BigDecimal[0]);
+ return decimalList.toArray(TYPE_ARRAY_BIG_DECIMAL);
}
private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
@@ -651,9 +650,9 @@ private Object[] readTimestampListVector(TimestampColumnVector timestampVector,
}
}
if (childType.getCategory() == TypeDescription.Category.DATE) {
- return timestampList.toArray(new LocalDate[0]);
+ return timestampList.toArray(TYPE_ARRAY_LOCAL_DATE);
} else {
- return timestampList.toArray(new LocalDateTime[0]);
+ return timestampList.toArray(TYPE_ARRAY_LOCAL_DATETIME);
}
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 442e4cfd7f9..52df095a45d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -114,6 +114,9 @@ public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
}
private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
+ if (field == null) {
+ return null;
+ }
switch (fieldType.getSqlType()) {
case ARRAY:
ArrayList<Object> origArray = new ArrayList<>();
@@ -121,21 +124,21 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
switch (elementType.getSqlType()) {
case STRING:
- return origArray.toArray(new String[0]);
+ return origArray.toArray(TYPE_ARRAY_STRING);
case BOOLEAN:
- return origArray.toArray(new Boolean[0]);
+ return origArray.toArray(TYPE_ARRAY_BOOLEAN);
case TINYINT:
- return origArray.toArray(new Byte[0]);
+ return origArray.toArray(TYPE_ARRAY_BYTE);
case SMALLINT:
- return origArray.toArray(new Short[0]);
+ return origArray.toArray(TYPE_ARRAY_SHORT);
case INT:
- return origArray.toArray(new Integer[0]);
+ return origArray.toArray(TYPE_ARRAY_INTEGER);
case BIGINT:
- return origArray.toArray(new Long[0]);
+ return origArray.toArray(TYPE_ARRAY_LONG);
case FLOAT:
- return origArray.toArray(new Float[0]);
+ return origArray.toArray(TYPE_ARRAY_FLOAT);
case DOUBLE:
- return origArray.toArray(new Double[0]);
+ return origArray.toArray(TYPE_ARRAY_DOUBLE);
default:
String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType());
throw new UnsupportedOperationException(errorMsg);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 09eaa067464..2dedcd13df6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -85,4 +85,9 @@ public class Config {
*/
public static final String ASSIGN_PARTITIONS = "assign_partitions";
+ /**
+ * The field used as the key of the kafka message; the key determines the partition the message is sent to
+ */
+ public static final String PARTITION_KEY = "partition_key";
+
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 29599bbdbe7..24a2b242f69 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -48,4 +48,11 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
}
}
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
+ // if the key is null, kafka's default partitioner picks the partition instead of hashing the key
+ return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
+ }
+
}
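
For context, when a record carries a non-null key and no explicit partition, Kafka's default partitioner hashes the key bytes, so identical keys always land in the same partition. A minimal sketch of that mapping (not part of the patch) using Kafka's own `Utils` helpers; the partition count is a hypothetical value:

```java
import org.apache.kafka.common.utils.Utils;

public class KeyToPartitionSketch {
    public static void main(String[] args) {
        byte[] key = "Jack".getBytes();  // serialized value of the partition_key field
        int numPartitions = 6;           // hypothetical partition count of the topic
        // DefaultPartitioner logic: murmur2 hash, masked non-negative, modulo partition count
        int partition = Utils.toPositive(Utils.murmur2(key)) % numPartitions;
        System.out.println("key 'Jack' -> partition " + partition);
    }
}
```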
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index 9f12591ea2d..d96753155d5 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -30,4 +30,13 @@ public interface SeaTunnelRowSerializer<K, V> {
* @return kafka record.
*/
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
+
+ /**
+ * Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}, using the given key.
+ *
+ * @param key the key of the kafka message
+ * @param row seatunnel row
+ * @return kafka record.
+ */
+ ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 1f61482e785..9712ba33162 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -19,6 +19,7 @@
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
@@ -38,10 +39,12 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
+import java.util.function.Function;
/**
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -50,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
+ private Function<SeaTunnelRow, String> partitionExtractor;
private String transactionPrefix;
private long lastCheckpointId = 0;
@@ -63,7 +67,15 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
- ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+ ProducerRecord<byte[], byte[]> producerRecord;
+ // determine the kafka message key from the configured field; the key decides the partition
+ if (pluginConfig.hasPath(PARTITION_KEY)) {
+ String key = partitionExtractor.apply(element);
+ producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element);
+ } else {
+ producerRecord = seaTunnelRowSerializer.serializeRow(element);
+ }
kafkaProducerSender.send(producerRecord);
}
@@ -74,6 +86,7 @@ public KafkaSinkWriter(
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.pluginConfig = pluginConfig;
+ this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
if (pluginConfig.hasPath(PARTITION)) {
this.partition = pluginConfig.getInt(PARTITION);
}
@@ -175,4 +188,20 @@ private void restoreState(List<KafkaSinkState> states) {
}
}
+ private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
+ SeaTunnelRowType seaTunnelRowType) {
+ if (!pluginConfig.hasPath(PARTITION_KEY)) {
+ // no partition_key configured; write() never applies the extractor in this case,
+ // but guard here so the eager call in the constructor cannot throw on a missing path
+ return row -> null;
+ }
+ String partitionKey = pluginConfig.getString(PARTITION_KEY);
+ List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+ if (!fieldNames.contains(partitionKey)) {
+ // the configured value is not an upstream field name; use it as a literal key
+ return row -> partitionKey;
+ }
+ int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
+ return row -> {
+ Object partitionFieldValue = row.getField(partitionFieldIndex);
+ if (partitionFieldValue != null) {
+ return partitionFieldValue.toString();
+ }
+ // a null field value yields a null key, so kafka's default partitioner decides
+ return null;
+ };
+ }
}
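
The extractor's fallback can be exercised from configuration alone: when `partition_key` names a value that is not an upstream field, every record is sent with that literal key, so all messages hash to the same partition. A sketch (server and topic values are placeholders):

```bash
sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "test_topic"
    # "fixed-key" is not a field of the upstream rows, so it is used verbatim as the key
    partition_key = "fixed-key"
  }
}
```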
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a7c48832b66..eb0517cfbb2 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,6 +21,7 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -32,6 +33,7 @@
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.types.Row;
import java.net.URL;
@@ -76,7 +78,11 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
SeaTunnelSink seaTunnelSink = plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
- stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+ DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+ if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ int parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM);
+ dataStreamSink.setParallelism(parallelism);
+ }
}
// the sink is the last stream
return null;
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 1dc378a835e..0ae0f151d35 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,6 +22,7 @@
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -75,6 +76,10 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
"SeaTunnel " + internalSource.getClass().getSimpleName(),
internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
Config pluginConfig = pluginConfigs.get(i);
+ if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ int parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM);
+ sourceStream.setParallelism(parallelism);
+ }
registerResultTable(pluginConfig, sourceStream);
sources.add(sourceStream);
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fbcecd7c820..c5d8276fefb 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -71,6 +72,13 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
Config sinkConfig = pluginConfigs.get(i);
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
+ int parallelism;
+ if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM);
+ } else {
+ parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+ }
+ dataset.sparkSession().read().option(CollectionConstants.PARALLELISM, parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index eda67291486..6f2f7901cb2 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -56,15 +57,15 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
int parallelism;
- if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
- parallelism = pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+ if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM);
} else {
- parallelism = sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+ parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
}
Dataset<Row> dataset = sparkEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
- .option(Constants.SOURCE_PARALLELISM, parallelism)
+ .option(CollectionConstants.PARALLELISM, parallelism)
.option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
.schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index cc8bd323873..938ca8f9702 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -73,12 +73,12 @@ public void execute() throws CommandExecuteException {
} catch (ExecutionException | InterruptedException e) {
throw new CommandExecuteException("SeaTunnel job executed failed", e);
} finally {
- if (instance != null) {
- instance.shutdown();
- }
if (clientJobProxy != null) {
clientJobProxy.close();
}
+ if (instance != null) {
+ instance.shutdown();
+ }
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index e85cff0b9dc..75b692302af 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -95,7 +95,6 @@ public void init() {
nodeEngine.getClusterService().getThisAddress());
sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
} catch (Exception e) {
- LOGGER.warning(e);
LOGGER.warning("failed send heartbeat to resource manager, will retry later. this address: " +
nodeEngine.getClusterService().getThisAddress());
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 437ecd74b7e..b76bf208a3c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -181,7 +181,7 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex
FlowLifeCycle lifeCycle;
List<OneInputFlowLifeCycle<Record<?>>> flowLifeCycles = new ArrayList<>();
if (!flow.getNext().isEmpty()) {
- for (Flow f : executionFlow.getNext()) {
+ for (Flow f : flow.getNext()) {
flowLifeCycles.add((OneInputFlowLifeCycle<Record<?>>) convertFlowToActionLifeCycle(f));
}
}
@@ -203,7 +203,7 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex
new SeaTunnelTransformCollector(flowLifeCycles), completableFuture);
} else if (f.getAction() instanceof PartitionTransformAction) {
// TODO use index and taskID to create ringbuffer list
- if (executionFlow.getNext().isEmpty()) {
+ if (flow.getNext().isEmpty()) {
lifeCycle = new PartitionTransformSinkFlowLifeCycle(this, completableFuture);
} else {
lifeCycle = new PartitionTransformSourceFlowLifeCycle(this, completableFuture);
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 212ec1cdfdb..aea7d4c8c50 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -28,32 +28,35 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
- FakeSource {
- result_table_name = "fake"
- row.num = 16
- schema = {
- fields {
- name = "string"
- age = "int"
- }
+ FakeSource {
+ parallelism = 2
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
}
}
+ }
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
- sql {
- sql = "select name,age from fake"
- }
+ sql {
+ sql = "select name,age from fake"
+ }
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
- Console {}
+ Console {
+ parallelism = 3
+ }
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 4adb2a9ff2f..7692598da63 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -24,7 +24,7 @@ env {
# see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
#job.mode = BATCH
job.name = "SeaTunnel"
- spark.executor.instances = 2
+ spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
@@ -34,6 +34,7 @@ source {
# This is a example input plugin **only for test and demonstrate the feature input plugin**
FakeSource {
row.num = 16
+ parallelism = 2
schema = {
fields {
c_map = "map"
@@ -82,7 +83,9 @@ transform {
sink {
# choose stdout output plugin to output data to console
- Console {}
+ Console {
+ parallelism = 2
+ }
# you can also you other output plugins, such as sql
# hdfs {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 22a4859aa5d..69b22a6254a 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
@@ -59,14 +60,14 @@ public DataSourceReader createReader(StructType rowType, DataSourceOptions optio
@Override
public DataSourceReader createReader(DataSourceOptions options) {
SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options);
- int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+ int parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
return new BatchSourceReader(seaTunnelSource, parallelism);
}
@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options);
- Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+ Integer parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
Integer checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();