From 40f2fb64a68cb95f173b884916c5b4f38114915d Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 18 Oct 2022 00:18:57 +0800 Subject: [PATCH 1/8] [Hotfix][seatunnel-engine-server] Fix convertFlowToActionLifeCycle StackOverflowError (#3121) --- .../apache/seatunnel/engine/server/task/SeaTunnelTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java index 437ecd74b7e..b76bf208a3c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java @@ -181,7 +181,7 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex FlowLifeCycle lifeCycle; List>> flowLifeCycles = new ArrayList<>(); if (!flow.getNext().isEmpty()) { - for (Flow f : executionFlow.getNext()) { + for (Flow f : flow.getNext()) { flowLifeCycles.add((OneInputFlowLifeCycle>) convertFlowToActionLifeCycle(f)); } } @@ -203,7 +203,7 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex new SeaTunnelTransformCollector(flowLifeCycles), completableFuture); } else if (f.getAction() instanceof PartitionTransformAction) { // TODO use index and taskID to create ringbuffer list - if (executionFlow.getNext().isEmpty()) { + if (flow.getNext().isEmpty()) { lifeCycle = new PartitionTransformSinkFlowLifeCycle(this, completableFuture); } else { lifeCycle = new PartitionTransformSourceFlowLifeCycle(this, completableFuture); From 0fd8da9c1c414576dc4f9a164e3b862639f23965 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 18 Oct 2022 00:20:14 +0800 Subject: [PATCH 2/8] [Hotfix][SeaTunnel-Engine][Log] fix log show problem (#3118) --- .../starter/seatunnel/command/ClientExecuteCommand.java | 6 +++--- .../engine/server/service/slot/DefaultSlotService.java | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index cc8bd323873..938ca8f9702 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -73,12 +73,12 @@ public void execute() throws CommandExecuteException { } catch (ExecutionException | InterruptedException e) { throw new CommandExecuteException("SeaTunnel job executed failed", e); } finally { - if (instance != null) { - instance.shutdown(); - } if (clientJobProxy != null) { clientJobProxy.close(); } + if (instance != null) { + instance.shutdown(); + } } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java index e85cff0b9dc..75b692302af 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java @@ -95,7 +95,6 @@ public void init() { nodeEngine.getClusterService().getThisAddress()); sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join(); } catch (Exception e) { - LOGGER.warning(e); LOGGER.warning("failed send heartbeat to resource manager, will retry later. this address: " + nodeEngine.getClusterService().getThisAddress()); } From 385e1f42c04dd3d07abdb410ca310338a0d91c12 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Tue, 18 Oct 2022 09:59:51 +0800 Subject: [PATCH 3/8] [feature][connector][kafka] Support extract partition from SeaTunnelRow fields (#3085) --- docs/en/connector-v2/sink/Kafka.md | 24 ++++++++++++-- .../seatunnel/kafka/config/Config.java | 5 +++ .../DefaultSeaTunnelRowSerializer.java | 7 +++++ .../serialize/SeaTunnelRowSerializer.java | 9 ++++++ .../seatunnel/kafka/sink/KafkaSinkWriter.java | 31 ++++++++++++++++++- 5 files changed, 73 insertions(+), 3 deletions(-) diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index 3dbc6af8992..b5f4ef300e5 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on | bootstrap.servers | string | yes | - | | kafka.* | kafka producer config | no | - | | semantic | string | no | NON | +| partition_key | string | no | - | | partition | int | no | - | | assign_partitions | list | no | - | | transaction_prefix | string | no | - | @@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated. +### partition_key [string] + +Configure which field is used as the key of the kafka message. + +For example, if you want to use the value of a field from the upstream data as the key, set this option to that field's name. + +Suppose the upstream data is the following: + +| name | age | data | +| ---- | ---- | ------------- | +| Jack | 16 | data-example1 | +| Mary | 23 | data-example2 | + +If `name` is set as the key, then the hash value of the name column will determine which partition the message is sent to. + +If the configured field name does not exist in the upstream data, the configured value itself will be used as the key. + ### partition [int] We can specify the partition, all messages will be sent to this partition.
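For illustration, a minimal, hypothetical sink block using these options might look as follows; the topic name and broker address are placeholders, and in practice you would normally set only one of `partition_key` or `partition`:

```bash
sink {
  kafka {
      # placeholder connection settings
      topic = "seatunnel-example"
      bootstrap.servers = "localhost:9092"

      # route each message by the hash of the upstream "name" field
      partition_key = "name"

      # alternatively, pin every message to a single partition instead:
      # partition = 0
  }
}
```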
@@ -93,7 +111,9 @@ sink { ### change log #### next version - + - Add kafka sink doc - New feature : Kafka specified partition to send - - New feature : Determine the partition that kafka send based on the message content + - New feature : Determine the partition that kafka sends the message to based on the message content + - New feature : Configure which field is used as the key of the kafka message + diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java index 0502afda330..d577b2badfa 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java @@ -59,4 +59,9 @@ public class Config { * Determine the partition to send based on the content of the message. */ public static final String ASSIGN_PARTITIONS = "assign_partitions"; + + /** + * Determine which field is used as the key of the kafka message + */ + public static final String PARTITION_KEY = "partition_key"; } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java index 29599bbdbe7..24a2b242f69 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java @@ -48,4 +48,11 @@ public ProducerRecord serializeRow(SeaTunnelRow row) { return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row)); } } + + @Override + public ProducerRecord serializeRowByKey(String key, SeaTunnelRow row) { + // if the key is null, kafka will send the message to a random partition + return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row)); + } + } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java index 9f12591ea2d..d96753155d5 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java @@ -30,4 +30,13 @@ public interface SeaTunnelRowSerializer { * @return kafka record. */ ProducerRecord serializeRow(SeaTunnelRow row); + + /** + * Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord} with the given key. + * + * @param key kafka message key + * @param row seatunnel row + * @return kafka record.
+ */ + ProducerRecord serializeRowByKey(String key, SeaTunnelRow row); } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java index 1f61482e785..9712ba33162 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java @@ -19,6 +19,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX; @@ -38,10 +39,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.Random; +import java.util.function.Function; /** * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka. @@ -50,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter partitionExtractor; private String transactionPrefix; private long lastCheckpointId = 0; @@ -63,7 +67,15 @@ public class KafkaSinkWriter implements SinkWriter producerRecord = seaTunnelRowSerializer.serializeRow(element); + ProducerRecord producerRecord = null; + //Determine the partition of the kafka send message based on the field name + if (pluginConfig.hasPath(PARTITION_KEY)){ + String key = partitionExtractor.apply(element); + producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element); + } + else { + producerRecord = seaTunnelRowSerializer.serializeRow(element); + } kafkaProducerSender.send(producerRecord); } @@ -74,6 +86,7 @@ public KafkaSinkWriter( List kafkaStates) { this.context = context; this.pluginConfig = pluginConfig; + this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType); if (pluginConfig.hasPath(PARTITION)) { this.partition = pluginConfig.getInt(PARTITION); } @@ -175,4 +188,20 @@ private void restoreState(List states) { } } + private Function createPartitionExtractor(Config pluginConfig, + SeaTunnelRowType seaTunnelRowType) { + String partitionKey = pluginConfig.getString(PARTITION_KEY); + List fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames()); + if (!fieldNames.contains(partitionKey)) { + return row -> partitionKey; + } + int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey); + return row -> { + Object partitionFieldValue = row.getField(partitionFieldIndex); + if (partitionFieldValue != null) { + return partitionFieldValue.toString(); + } + return null; + }; + } } From ba99de08c825764e800f575956734fc29880f7bc Mon Sep 17 00:00:00 2001 From: Harvey Yue Date: Tue, 18 Oct 2022 10:01:47 +0800 Subject: [PATCH 4/8] [Hotfix][Connector-V2][File] Fix ParquetReadStrategy get NPE (#3122) --- .../seatunnel/file/source/reader/ParquetReadStrategy.java | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java index 442e4cfd7f9..0789985d29c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java @@ -114,6 +114,9 @@ public void read(String path, Collector output) throws Exception { } private Object resolveObject(Object field, SeaTunnelDataType fieldType) { + if (field == null) { + return null; + } switch (fieldType.getSqlType()) { case ARRAY: ArrayList origArray = new ArrayList<>(); From da0646ac6d30be13fed15096c8a208f4f0cc7acf Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 18 Oct 2022 10:02:37 +0800 Subject: [PATCH 5/8] [Improve][Connector-v2][file] Reuse array type container when read row data (#3123) --- .../source/reader/AbstractReadStrategy.java | 17 ++++++++++++- .../file/source/reader/OrcReadStrategy.java | 25 +++++++++---------- .../source/reader/ParquetReadStrategy.java | 16 ++++++------ 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index fdfbc9756fe..bdcd2e02961 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -39,6 +39,9 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; @@ -47,6 +50,18 @@ @Slf4j public abstract class AbstractReadStrategy implements ReadStrategy { + protected static final String[] TYPE_ARRAY_STRING = new String[0]; + protected static final Boolean[] TYPE_ARRAY_BOOLEAN = new Boolean[0]; + protected static final Byte[] TYPE_ARRAY_BYTE = new Byte[0]; + protected static final Short[] TYPE_ARRAY_SHORT = new Short[0]; + protected static final Integer[] TYPE_ARRAY_INTEGER = new Integer[0]; + protected static final Long[] TYPE_ARRAY_LONG = new Long[0]; + protected static final Float[] TYPE_ARRAY_FLOAT = new Float[0]; + protected static final Double[] TYPE_ARRAY_DOUBLE = new Double[0]; + protected static final BigDecimal[] TYPE_ARRAY_BIG_DECIMAL = new BigDecimal[0]; + protected static final LocalDate[] TYPE_ARRAY_LOCAL_DATE = new LocalDate[0]; + protected static final LocalDateTime[] TYPE_ARRAY_LOCAL_DATETIME = new LocalDateTime[0]; + protected HadoopConf hadoopConf; protected SeaTunnelRowType seaTunnelRowType; protected SeaTunnelRowType seaTunnelRowTypeWithPartition; @@ -142,7 +157,7 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea return 
seaTunnelRowType; } // get all names of partitions fields - String[] partitionNames = partitionsMap.keySet().toArray(new String[0]); + String[] partitionNames = partitionsMap.keySet().toArray(TYPE_ARRAY_STRING); // initialize data type for partition fields SeaTunnelDataType[] partitionTypes = new SeaTunnelDataType[partitionNames.length]; Arrays.fill(partitionTypes, BasicType.STRING_TYPE); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java index 9b81950de9a..52b2abdd2ed 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java @@ -58,7 +58,6 @@ import java.nio.ByteBuffer; import java.sql.Timestamp; import java.time.LocalDate; -import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -236,7 +235,7 @@ private SeaTunnelDataType orcDataType2SeaTunnelDataType(TypeDescription typeD return new MapType<>(orcDataType2SeaTunnelDataType(keyType), orcDataType2SeaTunnelDataType(valueType)); case STRUCT: List children = typeDescription.getChildren(); - String[] fieldNames = typeDescription.getFieldNames().toArray(new String[0]); + String[] fieldNames = typeDescription.getFieldNames().toArray(TYPE_ARRAY_STRING); SeaTunnelDataType[] fieldTypes = children.stream().map(this::orcDataType2SeaTunnelDataType).toArray(SeaTunnelDataType[]::new); return new SeaTunnelRowType(fieldNames, fieldTypes); default: @@ -533,15 +532,15 @@ private Object[] readLongListVector(LongColumnVector longVector, TypeDescription } } if (childType.getCategory() == TypeDescription.Category.BOOLEAN) { - return longList.toArray(new Boolean[0]); + return longList.toArray(TYPE_ARRAY_BOOLEAN); } else if (childType.getCategory() == TypeDescription.Category.INT) { - return longList.toArray(new Integer[0]); + return longList.toArray(TYPE_ARRAY_INTEGER); } else if (childType.getCategory() == TypeDescription.Category.BYTE) { - return longList.toArray(new Byte[0]); + return longList.toArray(TYPE_ARRAY_BYTE); } else if (childType.getCategory() == TypeDescription.Category.SHORT) { - return longList.toArray(new Short[0]); + return longList.toArray(TYPE_ARRAY_SHORT); } else { - return longList.toArray(new Long[0]); + return longList.toArray(TYPE_ARRAY_LONG); } } @@ -567,9 +566,9 @@ private Object[] readDoubleListVector(DoubleColumnVector doubleVec, TypeDescript } } if (colType.getCategory() == TypeDescription.Category.FLOAT) { - return doubleList.toArray(new Float[0]); + return doubleList.toArray(TYPE_ARRAY_FLOAT); } else { - return doubleList.toArray(new Double[0]); + return doubleList.toArray(TYPE_ARRAY_DOUBLE); } } @@ -599,7 +598,7 @@ private Object[] readBytesListVector(BytesColumnVector bytesVec, TypeDescription } } if (childType.getCategory() == TypeDescription.Category.STRING) { - return bytesValList.toArray(new String[0]); + return bytesValList.toArray(TYPE_ARRAY_STRING); } else { return bytesValList.toArray(); } @@ -622,7 +621,7 @@ private Object[] readDecimalListVector(DecimalColumnVector decimalVector, int of decimalList.add(null); } } - return 
decimalList.toArray(new BigDecimal[0]); + return decimalList.toArray(TYPE_ARRAY_BIG_DECIMAL); } private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) { @@ -651,9 +650,9 @@ private Object[] readTimestampListVector(TimestampColumnVector timestampVector, } } if (childType.getCategory() == TypeDescription.Category.DATE) { - return timestampList.toArray(new LocalDate[0]); + return timestampList.toArray(TYPE_ARRAY_LOCAL_DATE); } else { - return timestampList.toArray(new LocalDateTime[0]); + return timestampList.toArray(TYPE_ARRAY_LOCAL_DATETIME); } } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java index 0789985d29c..52df095a45d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java @@ -124,21 +124,21 @@ private Object resolveObject(Object field, SeaTunnelDataType fieldType) { SeaTunnelDataType elementType = ((ArrayType) fieldType).getElementType(); switch (elementType.getSqlType()) { case STRING: - return origArray.toArray(new String[0]); + return origArray.toArray(TYPE_ARRAY_STRING); case BOOLEAN: - return origArray.toArray(new Boolean[0]); + return origArray.toArray(TYPE_ARRAY_BOOLEAN); case TINYINT: - return origArray.toArray(new Byte[0]); + return origArray.toArray(TYPE_ARRAY_BYTE); case SMALLINT: - return origArray.toArray(new Short[0]); + return origArray.toArray(TYPE_ARRAY_SHORT); case INT: - return origArray.toArray(new Integer[0]); + return origArray.toArray(TYPE_ARRAY_INTEGER); case BIGINT: - return origArray.toArray(new Long[0]); + return origArray.toArray(TYPE_ARRAY_LONG); case FLOAT: - return origArray.toArray(new Float[0]); + return origArray.toArray(TYPE_ARRAY_FLOAT); case DOUBLE: - return origArray.toArray(new Double[0]); + return origArray.toArray(TYPE_ARRAY_DOUBLE); default: String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType()); throw new UnsupportedOperationException(errorMsg); From 210c58e2eb952cf947386bed18fb13a995795306 Mon Sep 17 00:00:00 2001 From: KingsleyY <42668765+CallMeKingsley97@users.noreply.github.com> Date: Tue, 18 Oct 2022 10:04:48 +0800 Subject: [PATCH 6/8] fix the dead link of CNCF CLOUD NATIVE Landscape (#3105) update the link --- docs/en/intro/about.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/intro/about.md b/docs/en/intro/about.md index 9b7a76811c3..5f38f22a9a2 100644 --- a/docs/en/intro/about.md +++ b/docs/en/intro/about.md @@ -64,7 +64,7 @@ SeaTunnel have lots of users which you can find more information in [users](http


-SeaTunnel enriches the CNCF CLOUD NATIVE Landscape. +SeaTunnel enriches the CNCF CLOUD NATIVE Landscape.

## What's More From 7404c180de8b2bf2b43cbff866ac4a9a951e8bdb Mon Sep 17 00:00:00 2001 From: TyrantLucifer Date: Tue, 18 Oct 2022 10:27:43 +0800 Subject: [PATCH 7/8] [Feature][Connector-V2][File] Fix filesystem get error (#3117) --- .../connectors/seatunnel/file/sink/util/FileSystemUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java index 9354130bb9e..652f7e6ac98 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java @@ -25,9 +25,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -39,7 +39,7 @@ public class FileSystemUtils { public static Configuration CONF; public static FileSystem getFileSystem(@NonNull String path) throws IOException { - FileSystem fileSystem = FileSystem.get(new File(path).toURI(), CONF); + FileSystem fileSystem = FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF); fileSystem.setWriteChecksum(false); return fileSystem; } From d6f82683592237e2cf6505fccac76c58854092fc Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 18 Oct 2022 10:51:47 +0800 Subject: [PATCH 8/8] [Improve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallelism for flink and spark (#2941) * customize operator parallelism --- docs/en/connector-v2/sink/common-options.md | 10 +++++++ docs/en/connector-v2/source/common-options.md | 7 +++++ .../apache/seatunnel/common/Constants.java | 2 -- .../flink/execution/SinkExecuteProcessor.java | 8 +++++- .../execution/SourceExecuteProcessor.java | 5 ++++ .../spark/execution/SinkExecuteProcessor.java | 8 ++++++ .../execution/SourceExecuteProcessor.java | 9 ++++--- .../resources/examples/fake_to_console.conf | 27 ++++++++++--------- .../main/resources/examples/spark.batch.conf | 7 +++-- .../spark/source/SeaTunnelSourceSupport.java | 5 ++-- 10 files changed, 65 insertions(+), 23 deletions(-) diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md index ac4a2e4288f..53c623086a5 100644 --- a/docs/en/connector-v2/sink/common-options.md +++ b/docs/en/connector-v2/sink/common-options.md @@ -5,6 +5,8 @@ | name | type | required | default value | | ----------------- | ------ | -------- | ------------- | | source_table_name | string | no | - | +| parallelism | int | no | - | + ### source_table_name [string] @@ -12,11 +14,18 @@ When `source_table_name` is not specified, the current plug-in processes the dat When `source_table_name` is specified, the current plug-in is processing the data set corresponding to this parameter. +### parallelism [int] + +When `parallelism` is not specified, the `parallelism` in env is used by default. + +When parallelism is specified, it will override the parallelism in env.
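To make that precedence concrete, here is a minimal, self-contained sketch of the rule; the names are illustrative only, and the real resolution happens in the Flink/Spark execute processors changed later in this patch:

```java
public final class ParallelismResolution {
    /** Plugin-level parallelism, when present, overrides the env-level value. */
    static int resolve(Integer pluginParallelism, int envParallelism) {
        return pluginParallelism != null ? pluginParallelism : envParallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolve(3, 2));    // 3: the sink sets parallelism = 3
        System.out.println(resolve(null, 2)); // 2: falls back to the env parallelism
    }
}
```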
+ ## Examples ```bash source { FakeSourceStream { + parallelism = 2 result_table_name = "fake" field_name = "name,age" } @@ -37,6 +46,7 @@ transform { sink { console { + parallelism = 3 source_table_name = "fake_name" } } diff --git a/docs/en/connector-v2/source/common-options.md b/docs/en/connector-v2/source/common-options.md index 7254c44e565..7fc32c505ee 100644 --- a/docs/en/connector-v2/source/common-options.md +++ b/docs/en/connector-v2/source/common-options.md @@ -5,6 +5,7 @@ | name | type | required | default value | | ----------------- | ------ | -------- | ------------- | | result_table_name | string | no | - | +| parallelism | int | no | - | ### result_table_name [string] @@ -12,6 +13,12 @@ When `result_table_name` is not specified, the data processed by this plugin wil When `result_table_name` is specified, the data processed by this plugin will be registered as a data set `(dataStream/dataset)` that can be directly accessed by other plugins, or called a temporary table `(table)` . The data set `(dataStream/dataset)` registered here can be directly accessed by other plugins by specifying `source_table_name` . +### parallelism [int] + +When `parallelism` is not specified, the `parallelism` in env is used by default. + +When parallelism is specified, it will override the parallelism in env. + ## Example ```bash diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java index ad9f8e222d1..c5a24d22897 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java @@ -31,8 +31,6 @@ public final class Constants { public static final String SOURCE_SERIALIZATION = "source.serialization"; - public static final String SOURCE_PARALLELISM = "parallelism"; - public static final String HDFS_ROOT = "hdfs.root"; public static final String HDFS_USER = "hdfs.user"; diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index a7c48832b66..eb0517cfbb2 100644 --- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.flink.FlinkEnvironment; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; @@ -32,6 +33,7 @@ import com.google.common.collect.Lists; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.types.Row; import java.net.URL; @@ -76,7 +78,11 @@ public List> execute(List> upstreamDataStreams) SeaTunnelSink seaTunnelSink = plugins.get(i); DataStream stream = fromSourceTable(sinkConfig).orElse(input); seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType())); - stream.sinkTo(new 
FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName()); + DataStreamSink dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName()); + if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) { + int parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM); + dataStreamSink.setParallelism(parallelism); + } } // the sink is the last stream return null; diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index 1dc378a835e..0ae0f151d35 100644 --- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportCoordinate; +import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.flink.FlinkEnvironment; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; @@ -75,6 +76,10 @@ public List> execute(List> upstreamDataStreams) "SeaTunnel " + internalSource.getClass().getSimpleName(), internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED); Config pluginConfig = pluginConfigs.get(i); + if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) { + int parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM); + sourceStream.setParallelism(parallelism); + } registerResultTable(pluginConfig, sourceStream); sources.add(sourceStream); } diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index fbcecd7c820..c5d8276fefb 100644 --- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; @@ -71,6 +72,13 @@ public List> execute(List> upstreamDataStreams) throws Config sinkConfig = pluginConfigs.get(i); SeaTunnelSink seaTunnelSink = plugins.get(i); Dataset dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input); + int parallelism; + if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) { + parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM); + } else { + parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1); + } + 
dataset.sparkSession().read().option(CollectionConstants.PARALLELISM, parallelism); // TODO modify checkpoint location seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema())); SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save(); diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java index eda67291486..6f2f7901cb2 100644 --- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.common.Constants; +import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; @@ -56,15 +57,15 @@ public List> execute(List> upstreamDataStreams) { SeaTunnelSource source = plugins.get(i); Config pluginConfig = pluginConfigs.get(i); int parallelism; - if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) { - parallelism = pluginConfig.getInt(Constants.SOURCE_PARALLELISM); + if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) { + parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM); } else { - parallelism = sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1); + parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1); } Dataset dataset = sparkEnvironment.getSparkSession() .read() .format(SeaTunnelSource.class.getSimpleName()) - .option(Constants.SOURCE_PARALLELISM, parallelism) + .option(CollectionConstants.PARALLELISM, parallelism) .option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source)) .schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load(); sources.add(dataset); diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf index 212ec1cdfdb..aea7d4c8c50 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf @@ -28,32 +28,35 @@ env { source { # This is a example source plugin **only for test and demonstrate the feature source plugin** - FakeSource { - result_table_name = "fake" - row.num = 16 - schema = { - fields { - name = "string" - age = "int" - } + FakeSource { + parallelism = 2 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" } } + } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, # please go to https://seatunnel.apache.org/docs/category/source-v2 } transform { - sql { - sql = "select name,age from fake" - } + sql { + sql = "select name,age from fake" + } # If you 
would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/category/transform } sink { - Console {} + Console { + parallelism = 3 + } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/category/sink-v2 diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf index 4adb2a9ff2f..7692598da63 100644 --- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf +++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf @@ -24,7 +24,7 @@ env { # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties #job.mode = BATCH job.name = "SeaTunnel" - spark.executor.instances = 2 + spark.executor.instances = 1 spark.executor.cores = 1 spark.executor.memory = "1g" spark.master = local @@ -34,6 +34,7 @@ source { # This is a example input plugin **only for test and demonstrate the feature input plugin** FakeSource { row.num = 16 + parallelism = 2 schema = { fields { c_map = "map" @@ -82,7 +83,9 @@ transform { sink { # choose stdout output plugin to output data to console - Console {} + Console { + parallelism = 2 + } # you can also you other output plugins, such as sql # hdfs { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java index 22a4859aa5d..69b22a6254a 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.Constants; +import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader; import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader; @@ -59,14 +60,14 @@ public DataSourceReader createReader(StructType rowType, DataSourceOptions optio @Override public DataSourceReader createReader(DataSourceOptions options) { SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options); - int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1); + int parallelism = options.getInt(CollectionConstants.PARALLELISM, 1); return new BatchSourceReader(seaTunnelSource, parallelism); } @Override public MicroBatchReader createMicroBatchReader(Optional rowTypeOptional, String checkpointLocation, DataSourceOptions options) { SeaTunnelSource seaTunnelSource = getSeaTunnelSource(options); - Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1); + Integer parallelism = 
options.getInt(CollectionConstants.PARALLELISM, 1); Integer checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT); String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state"); Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
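Returning to the FileSystemUtils change in PATCH 7/8: the self-contained sketch below, using a made-up HDFS path, shows why `new File(path).toURI()` broke remote filesystems. It treats the whole string as a local path, so the original scheme collapses to `file` and Hadoop always resolves the local filesystem, whereas `URI.create` preserves the scheme; the `replaceAll` only normalizes Windows backslashes first.

```java
import java.io.File;
import java.net.URI;

public class FileSystemUriDemo {
    public static void main(String[] args) {
        String path = "hdfs://ns1/warehouse/data"; // hypothetical HDFS path
        // Old code: the string becomes a local file path, losing the "hdfs" scheme,
        // so FileSystem.get() would pick LocalFileSystem instead of HDFS.
        URI broken = new File(path).toURI();
        // New code: backslashes (Windows-style paths) are normalized to slashes,
        // and the original scheme survives.
        URI fixed = URI.create(path.replaceAll("\\\\", "/"));
        System.out.println(broken.getScheme()); // file
        System.out.println(fixed.getScheme());  // hdfs
    }
}
```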