[FLINK-24397][connectors/hbase] Remove TableSchema usage from Hbase table connector

Fabian Paul committed Oct 18, 2021
1 parent 4d76629 commit 7ba9f91
Showing 8 changed files with 99 additions and 138 deletions.
@@ -25,7 +25,6 @@
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -70,15 +69,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseClientConf = getHBaseConfiguration(options);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSource(
                 hbaseClientConf,
@@ -95,16 +94,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
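
Both factory methods now derive the HBase schema from context.getPhysicalRowDataType() instead of a catalog TableSchema. Below is a minimal sketch of what HBaseTableSchema.fromDataType consumes, outside any factory; the field names are illustrative, not from the commit:

import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class FromDataTypeSketch {
    public static void main(String[] args) {
        // Shape of context.getPhysicalRowDataType() for an HBase table:
        // one atomic field for the row key plus one ROW field per column family.
        DataType physicalRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("rowkey", DataTypes.INT()),
                        DataTypes.FIELD(
                                "family1",
                                DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.STRING()))));
        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(physicalRowDataType);
        // The row key is recognized as the only atomic (non-ROW) top-level field.
        System.out.println(hbaseSchema.getRowKeyName()); // Optional[rowkey]
    }
}
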
@@ -26,7 +26,6 @@
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -72,16 +71,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSource(
                 hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
@@ -94,16 +93,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
@@ -29,6 +29,7 @@
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -56,11 +57,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                 hbaseSchema.getRowKeyName().isPresent(),
                 "HBase schema must have a row key when used in lookup mode.");
         checkArgument(
-                hbaseSchema
-                        .convertsToTableSchema()
-                        .getTableColumn(context.getKeys()[0][0])
-                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
-                        .isPresent(),
+                DataType.getFieldNames(hbaseSchema.convertToDataType())
+                        .get(context.getKeys()[0][0])
+                        .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey field.");
         if (lookupOptions.getLookupAsync()) {
             return AsyncTableFunctionProvider.of(
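
The rewritten checkArgument keeps the same invariant (the single lookup key must be the row key field) but resolves the key index through DataType.getFieldNames rather than TableSchema's getTableColumn. A standalone sketch of that resolution, with illustrative names and context.getKeys() stubbed as a literal:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.List;

public class LookupKeyCheckSketch {
    public static void main(String[] args) {
        DataType physicalRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("rowkey", DataTypes.STRING()),
                        DataTypes.FIELD(
                                "f1", DataTypes.ROW(DataTypes.FIELD("q1", DataTypes.BIGINT()))));
        // Stand-in for context.getKeys(): one lookup key, pointing at top-level field 0.
        int[][] keys = {{0}};
        List<String> fieldNames = DataType.getFieldNames(physicalRowDataType);
        // The check from the diff: the keyed field must be the row key ("rowkey" here).
        System.out.println(fieldNames.get(keys[0][0]).equals("rowkey")); // true
    }
}
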
@@ -22,8 +22,8 @@
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.util.HBaseTestBase;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
@@ -38,6 +38,12 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -98,23 +104,18 @@ private HBaseRowDataAsyncLookupFunction buildRowDataAsyncLookupFunction() {
             lookupOptions =
                     HBaseLookupOptions.builder().setCacheMaxSize(4).setCacheExpireMs(10000).build();
         }
-        TableSchema schema =
-                TableSchema.builder()
-                        .field(ROW_KEY, DataTypes.INT())
-                        .field(FAMILY1, DataTypes.ROW(DataTypes.FIELD(F1COL1, DataTypes.INT())))
-                        .field(
-                                FAMILY2,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F2COL1, DataTypes.STRING()),
-                                        DataTypes.FIELD(F2COL2, DataTypes.BIGINT())))
-                        .field(
-                                FAMILY3,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F3COL1, DataTypes.DOUBLE()),
-                                        DataTypes.FIELD(F3COL2, DataTypes.BOOLEAN()),
-                                        DataTypes.FIELD(F3COL3, DataTypes.STRING())))
-                        .build();
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+        DataType dataType =
+                ROW(
+                        FIELD(ROW_KEY, INT()),
+                        FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))),
+                        FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), FIELD(F2COL2, BIGINT()))),
+                        FIELD(
+                                FAMILY3,
+                                ROW(
+                                        FIELD(F3COL1, DOUBLE()),
+                                        FIELD(F3COL2, DataTypes.BOOLEAN()),
+                                        FIELD(F3COL3, STRING()))));
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         return new HBaseRowDataAsyncLookupFunction(
                 getConf(), TEST_TABLE_1, hbaseSchema, "null", lookupOptions);
     }
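
The test schema is now built as a plain DataType, and the new static imports keep the nested ROW literal compact. Both styles construct equal values, as this small sketch (independent of the test) shows:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;

public class StaticImportSketch {
    public static void main(String[] args) {
        DataType viaStaticImports = ROW(FIELD("rowkey", INT()));
        DataType fullyQualified = DataTypes.ROW(DataTypes.FIELD("rowkey", DataTypes.INT()));
        // The DataTypes.* static factories produce equal values either way.
        System.out.println(viaStaticImports.equals(fullyQualified)); // true
    }
}
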
@@ -23,15 +23,14 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -77,11 +76,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                 hbaseSchema.getRowKeyName().isPresent(),
                 "HBase schema must have a row key when used in lookup mode.");
         checkArgument(
-                hbaseSchema
-                        .convertsToTableSchema()
-                        .getTableColumn(context.getKeys()[0][0])
-                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
-                        .isPresent(),
+                DataType.getFieldNames(hbaseSchema.convertToDataType())
+                        .get(context.getKeys()[0][0])
+                        .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey field.");
 
         return TableFunctionProvider.of(
@@ -97,10 +94,9 @@ public boolean supportsNestedProjection() {
 
     @Override
     public void applyProjection(int[][] projectedFields) {
-        TableSchema projectSchema =
-                TableSchemaUtils.projectSchema(
-                        hbaseSchema.convertsToTableSchema(), projectedFields);
-        this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema);
+        this.hbaseSchema =
+                HBaseTableSchema.fromDataType(
+                        DataType.projectFields(hbaseSchema.convertToDataType(), projectedFields));
     }
 
     @Override
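
applyProjection now projects the DataType directly with DataType.projectFields instead of going through TableSchemaUtils.projectSchema. A sketch of the input and output shape, with illustrative names; that the projected fields come back in the order of the given index paths is an assumption here, not something the diff states:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class ProjectFieldsSketch {
    public static void main(String[] args) {
        DataType full =
                DataTypes.ROW(
                        DataTypes.FIELD("rowkey", DataTypes.INT()),
                        DataTypes.FIELD(
                                "f1",
                                DataTypes.ROW(
                                        DataTypes.FIELD("q1", DataTypes.STRING()),
                                        DataTypes.FIELD("q2", DataTypes.BIGINT()))));
        // Each inner array is an index path into the row; {{1}, {0}} keeps f1 and rowkey.
        int[][] projectedFields = {{1}, {0}};
        DataType projected = DataType.projectFields(full, projectedFields);
        System.out.println(DataType.getFieldNames(projected)); // assumed: [f1, rowkey]
    }
}
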
@@ -24,7 +24,7 @@
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -57,29 +57,31 @@ public class HBaseConnectorOptionsUtil {
  * type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the primary key
  * constraint must be defined on the single row key field.
  */
-    public static void validatePrimaryKey(TableSchema schema) {
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+    public static void validatePrimaryKey(DataType dataType, int[] primaryKeyIndexes) {
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         if (!hbaseSchema.getRowKeyName().isPresent()) {
             throw new IllegalArgumentException(
                     "HBase table requires to define a row key field. "
                             + "A row key field is defined as an atomic type, "
                             + "column families and qualifiers are defined as ROW type.");
         }
-        schema.getPrimaryKey()
-                .ifPresent(
-                        k -> {
-                            if (k.getColumns().size() > 1) {
-                                throw new IllegalArgumentException(
-                                        "HBase table doesn't support a primary Key on multiple columns. "
-                                                + "The primary key of HBase table must be defined on row key field.");
-                            }
-                            if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
-                                throw new IllegalArgumentException(
-                                        "Primary key of HBase table must be defined on the row key field. "
-                                                + "A row key field is defined as an atomic type, "
-                                                + "column families and qualifiers are defined as ROW type.");
-                            }
-                        });
+        if (primaryKeyIndexes.length == 0) {
+            return;
+        }
+        if (primaryKeyIndexes.length > 1) {
+            throw new IllegalArgumentException(
+                    "HBase table doesn't support a primary Key on multiple columns. "
+                            + "The primary key of HBase table must be defined on row key field.");
+        }
+        if (!hbaseSchema
+                .getRowKeyName()
+                .get()
+                .equals(DataType.getFieldNames(dataType).get(primaryKeyIndexes[0]))) {
+            throw new IllegalArgumentException(
+                    "Primary key of HBase table must be defined on the row key field. "
+                            + "A row key field is defined as an atomic type, "
+                            + "column families and qualifiers are defined as ROW type.");
+        }
     }
 
     public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) {
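
validatePrimaryKey now takes the physical row DataType plus the primary-key index array instead of inspecting TableSchema.getPrimaryKey(). A sketch of what the new checks accept and reject; the schema is illustrative and the import path of HBaseConnectorOptionsUtil is an assumption:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Assumed location of the utility class shown in the diff.
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;

public class ValidatePrimaryKeySketch {
    public static void main(String[] args) {
        DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("rowkey", DataTypes.STRING()),
                        DataTypes.FIELD(
                                "f1", DataTypes.ROW(DataTypes.FIELD("q1", DataTypes.INT()))));
        validatePrimaryKey(rowType, new int[0]); // passes: no PRIMARY KEY declared
        validatePrimaryKey(rowType, new int[] {0}); // passes: key on the row key field
        try {
            validatePrimaryKey(rowType, new int[] {1}); // throws: key on a column family
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}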
