[FLINK-19789][hive] Migrate Hive connector to new table source sink interface #13771

Merged (7 commits) on Oct 28, 2020
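Context for the diff below: the change moves creation of HiveTableSource/HiveTableSink out of the legacy TableSourceFactory/TableSinkFactory path and into HiveDynamicTableFactory, built on the new DynamicTableSource/DynamicTableSink interfaces. A minimal sketch of the sink-side contract the migrated HiveTableSink now implements is shown here; the class name and the PrintSinkFunction runtime provider are illustrative placeholders, not part of this PR.

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;

/** Minimal DynamicTableSink sketch; HiveTableSink in the diff follows this same contract. */
public class ExampleDynamicSink implements DynamicTableSink {

	@Override
	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
		// Hive writes are append-only, so only INSERT rows are accepted.
		return ChangelogMode.insertOnly();
	}

	@Override
	public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
		// The real HiveTableSink returns a DataStreamSinkProvider; a SinkFunctionProvider
		// is used here only to keep the sketch self-contained and compilable.
		return SinkFunctionProvider.of(new PrintSinkFunction<>());
	}

	@Override
	public DynamicTableSink copy() {
		// The planner may mutate the sink (e.g. via applyOverwrite), so copy() returns a fresh instance.
		return new ExampleDynamicSink();
	}

	@Override
	public String asSummaryString() {
		return "ExampleSink";
	}
}

In the diff, HiveTableSink additionally implements SupportsPartitioning and SupportsOverwrite on top of this base contract and returns a DataStreamSinkProvider from getSinkRuntimeProvider.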
HiveDynamicTableFactory.java
@@ -21,26 +21,35 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.table.catalog.config.CatalogConfig.IS_GENERIC;

/**
* A dynamic table factory implementation for Hive catalog. Now it only support generic tables.
* Hive tables should be resolved by {@link HiveTableFactory}.
* A dynamic table factory implementation for Hive catalog.
*/
public class HiveDynamicTableFactory implements
DynamicTableSourceFactory,
DynamicTableSinkFactory {

private final HiveConf hiveConf;

public HiveDynamicTableFactory(HiveConf hiveConf) {
this.hiveConf = hiveConf;
}

@Override
public String factoryIdentifier() {
throw new UnsupportedOperationException("Hive factory is only work for catalog.");
@@ -69,23 +78,46 @@ private static CatalogTable removeIsGenericFlag(Context context) {

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context),
context.getConfiguration(),
context.getClassLoader(),
false);
boolean isGeneric = Boolean.parseBoolean(context.getCatalogTable().getOptions().get(CatalogConfig.IS_GENERIC));

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
return new HiveTableSink(
context.getConfiguration().get(
HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
new JobConf(hiveConf),
context.getObjectIdentifier(),
context.getCatalogTable());
} else {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
}
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
return FactoryUtil.createTableSource(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context),
context.getConfiguration(),
context.getClassLoader(),
false);
boolean isGeneric = Boolean.parseBoolean(context.getCatalogTable().getOptions().get(CatalogConfig.IS_GENERIC));

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
return new HiveTableSource(
new JobConf(hiveConf),
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
context.getCatalogTable());
} else {
return FactoryUtil.createTableSource(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
removeIsGenericFlag(context),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
}
}
}
HiveTableFactory.java
@@ -21,17 +21,13 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

import java.util.List;
import java.util.Map;

@@ -41,13 +37,7 @@
* A table factory implementation for Hive catalog.
*/
public class HiveTableFactory
implements TableSourceFactory<RowData>, TableSinkFactory {

private final HiveConf hiveConf;

public HiveTableFactory(HiveConf hiveConf) {
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
}
implements TableSourceFactory, TableSinkFactory {

@Override
public Map<String, String> requiredContext() {
@@ -60,19 +50,15 @@ public List<String> supportedProperties() {
}

@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
public TableSource createTableSource(TableSourceFactory.Context context) {
CatalogTable table = checkNotNull(context.getTable());
Preconditions.checkArgument(table instanceof CatalogTableImpl);

boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
return new HiveTableSource(
new JobConf(hiveConf),
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
table);
throw new UnsupportedOperationException("Hive table should be resolved by HiveDynamicTableFactory.");
} else {
return TableFactoryUtil.findAndCreateTableSource(context);
}
@@ -87,13 +73,7 @@ public TableSink createTableSink(TableSinkFactory.Context context) {

// temporary table doesn't have the IS_GENERIC flag but we still consider it generic
if (!isGeneric && !context.isTemporary()) {
return new HiveTableSink(
context.getConfiguration().get(
HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
context.isBounded(),
new JobConf(hiveConf),
context.getObjectIdentifier(),
table);
throw new UnsupportedOperationException("Hive table should be resolved by HiveDynamicTableFactory.");
} else {
return TableFactoryUtil.findAndCreateTableSink(context);
}
HiveTableSink.java
@@ -18,8 +18,8 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -44,15 +44,15 @@
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
@@ -90,12 +90,11 @@
/**
* Table sink to write to Hive tables.
*/
public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);

private final boolean userMrWriter;
private final boolean isBounded;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectIdentifier identifier;
@@ -104,14 +103,12 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
private final HiveShim hiveShim;

private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();

private boolean overwrite = false;
private boolean dynamicGrouping = false;

public HiveTableSink(
boolean userMrWriter, boolean isBounded, JobConf jobConf, ObjectIdentifier identifier, CatalogTable table) {
boolean userMrWriter, JobConf jobConf, ObjectIdentifier identifier, CatalogTable table) {
this.userMrWriter = userMrWriter;
this.isBounded = isBounded;
this.jobConf = jobConf;
this.identifier = identifier;
this.catalogTable = table;
@@ -122,7 +119,13 @@ public HiveTableSink(
}

@Override
public final DataStreamSink consumeDataStream(DataStream dataStream) {
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter = context.createDataStructureConverter(tableSchema.toRowDataType());
return (DataStreamSinkProvider) dataStream -> consume(dataStream, context.isBounded(), converter);
}

private DataStreamSink<?> consume(
DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
checkAcidTable(catalogTable, identifier.toObjectPath());
String[] partitionColumns = getPartitionKeys().toArray(new String[0]);
String dbName = identifier.getDatabaseName();
@@ -175,6 +178,7 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
toStagingDir(sd.getLocation(), jobConf)));
builder.setOutputFileConfig(outputFileConfig);
return dataStream
.map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
@@ -276,23 +280,7 @@ private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(String[] p
}

@Override
public DataType getConsumedDataType() {
DataType dataType = getTableSchema().toRowDataType();
return isBounded ? dataType : dataType.bridgedTo(RowData.class);
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

@Override
public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
return this;
}

@Override
public boolean configurePartitionGrouping(boolean supportsGrouping) {
public boolean requiresPartitionGrouping(boolean supportsGrouping) {
this.dynamicGrouping = supportsGrouping;
return supportsGrouping;
}
@@ -317,21 +305,40 @@ private List<String> getPartitionKeys() {
}

@Override
public void setStaticPartition(Map<String, String> partitionSpec) {
public void applyStaticPartition(Map<String, String> partition) {
// make it a LinkedHashMap to maintain partition column order
staticPartitionSpec = new LinkedHashMap<>();
for (String partitionCol : getPartitionKeys()) {
if (partitionSpec.containsKey(partitionCol)) {
staticPartitionSpec.put(partitionCol, partitionSpec.get(partitionCol));
if (partition.containsKey(partitionCol)) {
staticPartitionSpec.put(partitionCol, partition.get(partitionCol));
}
}
}

@Override
public void setOverwrite(boolean overwrite) {
public void applyOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return ChangelogMode.insertOnly();
}

@Override
public DynamicTableSink copy() {
HiveTableSink sink = new HiveTableSink(userMrWriter, jobConf, identifier, catalogTable);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
return sink;
}

@Override
public String asSummaryString() {
return "HiveSink";
}

/**
* Getting size of the file is too expensive. See {@link HiveBulkWriterFactory#create}.
* We can't check for every element, which will cause great pressure on DFS.