[FLINK-26712][table-planner] Metadata keys should not conflict with physical columns
twalthr committed Mar 24, 2022
1 parent 3d62a65 commit e5a7efb
Showing 20 changed files with 332 additions and 269 deletions.
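The gist of the fix: the planner now renames metadata columns in the produced and consumed row types with a `$metadata$` prefix (the `METADATA_COLUMN_PREFIX` constants introduced below), so a metadata key can no longer collide with a physical column of the same name. Below is a minimal, self-contained sketch of the naming problem and the fix; the class name and the `id`/`timestamp` columns are illustrative, only the prefix itself comes from the commit.

import java.util.Arrays;
import java.util.List;

// Illustrates the collision this commit prevents: a physical column and a
// metadata key that share the name "timestamp" become distinguishable once
// the planner prefixes the metadata column.
public class MetadataNameCollisionDemo {
    private static final String METADATA_COLUMN_PREFIX = "$metadata$";

    public static void main(String[] args) {
        // Old naming: both columns surface as "timestamp" -- a lookup by name
        // silently resolves to the physical column.
        List<String> before = Arrays.asList("id", "timestamp", "timestamp");
        System.out.println(before.indexOf("timestamp")); // 1 (ambiguous)

        // New naming: the metadata column is planner-renamed and unique.
        List<String> after =
                Arrays.asList("id", "timestamp", METADATA_COLUMN_PREFIX + "timestamp");
        System.out.println(after.indexOf(METADATA_COLUMN_PREFIX + "timestamp")); // 2
    }
}

A `$`-based prefix also makes accidental clashes with user-declared columns unlikely, since `$` does not occur in typical column names.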
FileSystemTableSource.java
@@ -64,8 +64,11 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.flink.util.CollectionUtil.entry;
+
 /** File system table source. */
 @Internal
 public class FileSystemTableSource extends AbstractFileSystemTable
@@ -109,20 +112,14 @@ public FileSystemTableSource(
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        // When this table has no partition, just return a empty source.
+        // When this table has no partition, just return an empty source.
         if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
             return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
         }
 
         // Resolve metadata and make sure to filter out metadata not in the producedDataType
        final List<String> metadataKeys =
-                DataType.getFieldNames(producedDataType).stream()
-                        .filter(
-                                ((this.metadataKeys == null)
-                                                ? Collections.emptyList()
-                                                : this.metadataKeys)
-                                        ::contains)
-                        .collect(Collectors.toList());
+                this.metadataKeys == null ? Collections.emptyList() : this.metadataKeys;
        final List<ReadableFileInfo> metadataToExtract =
                metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
 
@@ -225,16 +222,27 @@ private BulkFormat<RowData, FileSourceSplit> wrapBulkFormat(
             List<ReadableFileInfo> metadata,
             List<String> partitionKeys) {
         if (!metadata.isEmpty() || !partitionKeys.isEmpty()) {
+            final List<String> producedFieldNames = DataType.getFieldNames(producedDataType);
+            final Map<String, FileInfoAccessor> metadataColumns =
+                    IntStream.range(0, metadata.size())
+                            .mapToObj(
+                                    i -> {
+                                        // Access metadata columns from behind because the
+                                        // names are decided by the planner
+                                        final int columnPos =
+                                                producedFieldNames.size() - metadata.size() + i;
+                                        return entry(
+                                                producedFieldNames.get(columnPos),
+                                                metadata.get(i).getAccessor());
+                                    })
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
             bulkFormat =
                     new FileInfoExtractorBulkFormat(
                             bulkFormat,
                             producedDataType,
                             context.createTypeInformation(producedDataType),
-                            metadata.stream()
-                                    .collect(
-                                            Collectors.toMap(
-                                                    ReadableFileInfo::getKey,
-                                                    ReadableFileInfo::getAccessor)),
+                            metadataColumns,
                             partitionKeys,
                             defaultPartName);
         }
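With the planner free to rename metadata columns, `wrapBulkFormat` above can no longer match them by key; instead it exploits the invariant that metadata columns always trail the physical ones in the produced row type. A standalone sketch of that positional arithmetic, with illustrative field names:

import java.util.Arrays;
import java.util.List;

// Mirrors the positional lookup in wrapBulkFormat: with produced fields
// [a, b, $metadata$file.path] and one metadata column, the column position
// is 3 - 1 + 0 = 2, regardless of how the planner renamed the key.
public class TrailingMetadataDemo {
    public static void main(String[] args) {
        List<String> producedFieldNames = Arrays.asList("a", "b", "$metadata$file.path");
        int metadataCount = 1;
        for (int i = 0; i < metadataCount; i++) {
            int columnPos = producedFieldNames.size() - metadataCount + i;
            System.out.println(producedFieldNames.get(columnPos)); // $metadata$file.path
        }
    }
}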
SupportsWritingMetadata.java
@@ -70,7 +70,7 @@
  * <pre>{@code
  * // for t1 and t2
  * ROW < i INT, s STRING, d DOUBLE > // physical input
- * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
+ * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
  *
  * // for t3
  * ROW < i INT, s STRING, d DOUBLE > // physical input
@@ -115,7 +115,8 @@ public interface SupportsWritingMetadata {
      *
      * @param metadataKeys a subset of the keys returned by {@link #listWritableMetadata()}, ordered
      *     by the iteration order of returned map
-     * @param consumedDataType the final input type of the sink
+     * @param consumedDataType the final input type of the sink, it is intended to be only forwarded
+     *     and the planner will decide on the field names to avoid collisions
      * @see EncodingFormat#applyWritableMetadata(List)
      */
     void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
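Under this clarified contract a sink treats the field names in `consumedDataType` as planner-owned and locates metadata values purely by position, since they always trail the physical fields. A hedged sketch using Flink's `GenericRowData`; the arity and values are invented for illustration:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

// The consumed row is laid out as [physical..., metadata...], so a sink can
// read the value for the i-th metadata key at physicalArity + i without ever
// interpreting the (possibly prefixed) field names.
public class SinkMetadataAccessDemo {
    public static void main(String[] args) {
        int physicalArity = 2; // e.g. a BIGINT, b INT
        RowData row = GenericRowData.of(1L, 42, StringData.fromString("meta-value"));
        // value for metadata key #0 ("m" in the serialized plans below):
        System.out.println(row.getString(physicalArity)); // meta-value
    }
}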
SupportsReadingMetadata.java
@@ -78,7 +78,7 @@
  * <pre>{@code
  * // for t1 and t2
  * ROW < i INT, s STRING, d DOUBLE > // physical output
- * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
+ * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
  * }</pre>
  */
 @PublicEvolving
@@ -129,7 +129,8 @@ public interface SupportsReadingMetadata {
      *
      * @param metadataKeys a subset of the keys returned by {@link #listReadableMetadata()}, ordered
      *     by the iteration order of returned map
-     * @param producedDataType the final output type of the source
+     * @param producedDataType the final output type of the source, it is intended to be only
+     *     forwarded and the planner will decide on the field names to avoid collisions
      * @see DecodingFormat#applyReadableMetadata(List)
      */
     void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
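The read path gets the same clarification: `metadataKeys` carries the connector's own key names, while the field names inside `producedDataType` may now be prefixed and must only be forwarded. A small sketch of the mismatch this wording guards against — looking up a raw key in the produced field names (as the old `FileSystemTableSource` code effectively did) no longer works:

import java.util.Arrays;
import java.util.List;

// Why producedDataType's field names must be treated as opaque: the raw
// metadata key "m" no longer appears among the produced field names.
public class ReadableMetadataContractDemo {
    public static void main(String[] args) {
        List<String> metadataKeys = Arrays.asList("m"); // connector-facing keys
        List<String> producedFieldNames = Arrays.asList("a", "b", "$metadata$m");
        System.out.println(producedFieldNames.contains(metadataKeys.get(0))); // false
    }
}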
DynamicSinkUtils.java
@@ -84,6 +84,9 @@
 @Internal
 public final class DynamicSinkUtils {
 
+    // Ensures that physical and metadata columns don't collide.
+    private static final String METADATA_COLUMN_PREFIX = "$metadata$";
+
     /** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */
     public static RelNode convertCollectToRel(
             FlinkRelBuilder relBuilder,
@@ -642,7 +645,11 @@ private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSin
 
         final Stream<RowField> metadataFields =
                 createRequiredMetadataKeys(schema, sink).stream()
-                        .map(k -> new RowField(k, metadataMap.get(k).getLogicalType()));
+                        .map(
+                                k ->
+                                        new RowField(
+                                                METADATA_COLUMN_PREFIX + k,
+                                                metadataMap.get(k).getLogicalType()));
 
         final List<RowField> rowFields =
                 Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
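`createConsumedType` above (mirrored by `createProducedType` in `DynamicSourceUtils` below) assembles the row type from unprefixed physical fields plus prefixed metadata fields. A self-contained sketch with hand-picked logical types that reproduces the exact shape serialized in the JSON plans further down:

import java.util.Arrays;

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.VarCharType;

// Builds the consumed/produced type shape from the serialized plans:
// physical fields keep their names, metadata fields carry the prefix.
public class PrefixedRowTypeDemo {
    public static void main(String[] args) {
        RowType consumedType =
                new RowType(
                        false,
                        Arrays.asList(
                                new RowField("a", new BigIntType()),
                                new RowField("b", new IntType()),
                                new RowField(
                                        "$metadata$m",
                                        new VarCharType(VarCharType.MAX_LENGTH))));
        // Prints: ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL
        System.out.println(consumedType.asSerializableString());
    }
}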
DynamicSourceUtils.java
@@ -71,6 +71,9 @@
 @Internal
 public final class DynamicSourceUtils {
 
+    // Ensures that physical and metadata columns don't collide.
+    public static final String METADATA_COLUMN_PREFIX = "$metadata$";
+
     /**
      * Converts a given {@link DataStream} to a {@link RelNode}. It adds helper projections if
      * necessary.
@@ -204,7 +207,11 @@ public static RowType createProducedType(ResolvedSchema schema, DynamicTableSour
 
         final Stream<RowField> metadataFields =
                 createRequiredMetadataKeys(schema, source).stream()
-                        .map(k -> new RowField(k, metadataMap.get(k).getLogicalType()));
+                        .map(
+                                k ->
+                                        new RowField(
+                                                METADATA_COLUMN_PREFIX + k,
+                                                metadataMap.get(k).getLogicalType()));
 
         final List<RowField> rowFields =
                 Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
@@ -315,7 +322,9 @@ private static void pushMetadataProjection(FlinkRelBuilder relBuilder, ResolvedS
                                         .getMetadataKey()
                                         .orElse(metadataColumn.getName());
                         return rexBuilder.makeAbstractCast(
-                                relDataType, relBuilder.field(metadataKey));
+                                relDataType,
+                                relBuilder.field(
+                                        METADATA_COLUMN_PREFIX + metadataKey));
                     } else {
                         return relBuilder.field(c.getName());
                     }
@@ -60,6 +60,7 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.METADATA_COLUMN_PREFIX;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
 import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
@@ -308,6 +309,7 @@ private RowType performPushDown(
         final List<String> projectedMetadataKeys =
                 projectedMetadataColumns.stream()
                         .map(NestedColumn::name)
+                        .map(k -> k.substring(METADATA_COLUMN_PREFIX.length()))
                         .collect(Collectors.toList());
 
         abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
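Because `ReadingMetadataSpec` must record connector-facing keys, the pushdown rule strips the planner prefix off the projected column names again. A trivial sketch of that round trip; the `file.path` key is borrowed from the plan tests:

// Round trip between planner-side names and connector keys, as used by
// performPushDown when building the ReadingMetadataSpec.
public class PrefixRoundTripDemo {
    private static final String METADATA_COLUMN_PREFIX = "$metadata$";

    public static void main(String[] args) {
        String plannerName = METADATA_COLUMN_PREFIX + "file.path"; // projected column name
        String connectorKey = plannerName.substring(METADATA_COLUMN_PREFIX.length());
        System.out.println(connectorKey); // file.path
    }
}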
@@ -403,8 +403,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         Collection<Row> data = registeredData.getOrDefault(dataId, Collections.emptyList());
         List<Map<String, String>> partitions =
                 parsePartitionList(helper.getOptions().get(PARTITION_LIST));
-        DataType producedDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        DataType producedDataType = context.getPhysicalRowDataType();
         // pushing project into scan will prune schema and we have to get the mapping between
         // partition and row
         Map<Map<String, String>, Collection<Row>> partition2Rows;
@@ -340,7 +340,8 @@ public void testProjectionIncludingOnlyMetadata() {
         assertThat(appliedMetadataDataType.get()).isNotNull();
 
         assertThat(DataType.getFieldNames(appliedProjectionDataType.get())).isEmpty();
-        assertThat(DataType.getFieldNames(appliedMetadataDataType.get())).containsExactly("m2");
+        assertThat(DataType.getFieldNames(appliedMetadataDataType.get()))
+                .containsExactly("$metadata$m2");
     }
 
     @Test
@@ -363,7 +364,7 @@ public void testProjectionWithMetadataAndPhysicalFields() {
 
         assertThat(DataType.getFieldNames(appliedProjectionDataType.get())).containsExactly("f1");
         assertThat(DataType.getFieldNames(appliedMetadataDataType.get()))
-                .isEqualTo(Arrays.asList("f1", "m2"));
+                .isEqualTo(Arrays.asList("f1", "$metadata$m2"));
     }
 
     // ---------------------------------------------------------------------------------------------
@@ -50,8 +50,8 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filem
 <Resource name="optimized rel plan">
 <![CDATA[
 Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
-+- Calc(select=[a, b, CAST(file.path AS VARCHAR(2147483647)) AS filemeta])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, file.path])
++- Calc(select=[a, b, CAST($metadata$file.path AS VARCHAR(2147483647)) AS filemeta])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, $metadata$file.path])
 ]]>
 </Resource>
 </TestCase>
@@ -113,8 +113,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
 </Resource>
 <Resource name="optimized exec plan">
 <![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
 ]]>
 </Resource>
 </TestCase>
@@ -63,7 +63,7 @@
     "abilities" : [ {
       "type" : "WritingMetadata",
       "metadataKeys" : [ "m" ],
-      "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
+      "consumedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL"
     } ]
   },
   "inputChangelogMode" : [ "INSERT" ],
@@ -40,11 +40,11 @@
   }, {
     "type" : "ReadingMetadata",
     "metadataKeys" : [ "m" ],
-    "producedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
+    "producedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL"
   } ]
 },
-"outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
-"description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, m])",
+"outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>",
+"description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, $metadata$m])",
 "inputProperties" : [ ]
 }, {
   "id" : 2,
@@ -82,8 +82,8 @@
     "damBehavior" : "PIPELINED",
     "priority" : 0
   } ],
-  "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
-  "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, m])"
+  "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>",
+  "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, $metadata$m])"
 } ],
 "edges" : [ {
   "source" : 1,
@@ -111,8 +111,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4])
 <Resource name="optimized rel plan">
 <![CDATA[
 FlinkLogicalCalc(select=[a, b, c, metadata, computed])
-+- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata_2 AS BIGINT) AS metadata, +(CAST(metadata_2 AS BIGINT), b) AS computed])
-   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata_2 AS BIGINT), +(CAST(metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, metadata_2])
++- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST($metadata$metadata_2 AS BIGINT) AS metadata, +(CAST($metadata$metadata_2 AS BIGINT), b) AS computed])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST($metadata$metadata_2 AS BIGINT), +(CAST($metadata$metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, $metadata$metadata_2])
 ]]>
 </Resource>
 </TestCase>