diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index f2839633471..5a25ddc8f8c 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -45,6 +45,8 @@ import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND; import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED; import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR; +import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH; +import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA; /** * The common error of SeaTunnel. This is an alternative to {@link CommonErrorCodeDeprecated} and is @@ -245,4 +247,32 @@ public static SeaTunnelRuntimeException unsupportedRowKind( params.put("rowKind", rowKind); return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params); } + + public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema( + String connector, + String sourceFiledName, + String sourceFiledType, + String exceptType, + String sinkFiledName, + String sinkFiledType) { + Map params = new HashMap<>(); + params.put("connector", connector); + params.put("sourceFiledName", sourceFiledName); + params.put("sourceFiledType", sourceFiledType); + params.put("exceptType", exceptType); + params.put("sinkFiledName", sinkFiledName); + params.put("sinkFiledType", sinkFiledType); + return new SeaTunnelRuntimeException( + WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, params); + } + + public static SeaTunnelRuntimeException writeRowErrorWithFiledsNotMatch( + String connector, int sourceFieldsNum, 
int sinkFieldsNum) { + Map params = new HashMap<>(); + params.put("connector", connector); + params.put("sourceFiledName", String.valueOf(sourceFieldsNum)); + params.put("sourceFiledType", String.valueOf(sinkFieldsNum)); + return new SeaTunnelRuntimeException( + WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params); + } } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 3cf69285cb2..465dcdc38fd 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { "COMMON-28", "'' array type not support genericType '' of ''"), UNSUPPORTED_ROW_KIND( - "COMMON-29", "'' table '' not support rowKind ''"); + "COMMON-29", "'' table '' not support rowKind ''"), + + WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA( + "COMMON-30", + " write SeaTunnelRow failed, The source filed with name '' has the sql type '', except datatype of sink is ''; but the filed with name '' in sink table which actual type is ''.Please check schema of sink table."), + + WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH( + "COMMON-31", + " write SeaTunnelRow failed. The source has '' fields, but paimon table of sink has '' fields. 
Please check schema of sink table."); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index 11d9b3b61a4..6cbd8fe747c 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -342,8 +342,14 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn */ public static InternalRow reconvert( SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) { - List fields = tableSchema.fields(); - BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields()); + List sinkTotalFields = tableSchema.fields(); + int sourceTotalFields = seaTunnelRowType.getTotalFields(); + if (sourceTotalFields != sinkTotalFields.size()) { + throw new CommonError() + .writeRowErrorWithFiledsNotMatch( + "Paimon", sourceTotalFields, sinkTotalFields.size()); + } + BinaryRow binaryRow = new BinaryRow(sourceTotalFields); BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow); // Convert SeaTunnel RowKind to Paimon RowKind org.apache.paimon.types.RowKind rowKind = @@ -362,6 +368,7 @@ public static InternalRow reconvert( binaryWriter.setNullAt(i); continue; } + checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields); String fieldName = seaTunnelRowType.getFieldName(i); switch (fieldTypes[i].getSqlType()) { case TINYINT: @@ -408,7 +415,7 @@ public static InternalRow reconvert( .setValue(binaryWriter, i, DateTimeUtils.toInternal(date)); break; case TIMESTAMP: - DataField dataField = SchemaUtil.getDataField(fields, fieldName); + DataField 
dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName); int precision = ((TimestampType) dataField.type()).getPrecision(); LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i); binaryWriter.writeTimestamp( @@ -462,4 +469,24 @@ public static InternalRow reconvert( } return binaryRow; } + + private static void checkCanWriteWithType( + int i, SeaTunnelRowType seaTunnelRowType, List fields) { + String sourceFieldName = seaTunnelRowType.getFieldName(i); + SeaTunnelDataType sourceFieldType = seaTunnelRowType.getFieldType(i); + DataField sinkDataField = fields.get(i); + DataType exceptDataType = + RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i)); + DataType sinkDataType = sinkDataField.type(); + if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) { + throw new CommonError() + .writeRowErrorWithSchemaIncompatibleSchema( + "Paimon", + sourceFieldName, + sourceFieldType.getSqlType().toString(), + exceptDataType.getTypeRoot().toString(), + sinkDataField.name(), + sinkDataField.type().getTypeRoot().toString()); + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index 05fa3db4b95..bd21dd27d58 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; @@ -120,6 +121,7 
@@ public void testSinkWithMultipleInBatchMode(TestContainer container) throws Exce }); } + @Order(1) @TestTemplate public void testFakeCDCSinkPaimon(TestContainer container) throws Exception { Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf"); @@ -148,6 +150,21 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception { }); } + @Order(2) + @TestTemplate + public void testSinkWithIncompatibleSchema(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Container.ExecResult errResult = + container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf"); + Assertions.assertEquals(1, errResult.getExitCode()); + Assertions.assertTrue( + errResult + .getStderr() + .contains( + "ErrorCode:[COMMON-30], ErrorDescription:[Paimon write SeaTunnelRow failed, The source filed with name 'name' has the sql type 'INT', except datatype of sink is 'INT'; but the filed with name 'name' in sink table which actual type is 'STRING'.Please check schema of sink table.")); + } + @TestTemplate public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception { Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf new file mode 100644 index 00000000000..70bcedff29d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = int + score = string + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, 100, "A"] + }, + { + kind = INSERT + fields = [2, 100, "B"] + }, + { + kind = INSERT + fields = [3, 100, "C"] + } + ] + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "seatunnel_namespace1" + table = "st_test" + } +}