Skip to content

Commit e62bf6c

Browse files
authored
[Fix][Connector-V2] Fix maxcompute read with partition spec (#8896)
1 parent 8140862 commit e62bf6c

File tree

2 files changed

+7
-29
lines changed

2 files changed

+7
-29
lines changed

seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,12 @@ public CatalogTable getTable(TablePath tablePath, List<String> fieldNames)
158158
}
159159
Table odpsTable;
160160
com.aliyun.odps.TableSchema odpsSchema;
161-
boolean isPartitioned;
162161
try {
163162
Odps odps = getOdps(tablePath.getDatabaseName());
164163
odpsTable =
165164
MaxcomputeUtil.parseTable(
166165
odps, tablePath.getDatabaseName(), tablePath.getTableName());
167166
odpsSchema = odpsTable.getSchema();
168-
isPartitioned = odpsTable.isPartitioned();
169167
} catch (Exception ex) {
170168
throw new CatalogException(catalogName, ex);
171169
}
@@ -193,31 +191,6 @@ public CatalogTable getTable(TablePath tablePath, List<String> fieldNames)
193191
.build();
194192
return MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
195193
});
196-
if (isPartitioned) {
197-
buildColumnsWithErrorCheck(
198-
tablePath,
199-
builder,
200-
odpsSchema.getPartitionColumns().stream()
201-
.filter(
202-
column ->
203-
fieldNames == null
204-
|| fieldNames.isEmpty()
205-
|| fieldNames.contains(column.getName()))
206-
.iterator(),
207-
(column) -> {
208-
BasicTypeDefine<TypeInfo> typeDefine =
209-
BasicTypeDefine.<TypeInfo>builder()
210-
.name(column.getName())
211-
.nativeType(column.getTypeInfo())
212-
.columnType(column.getTypeInfo().getTypeName())
213-
.dataType(column.getTypeInfo().getTypeName())
214-
.nullable(column.isNullable())
215-
.comment(column.getComment())
216-
.build();
217-
partitionKeys.add(column.getName());
218-
return MaxComputeTypeConverter.INSTANCE.convert(typeDefine);
219-
});
220-
}
221194
TableSchema tableSchema = builder.build();
222195
TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
223196
return CatalogTable.of(

seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig reado
9595
.orElse(readonlyConfig.get(PROJECT));
9696
TablePath tablePath =
9797
TablePath.of(project, subReadonlyConfig.get(TABLE_NAME));
98+
String partitionSpec =
99+
subReadonlyConfig
100+
.getOptional(PARTITION_SPEC)
101+
.orElse(readonlyConfig.get(PARTITION_SPEC));
102+
98103
if (subReadonlyConfig
99104
.getOptional(ConnectorCommonOptions.SCHEMA)
100105
.isPresent()) {
@@ -108,7 +113,7 @@ private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig reado
108113
catalogTable.getTablePath(),
109114
new SourceTableInfo(
110115
catalogTable,
111-
subReadonlyConfig.get(PARTITION_SPEC),
116+
partitionSpec,
112117
subReadonlyConfig.get(SPLIT_ROW)));
113118
} else {
114119
Integer splitRow =
@@ -120,7 +125,7 @@ private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig reado
120125
new SourceTableInfo(
121126
catalog.getTable(
122127
tablePath, subReadonlyConfig.get(READ_COLUMNS)),
123-
subReadonlyConfig.get(PARTITION_SPEC),
128+
partitionSpec,
124129
splitRow));
125130
}
126131
}

0 commit comments

Comments
 (0)