Skip to content

Commit

Permalink
[feat-#846][hdfs]hdfs sql support partitionColumn
Browse files Browse the repository at this point in the history
  • Loading branch information
Paddy0523 authored and FlechazoW committed May 30, 2022
1 parent d6c7f89 commit a3fc935
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ public class HdfsDynamicTableSource implements ScanTableSource {

private final HdfsConf hdfsConf;
private final TableSchema tableSchema;
private final List<String> partitionKeyList;

public HdfsDynamicTableSource(HdfsConf hdfsConf, TableSchema tableSchema) {
this(hdfsConf, tableSchema, new ArrayList<>());
}

public HdfsDynamicTableSource(
HdfsConf hdfsConf, TableSchema tableSchema, List<String> partitionKeyList) {
this.hdfsConf = hdfsConf;
this.tableSchema = tableSchema;
this.partitionKeyList = partitionKeyList;
}

@Override
Expand All @@ -61,10 +68,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
String[] fieldNames = tableSchema.getFieldNames();
List<FieldConf> columnList = new ArrayList<>(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) {
String fieldName = fieldNames[i];
FieldConf field = new FieldConf();
field.setName(fieldNames[i]);
field.setName(fieldName);
field.setType(rowType.getTypeAt(i).asSummaryString());
field.setIndex(i);
if (partitionKeyList.contains(fieldName)) {
field.setPart(true);
}
columnList.add(field);
}
hdfsConf.setColumn(columnList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -94,11 +95,12 @@ public DynamicTableSource createDynamicTableSource(Context context) {
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
HdfsConf hdfsConf = getHdfsConf(config);
List<String> partitionKeys = context.getCatalogTable().getPartitionKeys();
hdfsConf.setParallelism(config.get(SourceOptions.SCAN_PARALLELISM));
hdfsConf.setHadoopConfig(
HdfsOptions.getHadoopConfig(context.getCatalogTable().getOptions()));

return new HdfsDynamicTableSource(hdfsConf, physicalSchema);
return new HdfsDynamicTableSource(hdfsConf, physicalSchema, partitionKeys);
}

@Override
Expand Down

0 comments on commit a3fc935

Please sign in to comment.