Skip to content

Commit

Permalink
[Improve][Paimon] Add check for the base type between source and sink
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Jun 6, 2024
1 parent 23a744b commit 49165a9
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA;

/**
* The common error of SeaTunnel. This is an alternative to {@link CommonErrorCodeDeprecated} and is
Expand Down Expand Up @@ -245,4 +247,32 @@ public static SeaTunnelRuntimeException unsupportedRowKind(
params.put("rowKind", rowKind);
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
}

public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema(
String connector,
String sourceFiledName,
String sourceFiledType,
String exceptType,
String sinkFiledName,
String sinkFiledType) {
Map<String, String> params = new HashMap<>();
params.put("connector", connector);
params.put("sourceFiledName", sourceFiledName);
params.put("sourceFiledType", sourceFiledType);
params.put("exceptType", exceptType);
params.put("sinkFiledName", sinkFiledName);
params.put("sinkFiledType", sinkFiledType);
return new SeaTunnelRuntimeException(
WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, params);
}

public static SeaTunnelRuntimeException writeRowErrorWithFiledsNotMatch(
String connector, int sourceFieldsNum, int sinkFieldsNum) {
Map<String, String> params = new HashMap<>();
params.put("connector", connector);
params.put("sourceFiledName", String.valueOf(sourceFieldsNum));
params.put("sourceFiledType", String.valueOf(sinkFieldsNum));
return new SeaTunnelRuntimeException(
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"COMMON-28",
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND(
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'"),

WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA(
"COMMON-30",
"<connector> write SeaTunnelRow failed, The source filed with name '<sourceFiledName>' has the sql type '<sourceFiledType>', except datatype of sink is '<exceptType>'; but the filed with name '<sinkFiledName>' in sink table which actual type is '<sinkFiledType>'.Please check schema of sink table."),

WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH(
"COMMON-31",
"<connector> write SeaTunnelRow failed. The source has '<sourceFieldsNum>' fields, but paimon table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table.");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,14 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
*/
public static InternalRow reconvert(
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
List<DataField> fields = tableSchema.fields();
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
List<DataField> sinkTotalFields = tableSchema.fields();
int sourceTotalFields = seaTunnelRowType.getTotalFields();
if (sourceTotalFields != sinkTotalFields.size()) {
throw new CommonError()
.writeRowErrorWithFiledsNotMatch(
"Paimon", sourceTotalFields, sinkTotalFields.size());
}
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
Expand All @@ -362,6 +368,7 @@ public static InternalRow reconvert(
binaryWriter.setNullAt(i);
continue;
}
checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
Expand Down Expand Up @@ -408,7 +415,7 @@ public static InternalRow reconvert(
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
DataField dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName);
int precision = ((TimestampType) dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
binaryWriter.writeTimestamp(
Expand Down Expand Up @@ -462,4 +469,24 @@ public static InternalRow reconvert(
}
return binaryRow;
}

private static void checkCanWriteWithType(
int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
String sourceFieldName = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
DataField sinkDataField = fields.get(i);
DataType exceptDataType =
RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i));
DataType sinkDataType = sinkDataField.type();
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
throw new CommonError()
.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFieldName,
sourceFieldType.getSqlType().toString(),
exceptDataType.getTypeRoot().toString(),
sinkDataField.name(),
sinkDataField.type().getTypeRoot().toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

Expand Down Expand Up @@ -120,6 +121,7 @@ public void testSinkWithMultipleInBatchMode(TestContainer container) throws Exce
});
}

@Order(1)
@TestTemplate
public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
Expand Down Expand Up @@ -148,6 +150,21 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
});
}

@Order(2)
@TestTemplate
public void testSinkWithIncompatibleSchema(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Container.ExecResult errResult =
container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf");
Assertions.assertEquals(1, errResult.getExitCode());
Assertions.assertTrue(
errResult
.getStderr()
.contains(
"ErrorCode:[COMMON-30], ErrorDescription:[Paimon write SeaTunnelRow failed, The source filed with name 'name' has the sql type 'INT', except datatype of sink is 'INT'; but the filed with name 'name' in sink table which actual type is 'STRING'.Please check schema of sink table."));
}

@TestTemplate
public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = int
score = string
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, 100, "A"]
},
{
kind = INSERT
fields = [2, 100, "B"]
},
{
kind = INSERT
fields = [3, 100, "C"]
}
]
}
}

sink {
Paimon {
warehouse = "file:///tmp/paimon"
database = "seatunnel_namespace1"
table = "st_test"
}
}

0 comments on commit 49165a9

Please sign in to comment.