Skip to content

Commit d56d64f

Browse files
authored
[Improve][Paimon] Add check for the base type between source and sink before write. (#6953)
1 parent 658643a commit d56d64f

File tree

6 files changed

+143
-6
lines changed

6 files changed

+143
-6
lines changed

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
4646
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
4747
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
48+
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH;
49+
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA;
4850

4951
/**
5052
* The common error of SeaTunnel. This is an alternative to {@link CommonErrorCodeDeprecated} and is
@@ -245,4 +247,28 @@ public static SeaTunnelRuntimeException unsupportedRowKind(
245247
params.put("rowKind", rowKind);
246248
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
247249
}
250+
251+
public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema(
252+
String connector,
253+
String sourceFieldSqlSchema,
254+
String exceptFieldSqlSchema,
255+
String sinkFieldSqlSchema) {
256+
Map<String, String> params = new HashMap<>();
257+
params.put("connector", connector);
258+
params.put("sourceFieldSqlSchema", sourceFieldSqlSchema);
259+
params.put("exceptFieldSqlSchema", exceptFieldSqlSchema);
260+
params.put("sinkFieldSqlSchema", sinkFieldSqlSchema);
261+
return new SeaTunnelRuntimeException(
262+
WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, params);
263+
}
264+
265+
public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch(
266+
String connector, int sourceFieldsNum, int sinkFieldsNum) {
267+
Map<String, String> params = new HashMap<>();
268+
params.put("connector", connector);
269+
params.put("sourceFiledName", String.valueOf(sourceFieldsNum));
270+
params.put("sourceFiledType", String.valueOf(sinkFieldsNum));
271+
return new SeaTunnelRuntimeException(
272+
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
273+
}
248274
}

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
6262
"COMMON-28",
6363
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
6464
UNSUPPORTED_ROW_KIND(
65-
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");
65+
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'"),
66+
67+
WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA(
68+
"COMMON-30",
69+
"<connector>: The source filed with schema '<sourceFieldSqlSchema>', except filed schema of sink is '<exceptFieldSqlSchema>'; but the filed in sink table which actual schema is '<sinkFieldSqlSchema>'. Please check schema of sink table."),
70+
71+
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH(
72+
"COMMON-31",
73+
"<connector>: The source has '<sourceFieldsNum>' fields, but the table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table.");
6674

6775
private final String code;
6876
private final String description;

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.seatunnel.common.exception.CommonError;
2727
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
2828

29+
import org.apache.commons.lang3.StringUtils;
2930
import org.apache.paimon.data.BinaryArray;
3031
import org.apache.paimon.data.BinaryArrayWriter;
3132
import org.apache.paimon.data.BinaryMap;
@@ -350,8 +351,14 @@ public static SeaTunnelRow convert(
350351
*/
351352
public static InternalRow reconvert(
352353
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
353-
List<DataField> fields = tableSchema.fields();
354-
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
354+
List<DataField> sinkTotalFields = tableSchema.fields();
355+
int sourceTotalFields = seaTunnelRowType.getTotalFields();
356+
if (sourceTotalFields != sinkTotalFields.size()) {
357+
throw new CommonError()
358+
.writeRowErrorWithFiledsCountNotMatch(
359+
"Paimon", sourceTotalFields, sinkTotalFields.size());
360+
}
361+
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
355362
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
356363
// Convert SeaTunnel RowKind to Paimon RowKind
357364
org.apache.paimon.types.RowKind rowKind =
@@ -370,6 +377,7 @@ public static InternalRow reconvert(
370377
binaryWriter.setNullAt(i);
371378
continue;
372379
}
380+
checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
373381
String fieldName = seaTunnelRowType.getFieldName(i);
374382
switch (fieldTypes[i].getSqlType()) {
375383
case TINYINT:
@@ -416,7 +424,7 @@ public static InternalRow reconvert(
416424
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
417425
break;
418426
case TIMESTAMP:
419-
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
427+
DataField dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName);
420428
int precision = ((TimestampType) dataField.type()).getPrecision();
421429
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
422430
binaryWriter.writeTimestamp(
@@ -470,4 +478,23 @@ public static InternalRow reconvert(
470478
}
471479
return binaryRow;
472480
}
481+
482+
private static void checkCanWriteWithType(
483+
int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
484+
String sourceFieldName = seaTunnelRowType.getFieldName(i);
485+
SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
486+
DataField sinkDataField = fields.get(i);
487+
DataType exceptDataType =
488+
RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i));
489+
DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
490+
DataType sinkDataType = sinkDataField.type();
491+
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
492+
throw new CommonError()
493+
.writeRowErrorWithSchemaIncompatibleSchema(
494+
"Paimon",
495+
sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
496+
exceptDataField.asSQLString(),
497+
sinkDataField.asSQLString());
498+
}
499+
}
473500
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,20 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
148148
});
149149
}
150150

151+
@TestTemplate
152+
public void testSinkWithIncompatibleSchema(TestContainer container) throws Exception {
153+
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
154+
Assertions.assertEquals(0, execResult.getExitCode());
155+
Container.ExecResult errResult =
156+
container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf");
157+
Assertions.assertEquals(1, errResult.getExitCode());
158+
Assertions.assertTrue(
159+
errResult
160+
.getStderr()
161+
.contains(
162+
"[Paimon: The source filed with schema 'name INT', except filed schema of sink is '`name` INT'; but the filed in sink table which actual schema is '`name` STRING'. Please check schema of sink table.]"));
163+
}
164+
151165
@TestTemplate
152166
public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception {
153167
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf");
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
parallelism = 1
23+
job.mode = "BATCH"
24+
}
25+
26+
source {
27+
FakeSource {
28+
schema = {
29+
fields {
30+
pk_id = bigint
31+
name = int
32+
score = string
33+
}
34+
primaryKey {
35+
name = "pk_id"
36+
columnNames = [pk_id]
37+
}
38+
}
39+
rows = [
40+
{
41+
kind = INSERT
42+
fields = [1, 100, "A"]
43+
},
44+
{
45+
kind = INSERT
46+
fields = [2, 100, "B"]
47+
},
48+
{
49+
kind = INSERT
50+
fields = [3, 100, "C"]
51+
}
52+
]
53+
}
54+
}
55+
56+
sink {
57+
Paimon {
58+
warehouse = "file:///tmp/paimon"
59+
database = "seatunnel_namespace1"
60+
table = "st_test"
61+
}
62+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
}, {
5959
"id" : 12,
6060
"name" : "c_date",
61-
"type" : "TIMESTAMP(3)"
61+
"type" : "DATE"
6262
}, {
6363
"id" : 13,
6464
"name" : "c_timestamp",
@@ -68,4 +68,4 @@
6868
"partitionKeys" : [ ],
6969
"primaryKeys" : [ ],
7070
"options" : { }
71-
}
71+
}

0 commit comments

Comments
 (0)