Commit
[FLINK-25113] Cleanup from Parquet and Orc the partition key handling logic
Aitozi committed Nov 14, 2022
1 parent b12372e commit 8558e4d
Showing 32 changed files with 369 additions and 1,066 deletions.
7 changes: 7 additions & 0 deletions flink-connectors/flink-connector-files/pom.xml
@@ -72,6 +72,13 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->

<dependency>
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.src.util;

/** Record mapper definition. */
@FunctionalInterface
public interface RecordMapper<I, O> {
/** Map the record. Both input value and output value are expected to be non-null. */
O map(I in);
}
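
For orientation, here is a small sketch (not part of this commit) of how a RecordMapper might be used together with RecordMapperWrapperRecordIterator, whose wrapped/mapper fields appear in the next hunk; the constructor taking the wrapped iterator plus the mapper is an assumption of this sketch.

import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordMapper;
import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class RecordMapperSketch {

    /** Wraps an iterator so that every record is replaced by a single-column row. */
    static BulkFormat.RecordIterator<RowData> mapToSingleColumn(
            BulkFormat.RecordIterator<RowData> source) {
        RecordMapper<RowData, RowData> mapper =
                in -> {
                    GenericRowData out = new GenericRowData(1);
                    out.setField(0, StringData.fromString("mapped-" + in.getString(0)));
                    return out;
                };
        // Constructor shape assumed from the wrapped/mapper fields of the wrapper class.
        return new RecordMapperWrapperRecordIterator<>(source, mapper);
    }
}
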
@@ -34,13 +34,6 @@
*/
public class RecordMapperWrapperRecordIterator<I, O> implements BulkFormat.RecordIterator<O> {

/** Record mapper definition. */
@FunctionalInterface
public interface RecordMapper<I, O> {
/** Map the record. Both input value and output value are expected to be non-null. */
O map(I in);
}

private final BulkFormat.RecordIterator<I> wrapped;
private final RecordMapper<I, O> mapper;

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.table;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** A helper class to build the fixed and mutable row index mapping. */
public class FileInfoExtractor implements Serializable {

private static final long serialVersionUID = 1L;

private final List<PartitionColumn> partitionColumns;
private final int[] extendedRowIndexMapping;

public FileInfoExtractor(
DataType producedDataType,
List<String> metadataColumns,
List<String> partitionColumns) {

// Compute index mapping for the extended row and the functions to compute metadata
List<DataTypes.Field> producedRowField = DataType.getFields(producedDataType);
List<String> producedRowFieldNames =
producedRowField.stream()
.map(DataTypes.Field::getName)
.collect(Collectors.toList());

// Filter out partition columns not in producedDataType
final List<String> partitionKeysToExtract =
DataType.getFieldNames(producedDataType).stream()
.filter(partitionColumns::contains)
.collect(Collectors.toList());

List<String> mutableRowFieldNames =
producedRowFieldNames.stream()
.filter(
key ->
!metadataColumns.contains(key)
&& !partitionKeysToExtract.contains(key))
.collect(Collectors.toList());

List<String> fixedRowFieldNames =
Stream.concat(metadataColumns.stream(), partitionKeysToExtract.stream())
.collect(Collectors.toList());
this.partitionColumns =
partitionKeysToExtract.stream()
.map(
name ->
new PartitionColumn(
name,
producedRowField
.get(producedRowFieldNames.indexOf(name))
.getDataType()))
.collect(Collectors.toList());

this.extendedRowIndexMapping =
EnrichedRowData.computeIndexMapping(
producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
}

public List<PartitionColumn> getPartitionColumns() {
return partitionColumns;
}

public int[] getExtendedRowIndexMapping() {
return extendedRowIndexMapping;
}

/** Info of the partition column. */
public static class PartitionColumn implements Serializable {
private static final long serialVersionUID = 1L;
public final String fieldName;
public final DataFormatConverters.DataFormatConverter converter;
public final DataType dataType;

public PartitionColumn(String fieldName, DataType dataType) {
this.fieldName = fieldName;
this.dataType = dataType;
this.converter = DataFormatConverters.getConverterForDataType(dataType);
}
}
}
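
A hedged sketch of how this extractor might be constructed, using made-up column names: the produced type mixes one physical column, one metadata column, and one partition column, and the extractor resolves the partition column's DataType and converter from the produced type.

import org.apache.flink.connector.file.table.FileInfoExtractor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Arrays;
import java.util.Collections;

public class FileInfoExtractorSketch {
    public static void main(String[] args) {
        // Hypothetical produced type: physical "amount", metadata "file.path", partition "dt".
        DataType producedDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("amount", DataTypes.BIGINT()),
                        DataTypes.FIELD("file.path", DataTypes.STRING()),
                        DataTypes.FIELD("dt", DataTypes.STRING()));

        FileInfoExtractor extractor =
                new FileInfoExtractor(
                        producedDataType,
                        Collections.singletonList("file.path"), // metadata columns
                        Collections.singletonList("dt"));       // partition columns

        // "dt" is resolved against the produced type, so it keeps its DataType and converter.
        extractor.getPartitionColumns()
                .forEach(col -> System.out.println(col.fieldName + " -> " + col.dataType));

        // Mapping for EnrichedRowData: physical fields are read from the mutable row,
        // metadata/partition fields from the fixed row, in produced-type order.
        System.out.println(Arrays.toString(extractor.getExtendedRowIndexMapping()));
    }
}
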
@@ -23,20 +23,15 @@
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;

import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* This {@link BulkFormat} is a wrapper that attaches file information columns to the output
@@ -46,67 +41,24 @@ public class FileInfoExtractorBulkFormat<SplitT extends FileSourceSplit>
implements BulkFormat<RowData, SplitT> {

private final BulkFormat<RowData, SplitT> wrapped;
private final TypeInformation<RowData> producedType;

private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
private final List<Map.Entry<String, DataType>> partitionColumnTypes;
private final int[] extendedRowIndexMapping;

private final String defaultPartName;
private final PartitionFieldExtractor<SplitT> partitionFieldExtractor;
private final FileInfoExtractor fileInfoExtractor;
public final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
public final TypeInformation<RowData> producedType;

public FileInfoExtractorBulkFormat(
BulkFormat<RowData, SplitT> wrapped,
DataType producedDataType,
TypeInformation<RowData> producedTypeInformation,
Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns,
List<String> partitionColumns,
String defaultPartName) {
PartitionFieldExtractor<SplitT> partitionFieldExtractor) {
List<String> metadataFieldNames = new ArrayList<>(metadataColumns.keySet());
this.wrapped = wrapped;
this.partitionFieldExtractor = partitionFieldExtractor;
this.producedType = producedTypeInformation;
this.defaultPartName = defaultPartName;

// Compute index mapping for the extended row and the functions to compute metadata
List<DataTypes.Field> producedRowField = DataType.getFields(producedDataType);
List<String> producedRowFieldNames =
producedRowField.stream()
.map(DataTypes.Field::getName)
.collect(Collectors.toList());

// Filter out partition columns not in producedDataType
final List<String> partitionKeysToExtract =
DataType.getFieldNames(producedDataType).stream()
.filter(partitionColumns::contains)
.collect(Collectors.toList());

List<String> mutableRowFieldNames =
producedRowFieldNames.stream()
.filter(
key ->
!metadataColumns.containsKey(key)
&& !partitionKeysToExtract.contains(key))
.collect(Collectors.toList());
List<String> metadataFieldNames = new ArrayList<>(metadataColumns.keySet());

List<String> fixedRowFieldNames =
Stream.concat(metadataFieldNames.stream(), partitionKeysToExtract.stream())
.collect(Collectors.toList());

this.partitionColumnTypes =
partitionKeysToExtract.stream()
.map(
fieldName ->
new SimpleImmutableEntry<>(
fieldName,
producedRowField
.get(
producedRowFieldNames.indexOf(
fieldName))
.getDataType()))
.collect(Collectors.toList());

this.extendedRowIndexMapping =
EnrichedRowData.computeIndexMapping(
producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
this.fileInfoExtractor =
new FileInfoExtractor(producedDataType, metadataFieldNames, partitionColumns);
this.metadataColumnsFunctions =
metadataFieldNames.stream().map(metadataColumns::get).collect(Collectors.toList());
}
@@ -131,41 +83,38 @@ public TypeInformation<RowData> getProducedType() {
return producedType;
}

private Reader<RowData> wrapReader(Reader<RowData> superReader, FileSourceSplit split) {
private Reader<RowData> wrapReader(Reader<RowData> superReader, SplitT split) {
// Fill the metadata + partition columns row
List<FileInfoExtractor.PartitionColumn> partitionColumns =
fileInfoExtractor.getPartitionColumns();
final GenericRowData fileInfoRowData =
new GenericRowData(metadataColumnsFunctions.size() + partitionColumnTypes.size());
new GenericRowData(metadataColumnsFunctions.size() + partitionColumns.size());
int fileInfoRowIndex = 0;
for (; fileInfoRowIndex < metadataColumnsFunctions.size(); fileInfoRowIndex++) {
fileInfoRowData.setField(
fileInfoRowIndex,
metadataColumnsFunctions.get(fileInfoRowIndex).getValue(split));
}
if (!partitionColumnTypes.isEmpty()) {
final LinkedHashMap<String, String> partitionSpec =
PartitionPathUtils.extractPartitionSpecFromPath(split.path());
if (!partitionColumns.isEmpty()) {
for (int partitionFieldIndex = 0;
fileInfoRowIndex < fileInfoRowData.getArity();
fileInfoRowIndex++, partitionFieldIndex++) {
final String fieldName = partitionColumnTypes.get(partitionFieldIndex).getKey();
final DataType fieldType = partitionColumnTypes.get(partitionFieldIndex).getValue();
if (!partitionSpec.containsKey(fieldName)) {
throw new RuntimeException(
"Cannot find the partition value from path for partition: "
+ fieldName);
}
FileInfoExtractor.PartitionColumn partition =
partitionColumns.get(partitionFieldIndex);

Object partitionValue =
partitionFieldExtractor.extract(
split, partition.fieldName, partition.dataType.getLogicalType());

String valueStr = partitionSpec.get(fieldName);
valueStr = valueStr.equals(defaultPartName) ? null : valueStr;
fileInfoRowData.setField(
fileInfoRowIndex,
PartitionPathUtils.convertStringToInternalValue(valueStr, fieldType));
fileInfoRowIndex, partition.converter.toInternal(partitionValue));
}
}

// This row is going to be reused for every record
final EnrichedRowData producedRowData =
new EnrichedRowData(fileInfoRowData, this.extendedRowIndexMapping);
new EnrichedRowData(
fileInfoRowData, fileInfoExtractor.getExtendedRowIndexMapping());

return RecordMapperWrapperRecordIterator.wrapReader(
superReader,
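
To illustrate the new path in wrapReader (a hedged sketch, not code from this commit): the partition value is obtained from the PartitionFieldExtractor as an external Java object and then converted to Flink's internal format by the column's DataFormatConverter, instead of being re-parsed from the split path with PartitionPathUtils. Package locations are assumed from the surrounding diff.

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.table.FileInfoExtractor;
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
import org.apache.flink.table.api.DataTypes;

public class PartitionValueConversionSketch {
    public static void main(String[] args) {
        // Hypothetical extractor that always returns the same partition value.
        PartitionFieldExtractor<FileSourceSplit> extractor =
                (split, fieldName, fieldType) -> "2022-11-14";

        // Partition column metadata as built by FileInfoExtractor.
        FileInfoExtractor.PartitionColumn dt =
                new FileInfoExtractor.PartitionColumn("dt", DataTypes.STRING());

        // External value from the extractor, converted to internal data (StringData here).
        Object external = extractor.extract(null, dt.fieldName, dt.dataType.getLogicalType());
        Object internal = dt.converter.toInternal(external);
        System.out.println(internal);
    }
}
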
@@ -307,7 +307,7 @@ private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(Cont
context.createTypeInformation(producedDataType),
Collections.emptyMap(),
partitionKeys,
defaultPartName);
PartitionFieldExtractor.forFileSystem(defaultPartName));
return Optional.of(CompactBulkReader.factory(format));
} else if (deserializationFormat != null) {
final DeserializationSchema<RowData> decoder =
@@ -320,7 +320,7 @@ private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(Cont
context.createTypeInformation(producedDataType),
Collections.emptyMap(),
partitionKeys,
defaultPartName);
PartitionFieldExtractor.forFileSystem(defaultPartName));
return Optional.of(CompactBulkReader.factory(format));
}
return Optional.empty();
@@ -256,7 +256,7 @@ private BulkFormat<RowData, FileSourceSplit> wrapBulkFormat(
context.createTypeInformation(producedDataType),
metadataColumns,
partitionKeys,
defaultPartName);
PartitionFieldExtractor.forFileSystem(defaultPartName));
}
bulkFormat = LimitableBulkFormat.create(bulkFormat, limit);
return bulkFormat;
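
The call sites above follow the same pattern: instead of the default-partition-name string, they now pass PartitionFieldExtractor.forFileSystem(defaultPartName). A hedged sketch of such a call, with placeholder names and a made-up partition key:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat;
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.util.Collections;

public class WrapFormatSketch {

    /** Attaches partition/metadata columns to an existing format; names are placeholders. */
    static <SplitT extends FileSourceSplit> BulkFormat<RowData, SplitT> attachFileInfo(
            BulkFormat<RowData, SplitT> underlyingFormat,
            DataType producedDataType,
            TypeInformation<RowData> producedTypeInfo,
            String defaultPartName) {
        return new FileInfoExtractorBulkFormat<>(
                underlyingFormat,
                producedDataType,
                producedTypeInfo,
                Collections.emptyMap(),            // no metadata columns in this sketch
                Collections.singletonList("dt"),   // hypothetical partition key
                PartitionFieldExtractor.forFileSystem(defaultPartName));
    }
}
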
@@ -29,7 +29,7 @@
/** Interface to extract partition field from split. */
@FunctionalInterface
@Internal
public interface PartitionFieldExtractor<T extends FileSourceSplit> extends Serializable {
public interface PartitionFieldExtractor<T> extends Serializable {

Object extract(T split, String fieldName, LogicalType fieldType);

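
With the bound on T dropped, a split type that is not a FileSourceSplit can now supply partition values (presumably what the Hive lookup-source change below relies on). A purely illustrative sketch with a made-up split type:

import org.apache.flink.connector.file.table.PartitionFieldExtractor;

import java.util.Map;

public class CustomSplitExtractorSketch {

    /** Made-up split type that already knows its partition spec. */
    static final class CustomSplit {
        final Map<String, String> partitionSpec;

        CustomSplit(Map<String, String> partitionSpec) {
            this.partitionSpec = partitionSpec;
        }
    }

    // With the bound removed, the extractor can be parameterized by any split type.
    static final PartitionFieldExtractor<CustomSplit> BY_NAME =
            (split, fieldName, fieldType) -> split.partitionSpec.get(fieldName);
}
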
@@ -264,7 +264,9 @@ private TableFunction<RowData> getLookupFunction(int[] keys) {
getProducedTableSchema().getFieldNames(),
catalogTable.getPartitionKeys(),
projectedFields,
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
producedTypes,
defaultPartitionName);

return new FileSystemLookupFunction<>(
partitionFetcher,
