Skip to content

Commit df1954d

Browse files
authored
[Hotfix][Jdbc] Fix table/query columns order merge for jdbc catalog (#6771)
1 parent 45a4e15 commit df1954d

File tree

2 files changed

+44
-12
lines changed

2 files changed

+44
-12
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,17 @@ static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tab
233233
boolean schemaEquals =
234234
schemaIncludeAllColumns && columnsOfMerge.size() == columnsOfPath.size();
235235
if (schemaEquals) {
236-
return tableOfPath;
236+
// Reorder the field list
237+
return CatalogTable.of(
238+
tableOfPath.getTableId(),
239+
TableSchema.builder()
240+
.primaryKey(tableSchemaOfPath.getPrimaryKey())
241+
.constraintKey(tableSchemaOfPath.getConstraintKeys())
242+
.columns(columnsOfMerge)
243+
.build(),
244+
tableOfPath.getOptions(),
245+
tableOfPath.getPartitionKeys(),
246+
tableOfPath.getComment());
237247
}
238248

239249
PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
1919

2020
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.catalog.Column;
2122
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2223
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
2324
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
@@ -30,6 +31,8 @@
3031

3132
import java.util.Arrays;
3233
import java.util.Collections;
34+
import java.util.List;
35+
import java.util.Map;
3336
import java.util.stream.Collectors;
3437

3538
public class JdbcCatalogUtilsTest {
@@ -103,9 +106,9 @@ public void testColumnEqualsMerge() {
103106
TableSchema.builder()
104107
.column(
105108
PhysicalColumn.of(
106-
"f1",
107-
BasicType.LONG_TYPE,
108-
null,
109+
"f2",
110+
BasicType.STRING_TYPE,
111+
10,
109112
true,
110113
null,
111114
null,
@@ -117,10 +120,10 @@ public void testColumnEqualsMerge() {
117120
null))
118121
.column(
119122
PhysicalColumn.of(
120-
"f2",
123+
"f3",
121124
BasicType.STRING_TYPE,
122-
10,
123-
true,
125+
20,
126+
false,
124127
null,
125128
null,
126129
null,
@@ -131,10 +134,10 @@ public void testColumnEqualsMerge() {
131134
null))
132135
.column(
133136
PhysicalColumn.of(
134-
"f3",
135-
BasicType.STRING_TYPE,
136-
20,
137-
false,
137+
"f1",
138+
BasicType.LONG_TYPE,
139+
null,
140+
true,
138141
null,
139142
null,
140143
null,
@@ -149,7 +152,26 @@ public void testColumnEqualsMerge() {
149152
null);
150153

151154
CatalogTable mergeTable = JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
152-
Assertions.assertEquals(DEFAULT_TABLE, mergeTable);
155+
Assertions.assertEquals(DEFAULT_TABLE.getTableId(), mergeTable.getTableId());
156+
Assertions.assertEquals(DEFAULT_TABLE.getOptions(), mergeTable.getOptions());
157+
Assertions.assertEquals(DEFAULT_TABLE.getComment(), mergeTable.getComment());
158+
Assertions.assertEquals(DEFAULT_TABLE.getCatalogName(), mergeTable.getCatalogName());
159+
Assertions.assertNotEquals(DEFAULT_TABLE.getTableSchema(), mergeTable.getTableSchema());
160+
Assertions.assertEquals(
161+
DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
162+
mergeTable.getTableSchema().getPrimaryKey());
163+
Assertions.assertEquals(
164+
DEFAULT_TABLE.getTableSchema().getConstraintKeys(),
165+
mergeTable.getTableSchema().getConstraintKeys());
166+
167+
Map<String, Column> columnMap =
168+
DEFAULT_TABLE.getTableSchema().getColumns().stream()
169+
.collect(Collectors.toMap(e -> e.getName(), e -> e));
170+
List<Column> sortByQueryColumns =
171+
tableOfQuery.getTableSchema().getColumns().stream()
172+
.map(e -> columnMap.get(e.getName()))
173+
.collect(Collectors.toList());
174+
Assertions.assertEquals(sortByQueryColumns, mergeTable.getTableSchema().getColumns());
153175
}
154176

155177
@Test

0 commit comments

Comments
 (0)