[FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector
Aitozi committed Mar 9, 2023
1 parent 13779ab commit a6d8d40
Showing 39 changed files with 991 additions and 712 deletions.
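
The change repeated across the files below replaces the deprecated TableSchema accessors (getFieldNames(), getFieldDataTypes()) with either the ResolvedSchema column accessors or the static DataType helpers applied to the schema's physical row type. A minimal sketch of the two variants, assuming a ResolvedCatalogTable is already available; the helper class and method names are illustrative and not part of the commit:

import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;

import java.util.List;

// Illustrative helper class, not part of the commit.
final class SchemaAccessSketch {

    // Variant 1: read column names and types straight off the ResolvedSchema
    // (the route taken in HiveTableSink for the partition computers).
    static String[] columnNames(ResolvedCatalogTable table) {
        ResolvedSchema schema = table.getResolvedSchema();
        return schema.getColumnNames().toArray(new String[0]);
    }

    static DataType[] columnTypes(ResolvedCatalogTable table) {
        return table.getResolvedSchema().getColumnDataTypes().toArray(new DataType[0]);
    }

    // Variant 2: go through the physical row DataType and the static DataType
    // helpers (the route taken in HiveLookupTableSource and HiveSourceBuilder).
    static String[] physicalFieldNames(ResolvedCatalogTable table) {
        DataType rowType = table.getResolvedSchema().toPhysicalRowDataType();
        List<String> names = DataType.getFieldNames(rowType);
        return names.toArray(new String[0]);
    }

    static DataType[] physicalFieldTypes(ResolvedCatalogTable table) {
        DataType rowType = table.getResolvedSchema().toPhysicalRowDataType();
        return DataType.getFieldDataTypes(rowType).toArray(new DataType[0]);
    }
}
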
HiveDynamicTableFactory.java
@@ -23,7 +23,7 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalogLock;
import org.apache.flink.table.connector.RequireCatalogLock;
@@ -124,7 +124,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return source;
}

final CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable());
final ResolvedCatalogTable catalogTable =
Preconditions.checkNotNull(context.getCatalogTable());

final boolean isStreamingSource = configuration.get(STREAMING_SOURCE_ENABLE);
final boolean includeAllPartition =
HiveLookupTableSource.java
@@ -27,8 +27,8 @@
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
@@ -79,7 +79,7 @@ public HiveLookupTableSource(
JobConf jobConf,
ReadableConfig flinkConf,
ObjectPath tablePath,
CatalogTable catalogTable) {
ResolvedCatalogTable catalogTable) {
super(jobConf, flinkConf, tablePath, catalogTable);
this.configuration = new Configuration();
catalogTable.getOptions().forEach(configuration::setString);
@@ -165,8 +165,6 @@ private TableFunction<RowData> getLookupFunction(int[] keys) {
hiveShim,
new JobConfWrapper(jobConf),
catalogTable.getPartitionKeys(),
getProducedTableSchema().getFieldDataTypes(),
getProducedTableSchema().getFieldNames(),
configuration,
defaultPartitionName);

@@ -258,8 +256,12 @@ private TableFunction<RowData> getLookupFunction(int[] keys) {
jobConf,
hiveVersion,
tablePath,
getTableSchema().getFieldDataTypes(),
getTableSchema().getFieldNames(),
DataType.getFieldDataTypes(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.toArray(new DataType[0]),
DataType.getFieldNames(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.toArray(new String[0]),
catalogTable.getPartitionKeys(),
projectedFields,
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
@@ -268,7 +270,7 @@ private TableFunction<RowData> getLookupFunction(int[] keys) {
partitionFetcher,
fetcherContext,
partitionReader,
(RowType) getProducedTableSchema().toRowDataType().getLogicalType(),
(RowType) producedDataType.getLogicalType(),
keys,
hiveTableReloadInterval);
}
@@ -284,17 +286,13 @@ public HiveTablePartitionFetcherContext(
HiveShim hiveShim,
JobConfWrapper confWrapper,
List<String> partitionKeys,
DataType[] fieldTypes,
String[] fieldNames,
Configuration configuration,
String defaultPartitionName) {
super(
tablePath,
hiveShim,
confWrapper,
partitionKeys,
fieldTypes,
fieldNames,
configuration,
defaultPartitionName);
}
HiveSourceBuilder.java
@@ -34,9 +34,8 @@
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -57,7 +56,6 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -84,11 +82,12 @@ public class HiveSourceBuilder {

private final ObjectPath tablePath;
private final Map<String, String> tableOptions;
private final TableSchema fullSchema;
private final List<String> partitionKeys;
private final String hiveVersion;

private int[] projectedFields;
private DataType fullSchemaType;
@Nullable private DataType newProducedType;

private Long limit;
private List<HiveTablePartition> partitions;
private List<String> dynamicFilterPartitionKeys;
@@ -122,8 +121,8 @@ public HiveSourceBuilder(
try (HiveMetastoreClientWrapper client =
new HiveMetastoreClientWrapper(hiveConf, hiveShim)) {
Table hiveTable = client.getTable(dbName, tableName);
this.fullSchema =
HiveTableUtil.createTableSchema(hiveConf, hiveTable, client, hiveShim);
this.fullSchemaType =
HiveTableUtil.extractRowType(hiveConf, hiveTable, client, hiveShim);
this.partitionKeys = HiveCatalog.getFieldNames(hiveTable.getPartitionKeys());
this.tableOptions = new HashMap<>(hiveTable.getParameters());
this.tableOptions.putAll(tableOptions);
@@ -150,13 +149,13 @@ public HiveSourceBuilder(
@Nonnull ReadableConfig flinkConf,
@Nonnull ObjectPath tablePath,
@Nullable String hiveVersion,
@Nonnull CatalogTable catalogTable) {
@Nonnull ResolvedCatalogTable catalogTable) {
this.jobConf = jobConf;
this.flinkConf = flinkConf;
this.fallbackMappedReader = flinkConf.get(TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
this.tablePath = tablePath;
this.hiveVersion = hiveVersion == null ? HiveShimLoader.getHiveVersion() : hiveVersion;
this.fullSchema = catalogTable.getSchema();
this.fullSchemaType = catalogTable.getResolvedSchema().toPhysicalRowDataType();
this.partitionKeys = catalogTable.getPartitionKeys();
this.tableOptions = catalogTable.getOptions();
validateScanConfigurations(tableOptions);
@@ -219,8 +218,6 @@ public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulk
HiveShimLoader.loadHiveShim(hiveVersion),
new JobConfWrapper(jobConf),
partitionKeys,
fullSchema.getFieldDataTypes(),
fullSchema.getFieldNames(),
configuration,
defaultPartitionName);
}
@@ -281,12 +278,12 @@ public HiveSourceBuilder setLimit(Long limit) {
}

/**
* Sets the indices of projected fields.
* Sets the new produced type.
*
* @param projectedFields indices of the fields, starting from 0
* @param producedType the new produced type.
*/
public HiveSourceBuilder setProjectedFields(int[] projectedFields) {
this.projectedFields = projectedFields;
public HiveSourceBuilder setProducedType(DataType producedType) {
this.newProducedType = producedType;
return this;
}
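
With setProjectedFields(int[]) gone, callers of HiveSourceBuilder compute the produced DataType themselves and pass it to setProducedType. A sketch of deriving such a type for a top-level projection, mirroring the removed getProducedRowType() logic; the helper class, method name, and index array are made up for illustration:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the commit.
final class ProducedTypeSketch {

    // Build the produced row type for a top-level projection of the table's
    // physical row type; the result is what setProducedType expects.
    static DataType projectTopLevelFields(DataType fullRowType, int[] projectedFields) {
        List<String> names = DataType.getFieldNames(fullRowType);
        List<DataType> types = DataType.getFieldDataTypes(fullRowType);
        DataTypes.Field[] fields =
                Arrays.stream(projectedFields)
                        .mapToObj(i -> DataTypes.FIELD(names.get(i), types.get(i)))
                        .toArray(DataTypes.Field[]::new);
        return DataTypes.ROW(fields);
    }
}

// Illustrative usage: keep fields 0 and 2 of the physical row type.
// builder.setProducedType(
//         ProducedTypeSketch.projectTopLevelFields(physicalRowType, new int[] {0, 2}));
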

@@ -341,36 +338,16 @@ private boolean isStreamingSource() {
STREAMING_SOURCE_ENABLE.defaultValue().toString()));
}

private RowType getProducedRowType() {
TableSchema producedSchema;
if (projectedFields == null) {
producedSchema = fullSchema;
} else {
String[] fullNames = fullSchema.getFieldNames();
DataType[] fullTypes = fullSchema.getFieldDataTypes();
producedSchema =
TableSchema.builder()
.fields(
Arrays.stream(projectedFields)
.mapToObj(i -> fullNames[i])
.toArray(String[]::new),
Arrays.stream(projectedFields)
.mapToObj(i -> fullTypes[i])
.toArray(DataType[]::new))
.build();
}
return (RowType) producedSchema.toRowDataType().bridgedTo(RowData.class).getLogicalType();
}

protected BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
DataType producedType = newProducedType == null ? fullSchemaType : newProducedType;
return LimitableBulkFormat.create(
new HiveInputFormat(
new JobConfWrapper(jobConf),
partitionKeys,
fullSchema.getFieldNames(),
fullSchema.getFieldDataTypes(),
DataType.getFieldNames(fullSchemaType).toArray(new String[0]),
DataType.getFieldDataTypes(fullSchemaType).toArray(new DataType[0]),
hiveVersion,
getProducedRowType(),
(RowType) producedType.bridgedTo(RowData.class).getLogicalType(),
fallbackMappedReader),
limit);
}
HiveTableSink.java
@@ -55,11 +55,12 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
@@ -75,9 +76,9 @@
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -128,9 +129,9 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
private final boolean fallbackMappedReader;
private final boolean fallbackMappedWriter;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ResolvedCatalogTable catalogTable;
private final ObjectIdentifier identifier;
private final TableSchema tableSchema;
private final ResolvedSchema resolvedSchema;
private final String hiveVersion;
private final HiveShim hiveShim;
private final boolean dynamicGroupingEnabled;
@@ -147,7 +148,7 @@ public HiveTableSink(
ReadableConfig flinkConf,
JobConf jobConf,
ObjectIdentifier identifier,
CatalogTable table,
ResolvedCatalogTable table,
@Nullable Integer configuredSinkParallelism) {
this(
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
@@ -169,7 +170,7 @@ private HiveTableSink(
int gatherStatsThreadNum,
JobConf jobConf,
ObjectIdentifier identifier,
CatalogTable table,
ResolvedCatalogTable table,
@Nullable Integer configuredSinkParallelism) {
this.fallbackMappedReader = fallbackMappedReader;
this.fallbackMappedWriter = fallbackMappedWriter;
@@ -184,15 +185,15 @@ private HiveTableSink(
jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
"Hive version is not defined");
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
resolvedSchema = table.getResolvedSchema();
this.configuredSinkParallelism = configuredSinkParallelism;
validateAutoGatherStatistic(autoGatherStatistic, catalogTable);
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter =
context.createDataStructureConverter(tableSchema.toRowDataType());
context.createDataStructureConverter(resolvedSchema.toPhysicalRowDataType());
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
@@ -284,7 +285,7 @@ private DataStreamSink<?> consume(
jobConf,
hiveOutputFormatClz,
sd.getSerdeInfo(),
tableSchema,
resolvedSchema,
getPartitionKeyArray(),
tableProps,
hiveShim,
@@ -469,8 +470,8 @@ private DataStreamSink<?> createBatchCompactSink(
new HiveRowPartitionComputer(
hiveShim,
JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
resolvedSchema.getColumnNames().toArray(new String[0]),
resolvedSchema.getColumnDataTypes().toArray(new DataType[0]),
partitionColumns);

SingleOutputStreamOperator<Row> map =
@@ -531,8 +532,8 @@ private DataStreamSink<?> createBatchCompactSink(
new HiveRowDataPartitionComputer(
hiveShim,
JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
resolvedSchema.getColumnNames().toArray(new String[0]),
resolvedSchema.getColumnDataTypes().toArray(new DataType[0]),
getPartitionKeyArray());
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
HiveRollingPolicy rollingPolicy =
@@ -590,8 +591,8 @@ private DataStreamSink<Row> createBatchNoCompactSink(
new HiveRowPartitionComputer(
hiveShim,
JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
resolvedSchema.getColumnNames().toArray(new String[0]),
resolvedSchema.getColumnDataTypes().toArray(new DataType[0]),
getPartitionKeyArray()));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(getPartitionKeyArray());
@@ -703,7 +704,7 @@ private CompactReader.Factory<RowData> createCompactReaderFactory(
jobConf,
catalogTable,
hiveVersion,
(RowType) tableSchema.toRowDataType().getLogicalType(),
(RowType) resolvedSchema.toSinkRowDataType().getLogicalType(),
fallbackMappedReader);
}

@@ -741,12 +742,12 @@ private HadoopFileSystemFactory fsFactory() {
private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(
String[] partitionColumns, StorageDescriptor sd) {
String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
int formatFieldCount = tableSchema.getFieldCount() - partitionColumns.length;
int formatFieldCount = resolvedSchema.getColumns().size() - partitionColumns.length;
String[] formatNames = new String[formatFieldCount];
LogicalType[] formatTypes = new LogicalType[formatFieldCount];
for (int i = 0; i < formatFieldCount; i++) {
formatNames[i] = tableSchema.getFieldName(i).get();
formatTypes[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
formatNames[i] = resolvedSchema.getColumnNames().get(i);
formatTypes[i] = resolvedSchema.getColumnDataTypes().get(i).getLogicalType();
}
RowType formatType = RowType.of(formatTypes, formatNames);
if (serLib.contains("parquet")) {
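
Since the source builder, lookup source, and sink now take a ResolvedCatalogTable, call sites that previously built a plain CatalogTable also need the resolved schema. A minimal sketch of assembling one by hand, e.g. in a test; in planner code the resolution is performed by the catalog manager, and all names and values below are illustrative:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;

import java.util.Collections;

// Illustrative only, not part of the commit.
final class ResolvedTableSketch {

    static ResolvedCatalogTable example() {
        // Physical columns of a hypothetical table; "dt" doubles as the partition key.
        ResolvedSchema resolvedSchema =
                ResolvedSchema.of(
                        Column.physical("user_id", DataTypes.BIGINT()),
                        Column.physical("name", DataTypes.STRING()),
                        Column.physical("dt", DataTypes.STRING()));

        // Unresolved counterpart; empty options keep the sketch minimal.
        CatalogTable origin =
                CatalogTable.of(
                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                        "illustrative table",
                        Collections.singletonList("dt"),
                        Collections.emptyMap());

        // Pair the two; this is the type the migrated constructors expect.
        return new ResolvedCatalogTable(origin, resolvedSchema);
    }
}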