[FLINK-24397] Remove TableSchema usages from Flink connectors #17459

Closed · 3 commits
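
In short: the PR drops the deprecated TableSchema abstraction from the HBase connectors and derives everything from DataType, which the factory Context now exposes directly. The sketch below is not part of the PR (class and field names are made up); it only demonstrates the two public APIs the new code leans on, the DataTypes.ROW(...)/FIELD(...) constructors and the static DataType.getFieldNames(...) helper, using the HBase convention that the row key is an atomic top-level field and each column family is a nested ROW of qualifiers.

    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.INT;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.STRING;

    import org.apache.flink.table.types.DataType;

    public class DataTypeMigrationSketch {
        public static void main(String[] args) {
            // Row key as an atomic top-level field; "family1" models a column
            // family whose qualifiers form a nested ROW. (Names are illustrative.)
            DataType hbaseRowType =
                    ROW(
                            FIELD("rowkey", INT()),
                            FIELD("family1", ROW(FIELD("q1", STRING()))));

            // Static DataType.getFieldNames(...) replaces TableSchema field access;
            // it returns the top-level field names only.
            System.out.println(DataType.getFieldNames(hbaseRowType)); // [rowkey, family1]
        }
    }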
────────────────────────────────────────
@@ -25,7 +25,6 @@
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -70,15 +69,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseClientConf = getHBaseConfiguration(options);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSource(
                 hbaseClientConf,
@@ -95,16 +94,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
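
The same three-line substitution recurs in every factory method touched by this PR (the hbase2 factory below is identical): schema access, primary-key validation, and HBaseTableSchema construction now all flow from the factory Context rather than from CatalogTable#getSchema(). The fragment below is copied from the hunk above, with explanatory comments added; the comments are mine, not the PR author's.

    // Context#getPhysicalRowDataType() yields only the physical columns of the
    // table, so computed and metadata columns never reach the HBase schema.
    validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

    // The HBase schema is derived from the DataType; the old path went through
    // TableSchema via HBaseTableSchema.fromTableSchema(...).
    HBaseTableSchema hbaseSchema =
            HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());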
────────────────────────────────────────
@@ -26,7 +26,6 @@
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -72,16 +71,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSource(
                 hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
@@ -94,16 +93,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
────────────────────────────────────────
@@ -29,6 +29,7 @@
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -56,11 +57,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                 hbaseSchema.getRowKeyName().isPresent(),
                 "HBase schema must have a row key when used in lookup mode.");
         checkArgument(
-                hbaseSchema
-                        .convertsToTableSchema()
-                        .getTableColumn(context.getKeys()[0][0])
-                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
-                        .isPresent(),
+                DataType.getFieldNames(hbaseSchema.convertToDataType())
+                        .get(context.getKeys()[0][0])
+                        .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey field.");
         if (lookupOptions.getLookupAsync()) {
             return AsyncTableFunctionProvider.of(
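
For lookup sources, context.getKeys()[0][0] is the index of the first lookup key, and the error message pins HBase to exactly one supported key: the row key. The rewritten checkArgument resolves that index to a field name via DataType.getFieldNames and compares it with the row key name. A standalone illustration of the index-to-name step, under assumed field names:

    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.INT;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.STRING;

    import org.apache.flink.table.types.DataType;

    public class LookupKeyCheckSketch {
        public static void main(String[] args) {
            // Hypothetical physical row type: a row key plus one column family.
            DataType rowType =
                    ROW(FIELD("rowkey", STRING()), FIELD("family1", ROW(FIELD("q1", INT()))));

            // Shape of LookupContext#getKeys(): one index path per lookup key.
            int[][] keys = {{0}};

            // Mirrors the checkArgument above: the looked-up field must be the row key.
            String lookupField = DataType.getFieldNames(rowType).get(keys[0][0]);
            System.out.println(lookupField.equals("rowkey")); // true
        }
    }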
────────────────────────────────────────
@@ -22,8 +22,8 @@
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.util.HBaseTestBase;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
@@ -38,6 +38,12 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -98,23 +104,18 @@ private HBaseRowDataAsyncLookupFunction buildRowDataAsyncLookupFunction() {
             lookupOptions =
                     HBaseLookupOptions.builder().setCacheMaxSize(4).setCacheExpireMs(10000).build();
         }
-        TableSchema schema =
-                TableSchema.builder()
-                        .field(ROW_KEY, DataTypes.INT())
-                        .field(FAMILY1, DataTypes.ROW(DataTypes.FIELD(F1COL1, DataTypes.INT())))
-                        .field(
-                                FAMILY2,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F2COL1, DataTypes.STRING()),
-                                        DataTypes.FIELD(F2COL2, DataTypes.BIGINT())))
-                        .field(
+        DataType dataType =
+                ROW(
+                        FIELD(ROW_KEY, INT()),
+                        FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))),
+                        FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), FIELD(F2COL2, BIGINT()))),
+                        FIELD(
                                 FAMILY3,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F3COL1, DataTypes.DOUBLE()),
-                                        DataTypes.FIELD(F3COL2, DataTypes.BOOLEAN()),
-                                        DataTypes.FIELD(F3COL3, DataTypes.STRING())))
-                        .build();
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+                                ROW(
+                                        FIELD(F3COL1, DOUBLE()),
+                                        FIELD(F3COL2, DataTypes.BOOLEAN()),
+                                        FIELD(F3COL3, STRING()))));
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         return new HBaseRowDataAsyncLookupFunction(
                 getConf(), TEST_TABLE_1, hbaseSchema, "null", lookupOptions);
     }
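
The test now builds its schema with statically imported DataTypes constructors instead of TableSchema.Builder. The nesting convention is worth spelling out, since HBaseTableSchema.fromDataType relies on it (per the validation javadoc further down): only the row key and the column families are top-level fields, while qualifiers sit one level below. A small illustration with shortened, hypothetical names:

    import static org.apache.flink.table.api.DataTypes.DOUBLE;
    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.INT;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.STRING;

    import org.apache.flink.table.types.DataType;

    public class NestedSchemaSketch {
        public static void main(String[] args) {
            DataType dataType =
                    ROW(
                            FIELD("rowkey", INT()),
                            FIELD("family3", ROW(FIELD("f3c1", DOUBLE()), FIELD("f3c3", STRING()))));

            // Top level: the row key and the families.
            System.out.println(DataType.getFieldNames(dataType)); // [rowkey, family3]
            // Qualifiers live one level down, inside the family's nested ROW.
            System.out.println(
                    DataType.getFieldNames(dataType.getChildren().get(1))); // [f3c1, f3c3]
        }
    }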
────────────────────────────────────────
@@ -23,15 +23,14 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -77,11 +76,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                 hbaseSchema.getRowKeyName().isPresent(),
                 "HBase schema must have a row key when used in lookup mode.");
         checkArgument(
-                hbaseSchema
-                        .convertsToTableSchema()
-                        .getTableColumn(context.getKeys()[0][0])
-                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
-                        .isPresent(),
+                DataType.getFieldNames(hbaseSchema.convertToDataType())
+                        .get(context.getKeys()[0][0])
+                        .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey field.");
 
         return TableFunctionProvider.of(
@@ -97,10 +94,9 @@ public boolean supportsNestedProjection() {
 
     @Override
     public void applyProjection(int[][] projectedFields) {
-        TableSchema projectSchema =
-                TableSchemaUtils.projectSchema(
-                        hbaseSchema.convertsToTableSchema(), projectedFields);
-        this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema);
+        this.hbaseSchema =
+                HBaseTableSchema.fromDataType(
+                        DataType.projectFields(hbaseSchema.convertToDataType(), projectedFields));
     }
 
     @Override
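
Projection pushdown now goes through DataType.projectFields, which consumes the same int[][] index paths that applyProjection receives (outer index selects a top-level field, inner indexes select nested fields) and returns the pruned row type, so the old round trip through TableSchemaUtils.projectSchema disappears. A hedged standalone example with hypothetical names:

    import static org.apache.flink.table.api.DataTypes.BIGINT;
    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.INT;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.STRING;

    import org.apache.flink.table.types.DataType;

    public class ProjectFieldsSketch {
        public static void main(String[] args) {
            DataType full =
                    ROW(
                            FIELD("rowkey", INT()),
                            FIELD("f1", ROW(FIELD("q1", STRING()), FIELD("q2", BIGINT()))));

            // Keep the row key ({0}) and only f1.q2 ({1, 1}).
            DataType projected = DataType.projectFields(full, new int[][] {{0}, {1, 1}});
            System.out.println(projected); // rowkey stays; f1 is pruned to just q2
        }
    }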
────────────────────────────────────────
@@ -24,7 +24,7 @@
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -57,29 +57,31 @@ public class HBaseConnectorOptionsUtil {
      * type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the primary key
      * constraint must be defined on the single row key field.
      */
-    public static void validatePrimaryKey(TableSchema schema) {
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+    public static void validatePrimaryKey(DataType dataType, int[] primaryKeyIndexes) {
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         if (!hbaseSchema.getRowKeyName().isPresent()) {
             throw new IllegalArgumentException(
                     "HBase table requires to define a row key field. "
                             + "A row key field is defined as an atomic type, "
                             + "column families and qualifiers are defined as ROW type.");
         }
-        schema.getPrimaryKey()
-                .ifPresent(
-                        k -> {
-                            if (k.getColumns().size() > 1) {
-                                throw new IllegalArgumentException(
-                                        "HBase table doesn't support a primary Key on multiple columns. "
-                                                + "The primary key of HBase table must be defined on row key field.");
-                            }
-                            if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
-                                throw new IllegalArgumentException(
-                                        "Primary key of HBase table must be defined on the row key field. "
-                                                + "A row key field is defined as an atomic type, "
-                                                + "column families and qualifiers are defined as ROW type.");
-                            }
-                        });
+        if (primaryKeyIndexes.length == 0) {
+            return;
+        }
+        if (primaryKeyIndexes.length > 1) {
+            throw new IllegalArgumentException(
+                    "HBase table doesn't support a primary Key on multiple columns. "
+                            + "The primary key of HBase table must be defined on row key field.");
+        }
+        if (!hbaseSchema
+                .getRowKeyName()
+                .get()
+                .equals(DataType.getFieldNames(dataType).get(primaryKeyIndexes[0]))) {
+            throw new IllegalArgumentException(
+                    "Primary key of HBase table must be defined on the row key field. "
+                            + "A row key field is defined as an atomic type, "
+                            + "column families and qualifiers are defined as ROW type.");
+        }
     }
 
     public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) {
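
validatePrimaryKey now takes the physical row type plus the primary-key column indexes from Context#getPrimaryKeyIndexes(), which the new code treats as empty when the DDL declares no PRIMARY KEY, hence the early return. A standalone illustration of the index-to-name resolution the new check performs (names hypothetical):

    import static org.apache.flink.table.api.DataTypes.FIELD;
    import static org.apache.flink.table.api.DataTypes.INT;
    import static org.apache.flink.table.api.DataTypes.ROW;
    import static org.apache.flink.table.api.DataTypes.STRING;

    import org.apache.flink.table.types.DataType;

    public class PrimaryKeyCheckSketch {
        public static void main(String[] args) {
            // Hypothetical physical row type: an atomic row key and one family.
            DataType rowType =
                    ROW(FIELD("rowkey", STRING()), FIELD("family1", ROW(FIELD("q1", INT()))));

            // As if the DDL had declared PRIMARY KEY (rowkey) NOT ENFORCED.
            int[] primaryKeyIndexes = {0};

            // The rewritten validation resolves the single PK index to a field name
            // and requires it to equal the row key name.
            String pkField = DataType.getFieldNames(rowType).get(primaryKeyIndexes[0]);
            System.out.println(pkField.equals("rowkey")); // true -> validation passes
        }
    }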