Commit a307c35

[FLINK-24397][connectors/jdbc] Remove TableSchema usage from Jdbc table connector
Fabian Paul committed Oct 13, 2021
1 parent a28e4f6 commit a307c35
Showing 24 changed files with 142 additions and 2,764 deletions.
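This commit moves the JDBC table connector off the deprecated TableSchema abstraction and onto the newer Schema and DataType APIs. As orientation before the diff, here is a minimal, self-contained Java sketch of the Schema builder the connector switches to; the column names and constraint name are invented for illustration and are not part of the commit.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

import java.util.Arrays;

public class SchemaBuilderSketch {
    public static void main(String[] args) {
        // Declarative schema construction, column by column, with an optional named primary key.
        Schema schema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT().notNull())
                        .column("name", DataTypes.STRING())
                        .primaryKeyNamed("pk_id", Arrays.asList("id"))
                        .build();
        System.out.println(schema);
    }
}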
----------------------------------------------------------------------
@@ -20,12 +20,12 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -207,23 +207,16 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
 
             ResultSetMetaData rsmd = ps.getMetaData();
 
-            String[] names = new String[rsmd.getColumnCount()];
-            DataType[] types = new DataType[rsmd.getColumnCount()];
-
+            Schema.Builder builder = Schema.newBuilder();
             for (int i = 1; i <= rsmd.getColumnCount(); i++) {
-                names[i - 1] = rsmd.getColumnName(i);
-                types[i - 1] = fromJDBCType(rsmd, i);
+                DataType type = fromJDBCType(rsmd, i);
                 if (rsmd.isNullable(i) == ResultSetMetaData.columnNoNulls) {
-                    types[i - 1] = types[i - 1].notNull();
+                    type = type.notNull();
                 }
+                builder.column(rsmd.getColumnName(i), type);
             }
 
-            TableSchema.Builder tableBuilder = new TableSchema.Builder().fields(names, types);
-            primaryKey.ifPresent(
-                    pk ->
-                            tableBuilder.primaryKey(
-                                    pk.getName(), pk.getColumns().toArray(new String[0])));
-            TableSchema tableSchema = tableBuilder.build();
+            primaryKey.ifPresent(pk -> builder.primaryKeyNamed(pk.getName(), pk.getColumns()));
 
             Map<String, String> props = new HashMap<>();
             props.put(CONNECTOR.key(), IDENTIFIER);
@@ -232,7 +225,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
             props.put(USERNAME.key(), username);
             props.put(PASSWORD.key(), pwd);
 
-            return new CatalogTableImpl(tableSchema, props, "");
+            return CatalogTable.of(builder.build(), "", Collections.emptyList(), props);
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed getting table %s", tablePath.getFullName()), e);
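The rewritten catalog getTable(...) above swaps CatalogTableImpl for the CatalogTable.of(...) factory, which takes the schema, a comment, partition keys, and the options map in that order. A minimal Java sketch of that call shape; the "connector" option value is a stand-in for the CONNECTOR.key()/IDENTIFIER constants used in the real code.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CatalogTableSketch {
    public static void main(String[] args) {
        // Stand-in for the connector options the catalog assembles (URL, table-name, ...).
        Map<String, String> props = new HashMap<>();
        props.put("connector", "jdbc");

        CatalogTable table =
                CatalogTable.of(
                        Schema.newBuilder().column("id", DataTypes.INT().notNull()).build(),
                        "",                      // comment, empty as in the hunk above
                        Collections.emptyList(), // no partition keys
                        props);
        System.out.println(table.getOptions());
    }
}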
----------------------------------------------------------------------
@@ -40,8 +40,7 @@
  * {@link JdbcBatchStatementExecutor} that provides upsert semantics by updating row if it exists
  * and inserting otherwise. Used in Table API.
  *
- * @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor}, will remove
- *     this once {@link org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink} is removed.
+ * @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor}
  */
 @Internal
 public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {
----------------------------------------------------------------------
@@ -28,13 +28,12 @@
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
@@ -80,14 +79,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         helper.validate();
         validateConfigOptions(config);
         JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 
         return new JdbcDynamicTableSink(
                 jdbcOptions,
                 getJdbcExecutionOptions(config),
-                getJdbcDmlOptions(jdbcOptions, physicalSchema),
-                physicalSchema);
+                getJdbcDmlOptions(
+                        jdbcOptions,
+                        context.getPhysicalRowDataType(),
+                        context.getPrimaryKeyIndexes()),
+                context.getPhysicalRowDataType());
     }
 
     @Override
@@ -98,13 +98,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         helper.validate();
         validateConfigOptions(config);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         return new JdbcDynamicTableSource(
                 getJdbcOptions(helper.getOptions()),
                 getJdbcReadOptions(helper.getOptions()),
                 getJdbcLookupOptions(helper.getOptions()),
-                physicalSchema);
+                context.getPhysicalRowDataType());
     }
 
     private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) {
@@ -154,17 +152,19 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
         return builder.build();
     }
 
-    private JdbcDmlOptions getJdbcDmlOptions(JdbcConnectorOptions jdbcOptions, TableSchema schema) {
+    private JdbcDmlOptions getJdbcDmlOptions(
+            JdbcConnectorOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
 
         String[] keyFields =
-                schema.getPrimaryKey()
-                        .map(pk -> pk.getColumns().toArray(new String[0]))
-                        .orElse(null);
+                Arrays.stream(primaryKeyIndexes)
+                        .mapToObj(i -> DataType.getFieldNames(dataType).get(i))
+                        .toArray(String[]::new);
 
         return JdbcDmlOptions.builder()
                 .withTableName(jdbcOptions.getTableName())
                 .withDialect(jdbcOptions.getDialect())
-                .withFieldNames(schema.getFieldNames())
-                .withKeyFields(keyFields)
+                .withFieldNames(DataType.getFieldNames(dataType).toArray(new String[0]))
+                .withKeyFields(keyFields.length > 0 ? keyFields : null)
                 .build();
     }
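Instead of reading the primary key from TableSchema, the factory above now maps context.getPrimaryKeyIndexes() onto the field names of the physical row DataType. A self-contained sketch of that index-to-name mapping, using a hypothetical two-column row type in place of the real context.getPhysicalRowDataType():

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Arrays;
import java.util.List;

public class KeyFieldsSketch {
    public static void main(String[] args) {
        // Hypothetical physical row type standing in for context.getPhysicalRowDataType().
        DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT().notNull()),
                        DataTypes.FIELD("name", DataTypes.STRING()));
        int[] primaryKeyIndexes = {0}; // as returned by context.getPrimaryKeyIndexes()

        // Translate primary-key column indexes into column names.
        List<String> fieldNames = DataType.getFieldNames(rowType);
        String[] keyFields =
                Arrays.stream(primaryKeyIndexes)
                        .mapToObj(fieldNames::get)
                        .toArray(String[]::new);

        System.out.println(Arrays.toString(keyFields)); // [id]
    }
}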
----------------------------------------------------------------------
@@ -24,11 +24,11 @@
 import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import java.util.Objects;
@@ -42,18 +42,18 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
     private final JdbcConnectorOptions jdbcOptions;
     private final JdbcExecutionOptions executionOptions;
     private final JdbcDmlOptions dmlOptions;
-    private final TableSchema tableSchema;
+    private final DataType physicalRowDataType;
     private final String dialectName;
 
     public JdbcDynamicTableSink(
             JdbcConnectorOptions jdbcOptions,
             JdbcExecutionOptions executionOptions,
             JdbcDmlOptions dmlOptions,
-            TableSchema tableSchema) {
+            DataType physicalRowDataType) {
         this.jdbcOptions = jdbcOptions;
         this.executionOptions = executionOptions;
         this.dmlOptions = dmlOptions;
-        this.tableSchema = tableSchema;
+        this.physicalRowDataType = physicalRowDataType;
         this.dialectName = dmlOptions.getDialect().dialectName();
     }
 
@@ -77,21 +77,23 @@ private void validatePrimaryKey(ChangelogMode requestedMode) {
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         final TypeInformation<RowData> rowDataTypeInformation =
-                context.createTypeInformation(tableSchema.toRowDataType());
+                context.createTypeInformation(physicalRowDataType);
         final JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder();
 
         builder.setJdbcOptions(jdbcOptions);
         builder.setJdbcDmlOptions(dmlOptions);
         builder.setJdbcExecutionOptions(executionOptions);
         builder.setRowDataTypeInfo(rowDataTypeInformation);
-        builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
+        builder.setFieldDataTypes(
+                DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]));
         return SinkFunctionProvider.of(
                 new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
     }
 
     @Override
     public DynamicTableSink copy() {
-        return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, tableSchema);
+        return new JdbcDynamicTableSink(
+                jdbcOptions, executionOptions, dmlOptions, physicalRowDataType);
     }
 
     @Override
@@ -111,12 +113,13 @@ public boolean equals(Object o) {
         return Objects.equals(jdbcOptions, that.jdbcOptions)
                 && Objects.equals(executionOptions, that.executionOptions)
                 && Objects.equals(dmlOptions, that.dmlOptions)
-                && Objects.equals(tableSchema, that.tableSchema)
+                && Objects.equals(physicalRowDataType, that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName);
+        return Objects.hash(
+                jdbcOptions, executionOptions, dmlOptions, physicalRowDataType, dialectName);
     }
 }
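In JdbcDynamicTableSink, every TableSchema accessor has a DataType counterpart: toRowDataType() becomes the physical row DataType itself, and getFieldDataTypes() becomes the static DataType.getFieldDataTypes(...). A small sketch of that equivalence, again over a made-up row type:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.List;

public class FieldDataTypesSketch {
    public static void main(String[] args) {
        // Hypothetical physical row type; the real one is handed to the sink's constructor.
        DataType physicalRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT().notNull()),
                        DataTypes.FIELD("name", DataTypes.STRING()));

        // What tableSchema.getFieldDataTypes() used to return, now derived statically.
        List<DataType> fieldTypes = DataType.getFieldDataTypes(physicalRowDataType);
        DataType[] asArray = fieldTypes.toArray(new DataType[0]);

        System.out.println(fieldTypes);
        System.out.println(asArray.length); // 2
    }
}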
----------------------------------------------------------------------
@@ -24,7 +24,6 @@
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -33,8 +32,8 @@
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Objects;
@@ -50,19 +49,19 @@ public class JdbcDynamicTableSource
     private final JdbcConnectorOptions options;
     private final JdbcReadOptions readOptions;
    private final JdbcLookupOptions lookupOptions;
-    private TableSchema physicalSchema;
+    private DataType physicalRowDataType;
     private final String dialectName;
     private long limit = -1;
 
     public JdbcDynamicTableSource(
             JdbcConnectorOptions options,
             JdbcReadOptions readOptions,
             JdbcLookupOptions lookupOptions,
-            TableSchema physicalSchema) {
+            DataType physicalRowDataType) {
         this.options = options;
         this.readOptions = readOptions;
         this.lookupOptions = lookupOptions;
-        this.physicalSchema = physicalSchema;
+        this.physicalRowDataType = physicalRowDataType;
         this.dialectName = options.getDialect().dialectName();
     }
 
@@ -74,16 +73,16 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
             int[] innerKeyArr = context.getKeys()[i];
             Preconditions.checkArgument(
                     innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
-            keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
+            keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
         }
-        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
+        final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
 
         return TableFunctionProvider.of(
                 new JdbcRowDataLookupFunction(
                         options,
                         lookupOptions,
-                        physicalSchema.getFieldNames(),
-                        physicalSchema.getFieldDataTypes(),
+                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
                         keyNames,
                         rowType));
     }
@@ -104,7 +103,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
         final JdbcDialect dialect = options.getDialect();
         String query =
                 dialect.getSelectFromStatement(
-                        options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
+                        options.getTableName(),
+                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        new String[0]);
         if (readOptions.getPartitionColumnName().isPresent()) {
             long lowerBound = readOptions.getPartitionLowerBound().get();
             long upperBound = readOptions.getPartitionUpperBound().get();
@@ -121,10 +122,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
             query = String.format("%s %s", query, dialect.getLimitClause(limit));
         }
         builder.setQuery(query);
-        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
+        final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
         builder.setRowConverter(dialect.getRowConverter(rowType));
         builder.setRowDataTypeInfo(
-                runtimeProviderContext.createTypeInformation(physicalSchema.toRowDataType()));
+                runtimeProviderContext.createTypeInformation(physicalRowDataType));
 
         return InputFormatProvider.of(builder.build());
     }
@@ -142,12 +143,12 @@ public boolean supportsNestedProjection() {
 
     @Override
     public void applyProjection(int[][] projectedFields) {
-        this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+        this.physicalRowDataType = DataType.projectFields(physicalRowDataType, projectedFields);
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
+        return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalRowDataType);
     }
 
     @Override
@@ -167,15 +168,15 @@ public boolean equals(Object o) {
         return Objects.equals(options, that.options)
                 && Objects.equals(readOptions, that.readOptions)
                 && Objects.equals(lookupOptions, that.lookupOptions)
-                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(physicalRowDataType, that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName)
                 && Objects.equals(limit, that.limit);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                options, readOptions, lookupOptions, physicalSchema, dialectName, limit);
+                options, readOptions, lookupOptions, physicalRowDataType, dialectName, limit);
     }
 
     @Override
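JdbcDynamicTableSource follows the same pattern; note how physicalSchema.toRowDataType().getLogicalType() collapses to a direct getLogicalType() call, because the physical row DataType already describes a row. A short sketch with a hypothetical row type:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class RowTypeSketch {
    public static void main(String[] args) {
        // Hypothetical physical row type standing in for the source's constructor argument.
        DataType physicalRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT().notNull()),
                        DataTypes.FIELD("name", DataTypes.STRING()));

        // No intermediate TableSchema round-trip is needed to reach the logical RowType.
        RowType rowType = (RowType) physicalRowDataType.getLogicalType();
        System.out.println(rowType.getFieldNames()); // [id, name]
    }
}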
(The remaining changed files are not shown.)
