diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java index ee5a399e77d9c..f22369f8524bd 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java @@ -20,12 +20,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -207,23 +207,16 @@ public CatalogBaseTable getTable(ObjectPath tablePath) ResultSetMetaData rsmd = ps.getMetaData(); - String[] names = new String[rsmd.getColumnCount()]; - DataType[] types = new DataType[rsmd.getColumnCount()]; - + Schema.Builder builder = Schema.newBuilder(); for (int i = 1; i <= rsmd.getColumnCount(); i++) { - names[i - 1] = rsmd.getColumnName(i); - types[i - 1] = fromJDBCType(rsmd, i); + DataType type = fromJDBCType(rsmd, i); if (rsmd.isNullable(i) == ResultSetMetaData.columnNoNulls) { - types[i - 1] = types[i - 1].notNull(); + type = type.notNull(); } + builder.column(rsmd.getColumnName(i), type); } - TableSchema.Builder tableBuilder = new TableSchema.Builder().fields(names, types); - primaryKey.ifPresent( - pk -> - tableBuilder.primaryKey( - pk.getName(), pk.getColumns().toArray(new String[0]))); - TableSchema tableSchema = tableBuilder.build(); + primaryKey.ifPresent(pk -> builder.primaryKeyNamed(pk.getName(), pk.getColumns())); Map props = new HashMap<>(); props.put(CONNECTOR.key(), IDENTIFIER); @@ -232,7 +225,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) props.put(USERNAME.key(), username); props.put(PASSWORD.key(), pwd); - return new CatalogTableImpl(tableSchema, props, ""); + return CatalogTable.of(builder.build(), "", Collections.emptyList(), props); } catch (Exception e) { throw new CatalogException( String.format("Failed getting table %s", tablePath.getFullName()), e); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java index 19daa2f084c83..838cbc5293386 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java @@ -40,8 +40,7 @@ * {@link JdbcBatchStatementExecutor} that provides upsert semantics by updating row if it exists * and inserting otherwise. Used in Table API. 
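(Editor's note, not part of the patch: the PostgresCatalog#getTable hunk above replaces TableSchema/CatalogTableImpl with the Schema builder and the CatalogTable.of factory. A minimal, self-contained sketch of that construction pattern follows; the column names, types, and constraint name are illustrative placeholders, not values from the connector.)

// --- Editorial sketch (not part of the patch), illustrating the new API used above ---
// CatalogTable.of(schema, comment, partitionKeys, options) replaces CatalogTableImpl,
// and Schema.newBuilder() replaces TableSchema.Builder. The column names, types, and
// constraint name below are placeholders.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;

class SchemaMigrationSketch {
    static CatalogTable exampleTable() {
        Schema.Builder builder = Schema.newBuilder();
        // Columns are declared one at a time instead of via parallel name/type arrays.
        builder.column("id", DataTypes.BIGINT().notNull());
        builder.column("name", DataTypes.STRING());
        // Named primary key, replacing TableSchema.Builder#primaryKey(name, columns).
        builder.primaryKeyNamed("pk_id", Collections.singletonList("id"));

        Map<String, String> options = new HashMap<>();
        options.put("connector", "jdbc");
        return CatalogTable.of(builder.build(), "", Collections.emptyList(), options);
    }
}
// --- End editorial sketch ---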
* - * @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor}, will remove - * this once {@link org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink} is removed. + * @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor} */ @Internal public final class InsertOrUpdateJdbcExecutor implements JdbcBatchStatementExecutor { diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java index 40e5495e1db8a..80e5bd3d44efc 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -28,13 +28,12 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; import java.util.Arrays; @@ -80,14 +79,15 @@ public DynamicTableSink createDynamicTableSink(Context context) { helper.validate(); validateConfigOptions(config); JdbcConnectorOptions jdbcOptions = getJdbcOptions(config); - TableSchema physicalSchema = - TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); return new JdbcDynamicTableSink( jdbcOptions, getJdbcExecutionOptions(config), - getJdbcDmlOptions(jdbcOptions, physicalSchema), - physicalSchema); + getJdbcDmlOptions( + jdbcOptions, + context.getPhysicalRowDataType(), + context.getPrimaryKeyIndexes()), + context.getPhysicalRowDataType()); } @Override @@ -98,13 +98,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { helper.validate(); validateConfigOptions(config); - TableSchema physicalSchema = - TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); return new JdbcDynamicTableSource( getJdbcOptions(helper.getOptions()), getJdbcReadOptions(helper.getOptions()), getJdbcLookupOptions(helper.getOptions()), - physicalSchema); + context.getPhysicalRowDataType()); } private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) { @@ -154,17 +152,19 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) { return builder.build(); } - private JdbcDmlOptions getJdbcDmlOptions(JdbcConnectorOptions jdbcOptions, TableSchema schema) { + private JdbcDmlOptions getJdbcDmlOptions( + JdbcConnectorOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) { + String[] keyFields = - schema.getPrimaryKey() - .map(pk -> pk.getColumns().toArray(new String[0])) - .orElse(null); + Arrays.stream(primaryKeyIndexes) + .mapToObj(i -> DataType.getFieldNames(dataType).get(i)) + .toArray(String[]::new); return JdbcDmlOptions.builder() .withTableName(jdbcOptions.getTableName()) .withDialect(jdbcOptions.getDialect()) - 
.withFieldNames(schema.getFieldNames()) - .withKeyFields(keyFields) + .withFieldNames(DataType.getFieldNames(dataType).toArray(new String[0])) + .withKeyFields(keyFields.length > 0 ? keyFields : null) .build(); } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java index 74de265ebdce6..03256ff2c40bd 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java @@ -24,11 +24,11 @@ import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; import java.util.Objects; @@ -42,18 +42,18 @@ public class JdbcDynamicTableSink implements DynamicTableSink { private final JdbcConnectorOptions jdbcOptions; private final JdbcExecutionOptions executionOptions; private final JdbcDmlOptions dmlOptions; - private final TableSchema tableSchema; + private final DataType physicalRowDataType; private final String dialectName; public JdbcDynamicTableSink( JdbcConnectorOptions jdbcOptions, JdbcExecutionOptions executionOptions, JdbcDmlOptions dmlOptions, - TableSchema tableSchema) { + DataType physicalRowDataType) { this.jdbcOptions = jdbcOptions; this.executionOptions = executionOptions; this.dmlOptions = dmlOptions; - this.tableSchema = tableSchema; + this.physicalRowDataType = physicalRowDataType; this.dialectName = dmlOptions.getDialect().dialectName(); } @@ -77,21 +77,23 @@ private void validatePrimaryKey(ChangelogMode requestedMode) { @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { final TypeInformation rowDataTypeInformation = - context.createTypeInformation(tableSchema.toRowDataType()); + context.createTypeInformation(physicalRowDataType); final JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder(); builder.setJdbcOptions(jdbcOptions); builder.setJdbcDmlOptions(dmlOptions); builder.setJdbcExecutionOptions(executionOptions); builder.setRowDataTypeInfo(rowDataTypeInformation); - builder.setFieldDataTypes(tableSchema.getFieldDataTypes()); + builder.setFieldDataTypes( + DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0])); return SinkFunctionProvider.of( new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism()); } @Override public DynamicTableSink copy() { - return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, tableSchema); + return new JdbcDynamicTableSink( + jdbcOptions, executionOptions, dmlOptions, physicalRowDataType); } @Override @@ -111,12 +113,13 @@ public boolean equals(Object o) { return Objects.equals(jdbcOptions, that.jdbcOptions) && Objects.equals(executionOptions, that.executionOptions) && Objects.equals(dmlOptions, that.dmlOptions) - && Objects.equals(tableSchema, that.tableSchema) + && 
Objects.equals(physicalRowDataType, that.physicalRowDataType) && Objects.equals(dialectName, that.dialectName); } @Override public int hashCode() { - return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName); + return Objects.hash( + jdbcOptions, executionOptions, dmlOptions, physicalRowDataType, dialectName); } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java index f4b4f573e4ef0..d6805a73b68c1 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.InputFormatProvider; @@ -33,8 +32,8 @@ import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.Preconditions; import java.util.Objects; @@ -50,7 +49,7 @@ public class JdbcDynamicTableSource private final JdbcConnectorOptions options; private final JdbcReadOptions readOptions; private final JdbcLookupOptions lookupOptions; - private TableSchema physicalSchema; + private DataType physicalRowDataType; private final String dialectName; private long limit = -1; @@ -58,11 +57,11 @@ public JdbcDynamicTableSource( JdbcConnectorOptions options, JdbcReadOptions readOptions, JdbcLookupOptions lookupOptions, - TableSchema physicalSchema) { + DataType physicalRowDataType) { this.options = options; this.readOptions = readOptions; this.lookupOptions = lookupOptions; - this.physicalSchema = physicalSchema; + this.physicalRowDataType = physicalRowDataType; this.dialectName = options.getDialect().dialectName(); } @@ -74,16 +73,16 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { int[] innerKeyArr = context.getKeys()[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "JDBC only support non-nested look up keys"); - keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]]; + keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); } - final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); return TableFunctionProvider.of( new JdbcRowDataLookupFunction( options, lookupOptions, - physicalSchema.getFieldNames(), - physicalSchema.getFieldDataTypes(), + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), keyNames, rowType)); } @@ -104,7 +103,9 @@ public ScanRuntimeProvider 
getScanRuntimeProvider(ScanContext runtimeProviderCon final JdbcDialect dialect = options.getDialect(); String query = dialect.getSelectFromStatement( - options.getTableName(), physicalSchema.getFieldNames(), new String[0]); + options.getTableName(), + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + new String[0]); if (readOptions.getPartitionColumnName().isPresent()) { long lowerBound = readOptions.getPartitionLowerBound().get(); long upperBound = readOptions.getPartitionUpperBound().get(); @@ -121,10 +122,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon query = String.format("%s %s", query, dialect.getLimitClause(limit)); } builder.setQuery(query); - final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); builder.setRowConverter(dialect.getRowConverter(rowType)); builder.setRowDataTypeInfo( - runtimeProviderContext.createTypeInformation(physicalSchema.toRowDataType())); + runtimeProviderContext.createTypeInformation(physicalRowDataType)); return InputFormatProvider.of(builder.build()); } @@ -142,12 +143,12 @@ public boolean supportsNestedProjection() { @Override public void applyProjection(int[][] projectedFields) { - this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields); + this.physicalRowDataType = DataType.projectFields(physicalRowDataType, projectedFields); } @Override public DynamicTableSource copy() { - return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema); + return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalRowDataType); } @Override @@ -167,7 +168,7 @@ public boolean equals(Object o) { return Objects.equals(options, that.options) && Objects.equals(readOptions, that.readOptions) && Objects.equals(lookupOptions, that.lookupOptions) - && Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(physicalRowDataType, that.physicalRowDataType) && Objects.equals(dialectName, that.dialectName) && Objects.equals(limit, that.limit); } @@ -175,7 +176,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - options, readOptions, lookupOptions, physicalSchema, dialectName, limit); + options, readOptions, lookupOptions, physicalRowDataType, dialectName, limit); } @Override diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunction.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunction.java deleted file mode 100644 index 601490a16ae8a..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunction.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
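(Editor's note, not part of the patch: the JdbcDynamicTableSource/Sink hunks above read field names and types directly from the physical row DataType instead of a TableSchema. A minimal sketch of those DataType helpers follows; the row fields below are illustrative placeholders, not the connector's actual schema.)

// --- Editorial sketch (not part of the patch), illustrating the DataType helpers used above ---
// DataType.getFieldNames / DataType.getFieldDataTypes replace TableSchema#getFieldNames /
// #getFieldDataTypes, and the RowType comes from the DataType's logical type.
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

class DataTypeAccessSketch {
    public static void main(String[] args) {
        DataType physicalRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()));

        // Replaces TableSchema#getFieldNames() / #getFieldDataTypes().
        List<String> fieldNames = DataType.getFieldNames(physicalRowDataType);
        List<DataType> fieldTypes = DataType.getFieldDataTypes(physicalRowDataType);

        // Replaces TableSchema#toRowDataType().getLogicalType().
        RowType rowType = (RowType) physicalRowDataType.getLogicalType();

        System.out.println(fieldNames + ", " + fieldTypes.size() + " field types, "
                + rowType.getFieldCount() + " row fields");
    }
}
// --- End editorial sketch ---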
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; -import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; -import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; -import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; -import org.apache.flink.connector.jdbc.utils.JdbcUtils; -import org.apache.flink.table.functions.FunctionContext; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.types.Row; - -import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getFieldFromResultSet; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A {@link TableFunction} to query fields from JDBC by keys. The query template like: - * - *
- * SELECT c, d, e, f from T where a = ? and b = ?
- * </pre>
- *

Support cache the result to avoid frequent accessing to remote databases. 1.The cacheMaxSize - * is -1 means not use cache. 2.For real-time data, you need to set the TTL of cache. - */ -public class JdbcLookupFunction extends TableFunction { - - private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupFunction.class); - private static final long serialVersionUID = 2L; - - private final String query; - private final JdbcConnectionProvider connectionProvider; - private final TypeInformation[] keyTypes; - private final int[] keySqlTypes; - private final String[] fieldNames; - private final String[] keyNames; - private final TypeInformation[] fieldTypes; - private final int[] outputSqlTypes; - private final long cacheMaxSize; - private final long cacheExpireMs; - private final int maxRetryTimes; - - private transient PreparedStatement statement; - private transient Cache> cache; - - public JdbcLookupFunction( - JdbcConnectorOptions options, - JdbcLookupOptions lookupOptions, - String[] fieldNames, - TypeInformation[] fieldTypes, - String[] keyNames) { - this.connectionProvider = new SimpleJdbcConnectionProvider(options); - this.fieldNames = fieldNames; - this.fieldTypes = fieldTypes; - this.keyNames = keyNames; - List nameList = Arrays.asList(fieldNames); - this.keyTypes = - Arrays.stream(keyNames) - .map( - s -> { - checkArgument( - nameList.contains(s), - "keyName %s can't find in fieldNames %s.", - s, - nameList); - return fieldTypes[nameList.indexOf(s)]; - }) - .toArray(TypeInformation[]::new); - this.cacheMaxSize = lookupOptions.getCacheMaxSize(); - this.cacheExpireMs = lookupOptions.getCacheExpireMs(); - this.maxRetryTimes = lookupOptions.getMaxRetryTimes(); - this.keySqlTypes = - Arrays.stream(keyTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray(); - this.outputSqlTypes = - Arrays.stream(fieldTypes) - .mapToInt(JdbcTypeUtil::typeInformationToSqlType) - .toArray(); - this.query = - FieldNamedPreparedStatementImpl.parseNamedStatement( - options.getDialect() - .getSelectFromStatement( - options.getTableName(), fieldNames, keyNames), - new HashMap<>()); - } - - public static Builder builder() { - return new Builder(); - } - - @Override - public void open(FunctionContext context) throws Exception { - try { - establishConnectionAndStatement(); - this.cache = - cacheMaxSize == -1 || cacheExpireMs == -1 - ? null - : CacheBuilder.newBuilder() - .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) - .maximumSize(cacheMaxSize) - .build(); - } catch (SQLException sqe) { - throw new IllegalArgumentException("open() failed.", sqe); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("JDBC driver class not found.", cnfe); - } - } - - public void eval(Object... 
keys) { - Row keyRow = Row.of(keys); - if (cache != null) { - List cachedRows = cache.getIfPresent(keyRow); - if (cachedRows != null) { - for (Row cachedRow : cachedRows) { - collect(cachedRow); - } - return; - } - } - - for (int retry = 0; retry <= maxRetryTimes; retry++) { - try { - statement.clearParameters(); - for (int i = 0; i < keys.length; i++) { - JdbcUtils.setField(statement, keySqlTypes[i], keys[i], i); - } - try (ResultSet resultSet = statement.executeQuery()) { - if (cache == null) { - while (resultSet.next()) { - collect(convertToRowFromResultSet(resultSet)); - } - } else { - ArrayList rows = new ArrayList<>(); - while (resultSet.next()) { - Row row = convertToRowFromResultSet(resultSet); - rows.add(row); - collect(row); - } - rows.trimToSize(); - cache.put(keyRow, rows); - } - } - break; - } catch (SQLException e) { - LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), e); - if (retry >= maxRetryTimes) { - throw new RuntimeException("Execution of JDBC statement failed.", e); - } - - try { - if (!connectionProvider.isConnectionValid()) { - statement.close(); - connectionProvider.closeConnection(); - establishConnectionAndStatement(); - } - } catch (SQLException | ClassNotFoundException excpetion) { - LOG.error( - "JDBC connection is not valid, and reestablish connection failed", - excpetion); - throw new RuntimeException("Reestablish JDBC connection failed", excpetion); - } - - try { - Thread.sleep(1000 * retry); - } catch (InterruptedException e1) { - throw new RuntimeException(e1); - } - } - } - } - - private Row convertToRowFromResultSet(ResultSet resultSet) throws SQLException { - Row row = new Row(outputSqlTypes.length); - for (int i = 0; i < outputSqlTypes.length; i++) { - row.setField(i, getFieldFromResultSet(i, outputSqlTypes[i], resultSet)); - } - return row; - } - - private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException { - Connection dbConn = connectionProvider.getOrEstablishConnection(); - statement = dbConn.prepareStatement(query); - } - - @Override - public void close() throws IOException { - if (cache != null) { - cache.cleanUp(); - cache = null; - } - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - LOG.info("JDBC statement could not be closed: " + e.getMessage()); - } finally { - statement = null; - } - } - - connectionProvider.closeConnection(); - } - - @VisibleForTesting - public Connection getDbConnection() { - return connectionProvider.getConnection(); - } - - @Override - public TypeInformation getResultType() { - return new RowTypeInfo(fieldTypes, fieldNames); - } - - @Override - public TypeInformation[] getParameterTypes(Class[] signature) { - return keyTypes; - } - - /** Builder for a {@link JdbcLookupFunction}. */ - public static class Builder { - private JdbcConnectorOptions options; - private JdbcLookupOptions lookupOptions; - protected String[] fieldNames; - protected TypeInformation[] fieldTypes; - protected String[] keyNames; - - /** required, jdbc options. */ - public Builder setOptions(JdbcConnectorOptions options) { - this.options = options; - return this; - } - - /** optional, lookup related options. */ - public Builder setLookupOptions(JdbcLookupOptions lookupOptions) { - this.lookupOptions = lookupOptions; - return this; - } - - /** required, field names of this jdbc table. */ - public Builder setFieldNames(String[] fieldNames) { - this.fieldNames = fieldNames; - return this; - } - - /** required, field types of this jdbc table. 
*/ - public Builder setFieldTypes(TypeInformation[] fieldTypes) { - this.fieldTypes = fieldTypes; - return this; - } - - /** required, key names to query this jdbc table. */ - public Builder setKeyNames(String[] keyNames) { - this.keyNames = keyNames; - return this; - } - - /** - * Finalizes the configuration and checks validity. - * - * @return Configured JdbcLookupFunction - */ - public JdbcLookupFunction build() { - checkNotNull(options, "No JdbcOptions supplied."); - if (lookupOptions == null) { - lookupOptions = JdbcLookupOptions.builder().build(); - } - checkNotNull(fieldNames, "No fieldNames supplied."); - checkNotNull(fieldTypes, "No fieldTypes supplied."); - checkNotNull(keyNames, "No keyNames supplied."); - - return new JdbcLookupFunction(options, lookupOptions, fieldNames, fieldTypes, keyNames); - } - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java deleted file mode 100644 index 8eca00633d049..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.connector.jdbc.JdbcInputFormat; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; -import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; -import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; -import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.functions.AsyncTableFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.sources.LookupableTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.utils.TableConnectorUtils; -import org.apache.flink.types.Row; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Objects; - -import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** {@link TableSource} for JDBC. */ -public class JdbcTableSource - implements StreamTableSource, ProjectableTableSource, LookupableTableSource { - - private final JdbcConnectorOptions options; - private final JdbcReadOptions readOptions; - private final JdbcLookupOptions lookupOptions; - private final TableSchema schema; - - // index of fields selected, null means that all fields are selected - private final int[] selectFields; - private final DataType producedDataType; - - private JdbcTableSource( - JdbcConnectorOptions options, - JdbcReadOptions readOptions, - JdbcLookupOptions lookupOptions, - TableSchema schema) { - this(options, readOptions, lookupOptions, schema, null); - } - - private JdbcTableSource( - JdbcConnectorOptions options, - JdbcReadOptions readOptions, - JdbcLookupOptions lookupOptions, - TableSchema schema, - int[] selectFields) { - this.options = options; - this.readOptions = readOptions; - this.lookupOptions = lookupOptions; - this.schema = schema; - - this.selectFields = selectFields; - - final DataType[] schemaDataTypes = schema.getFieldDataTypes(); - final String[] schemaFieldNames = schema.getFieldNames(); - if (selectFields != null) { - DataType[] dataTypes = new DataType[selectFields.length]; - String[] fieldNames = new String[selectFields.length]; - for (int i = 0; i < selectFields.length; i++) { - dataTypes[i] = schemaDataTypes[selectFields[i]]; - fieldNames[i] = schemaFieldNames[selectFields[i]]; - } - this.producedDataType = - TableSchema.builder().fields(fieldNames, dataTypes).build().toRowDataType(); - } else { - this.producedDataType = schema.toRowDataType(); - } - } - - @Override - public boolean isBounded() { - return true; - } - - @Override - public DataStream getDataStream(StreamExecutionEnvironment execEnv) { - return execEnv.createInput( - getInputFormat(), (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType)) - .name(explainSource()); - } - - @Override - public 
TableFunction getLookupFunction(String[] lookupKeys) { - final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType); - return JdbcLookupFunction.builder() - .setOptions(options) - .setLookupOptions(lookupOptions) - .setFieldTypes(rowTypeInfo.getFieldTypes()) - .setFieldNames(rowTypeInfo.getFieldNames()) - .setKeyNames(lookupKeys) - .build(); - } - - @Override - public DataType getProducedDataType() { - return producedDataType; - } - - @Override - public TableSource projectFields(int[] fields) { - return new JdbcTableSource(options, readOptions, lookupOptions, schema, fields); - } - - @Override - public AsyncTableFunction getAsyncLookupFunction(String[] lookupKeys) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isAsyncEnabled() { - return false; - } - - @Override - public TableSchema getTableSchema() { - return schema; - } - - @Override - public String explainSource() { - final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType); - return TableConnectorUtils.generateRuntimeName(getClass(), rowTypeInfo.getFieldNames()); - } - - public static Builder builder() { - return new Builder(); - } - - private JdbcInputFormat getInputFormat() { - final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType); - JdbcInputFormat.JdbcInputFormatBuilder builder = - JdbcInputFormat.buildJdbcInputFormat() - .setDrivername(options.getDriverName()) - .setDBUrl(options.getDbURL()) - .setRowTypeInfo( - new RowTypeInfo( - rowTypeInfo.getFieldTypes(), rowTypeInfo.getFieldNames())); - options.getUsername().ifPresent(builder::setUsername); - options.getPassword().ifPresent(builder::setPassword); - - if (readOptions.getFetchSize() != 0) { - builder.setFetchSize(readOptions.getFetchSize()); - } - - final JdbcDialect dialect = options.getDialect(); - String query = getBaseQueryStatement(rowTypeInfo); - if (readOptions.getPartitionColumnName().isPresent()) { - long lowerBound = readOptions.getPartitionLowerBound().get(); - long upperBound = readOptions.getPartitionUpperBound().get(); - int numPartitions = readOptions.getNumPartitions().get(); - builder.setParametersProvider( - new JdbcNumericBetweenParametersProvider(lowerBound, upperBound) - .ofBatchNum(numPartitions)); - query += - " WHERE " - + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) - + " BETWEEN ? AND ?"; - } - builder.setQuery(query); - - return builder.finish(); - } - - private String getBaseQueryStatement(RowTypeInfo rowTypeInfo) { - return readOptions - .getQuery() - .orElseGet( - () -> - FieldNamedPreparedStatementImpl.parseNamedStatement( - options.getDialect() - .getSelectFromStatement( - options.getTableName(), - rowTypeInfo.getFieldNames(), - new String[0]), - new HashMap<>())); - } - - @Override - public boolean equals(Object o) { - if (o instanceof JdbcTableSource) { - JdbcTableSource source = (JdbcTableSource) o; - return Objects.equals(options, source.options) - && Objects.equals(readOptions, source.readOptions) - && Objects.equals(lookupOptions, source.lookupOptions) - && Objects.equals(schema, source.schema) - && Arrays.equals(selectFields, source.selectFields); - } else { - return false; - } - } - - /** Builder for a {@link JdbcTableSource}. */ - public static class Builder { - - private JdbcConnectorOptions options; - private JdbcReadOptions readOptions; - private JdbcLookupOptions lookupOptions; - protected TableSchema schema; - - /** required, jdbc options. 
*/ - public Builder setOptions(JdbcConnectorOptions options) { - this.options = options; - return this; - } - - /** - * optional, scan related options. {@link JdbcReadOptions} will be only used for {@link - * StreamTableSource}. - */ - public Builder setReadOptions(JdbcReadOptions readOptions) { - this.readOptions = readOptions; - return this; - } - - /** - * optional, lookup related options. {@link JdbcLookupOptions} only be used for {@link - * LookupableTableSource}. - */ - public Builder setLookupOptions(JdbcLookupOptions lookupOptions) { - this.lookupOptions = lookupOptions; - return this; - } - - /** required, table schema of this table source. */ - public Builder setSchema(TableSchema schema) { - this.schema = JdbcTypeUtil.normalizeTableSchema(schema); - return this; - } - - /** - * Finalizes the configuration and checks validity. - * - * @return Configured JdbcTableSource - */ - public JdbcTableSource build() { - checkNotNull(options, "No options supplied."); - checkNotNull(schema, "No schema supplied."); - if (readOptions == null) { - readOptions = JdbcReadOptions.builder().build(); - } - if (lookupOptions == null) { - lookupOptions = JdbcLookupOptions.builder().build(); - } - return new JdbcTableSource(options, readOptions, lookupOptions, schema); - } - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java deleted file mode 100644 index 6c93479462f01..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.JdbcValidator; -import org.apache.flink.table.descriptors.SchemaValidator; -import org.apache.flink.table.factories.StreamTableSinkFactory; -import org.apache.flink.table.factories.StreamTableSourceFactory; -import org.apache.flink.table.sinks.StreamTableSink; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.utils.TableSchemaUtils; -import org.apache.flink.types.Row; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_DRIVER; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_LOOKUP_CACHE_MAX_ROWS; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_LOOKUP_CACHE_TTL; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_LOOKUP_MAX_RETRIES; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_PASSWORD; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_FETCH_SIZE; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_COLUMN; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_NUM; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_QUERY; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TABLE; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TYPE_VALUE_JDBC; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_URL; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_USERNAME; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_WRITE_FLUSH_INTERVAL; -import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_WRITE_FLUSH_MAX_ROWS; -import static 
org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_WRITE_MAX_RETRIES; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; - -/** - * Factory for creating configured instances of {@link JdbcTableSource} and {@link - * JdbcUpsertTableSink}. - */ -public class JdbcTableSourceSinkFactory - implements StreamTableSourceFactory, StreamTableSinkFactory> { - - @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_JDBC); // jdbc - context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility - return context; - } - - @Override - public List supportedProperties() { - List properties = new ArrayList<>(); - - // common options - properties.add(CONNECTOR_DRIVER); - properties.add(CONNECTOR_URL); - properties.add(CONNECTOR_TABLE); - properties.add(CONNECTOR_USERNAME); - properties.add(CONNECTOR_PASSWORD); - properties.add(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT); - - // scan options - properties.add(CONNECTOR_READ_QUERY); - properties.add(CONNECTOR_READ_PARTITION_COLUMN); - properties.add(CONNECTOR_READ_PARTITION_NUM); - properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND); - properties.add(CONNECTOR_READ_PARTITION_UPPER_BOUND); - properties.add(CONNECTOR_READ_FETCH_SIZE); - - // lookup options - properties.add(CONNECTOR_LOOKUP_CACHE_MAX_ROWS); - properties.add(CONNECTOR_LOOKUP_CACHE_TTL); - properties.add(CONNECTOR_LOOKUP_MAX_RETRIES); - - // sink options - properties.add(CONNECTOR_WRITE_FLUSH_MAX_ROWS); - properties.add(CONNECTOR_WRITE_FLUSH_INTERVAL); - properties.add(CONNECTOR_WRITE_MAX_RETRIES); - - // schema - properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE); - properties.add(SCHEMA + ".#." + SCHEMA_TYPE); - properties.add(SCHEMA + ".#." + SCHEMA_NAME); - // computed column - properties.add(SCHEMA + ".#." + EXPR); - - // watermark - properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME); - properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR); - properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE); - - // table constraint - properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME); - properties.add(SCHEMA + "." 
+ DescriptorProperties.PRIMARY_KEY_COLUMNS); - - // comment - properties.add(COMMENT); - - return properties; - } - - @Override - public StreamTableSource createStreamTableSource(Map properties) { - DescriptorProperties descriptorProperties = getValidatedProperties(properties); - TableSchema schema = - TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)); - - return JdbcTableSource.builder() - .setOptions(getJdbcOptions(descriptorProperties)) - .setReadOptions(getJdbcReadOptions(descriptorProperties)) - .setLookupOptions(getJdbcLookupOptions(descriptorProperties)) - .setSchema(schema) - .build(); - } - - @Override - public StreamTableSink> createStreamTableSink( - Map properties) { - DescriptorProperties descriptorProperties = getValidatedProperties(properties); - TableSchema schema = - TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)); - - final JdbcUpsertTableSink.Builder builder = - JdbcUpsertTableSink.builder() - .setOptions(getJdbcOptions(descriptorProperties)) - .setTableSchema(schema); - - descriptorProperties - .getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS) - .ifPresent(builder::setFlushMaxSize); - descriptorProperties - .getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL) - .ifPresent(s -> builder.setFlushIntervalMills(s.toMillis())); - descriptorProperties - .getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES) - .ifPresent(builder::setMaxRetryTimes); - - return builder.build(); - } - - private DescriptorProperties getValidatedProperties(Map properties) { - final DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - new SchemaValidator(true, false, false).validate(descriptorProperties); - new JdbcValidator().validate(descriptorProperties); - - return descriptorProperties; - } - - private JdbcConnectorOptions getJdbcOptions(DescriptorProperties descriptorProperties) { - final String url = descriptorProperties.getString(CONNECTOR_URL); - final JdbcConnectorOptions.Builder builder = - JdbcConnectorOptions.builder() - .setDBUrl(url) - .setTableName(descriptorProperties.getString(CONNECTOR_TABLE)) - .setDialect(JdbcDialectLoader.load(url)); - - descriptorProperties - .getOptionalDuration(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT) - .ifPresent(s -> builder.setConnectionCheckTimeoutSeconds((int) s.getSeconds())); - descriptorProperties.getOptionalString(CONNECTOR_DRIVER).ifPresent(builder::setDriverName); - descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername); - descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword); - - return builder.build(); - } - - private JdbcReadOptions getJdbcReadOptions(DescriptorProperties descriptorProperties) { - final Optional query = descriptorProperties.getOptionalString(CONNECTOR_READ_QUERY); - final Optional partitionColumnName = - descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN); - final Optional partitionLower = - descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND); - final Optional partitionUpper = - descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND); - final Optional numPartitions = - descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM); - - final JdbcReadOptions.Builder builder = JdbcReadOptions.builder(); - if (query.isPresent()) { - builder.setQuery(query.get()); - } - if (partitionColumnName.isPresent()) { - builder.setPartitionColumnName(partitionColumnName.get()); - 
builder.setPartitionLowerBound(partitionLower.get()); - builder.setPartitionUpperBound(partitionUpper.get()); - builder.setNumPartitions(numPartitions.get()); - } - descriptorProperties - .getOptionalInt(CONNECTOR_READ_FETCH_SIZE) - .ifPresent(builder::setFetchSize); - - return builder.build(); - } - - private JdbcLookupOptions getJdbcLookupOptions(DescriptorProperties descriptorProperties) { - final JdbcLookupOptions.Builder builder = JdbcLookupOptions.builder(); - - descriptorProperties - .getOptionalLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS) - .ifPresent(builder::setCacheMaxSize); - descriptorProperties - .getOptionalDuration(CONNECTOR_LOOKUP_CACHE_TTL) - .ifPresent(s -> builder.setCacheExpireMs(s.toMillis())); - descriptorProperties - .getOptionalInt(CONNECTOR_LOOKUP_MAX_RETRIES) - .ifPresent(builder::setMaxRetryTimes); - - return builder.build(); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcUpsertTableSink.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcUpsertTableSink.java deleted file mode 100644 index e6adf35a4436d..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcUpsertTableSink.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.connector.jdbc.JdbcExecutionOptions; -import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; -import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; -import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.sinks.UpsertStreamTableSink; -import org.apache.flink.table.utils.TableConnectorUtils; -import org.apache.flink.table.utils.TableSchemaUtils; -import org.apache.flink.types.Row; - -import java.util.Arrays; -import java.util.Objects; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** An upsert {@link UpsertStreamTableSink} for JDBC. */ -public class JdbcUpsertTableSink implements UpsertStreamTableSink { - - private final TableSchema schema; - private final JdbcConnectorOptions options; - private final int flushMaxSize; - private final long flushIntervalMills; - private final int maxRetryTime; - - private String[] keyFields; - private boolean isAppendOnly; - - private JdbcUpsertTableSink( - TableSchema schema, - JdbcConnectorOptions options, - int flushMaxSize, - long flushIntervalMills, - int maxRetryTime) { - this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(schema); - this.options = options; - this.flushMaxSize = flushMaxSize; - this.flushIntervalMills = flushIntervalMills; - this.maxRetryTime = maxRetryTime; - } - - private JdbcOutputFormat, Row, JdbcBatchStatementExecutor> - newFormat() { - if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) { - throw new UnsupportedOperationException("JdbcUpsertTableSink can not support "); - } - - // sql types - int[] jdbcSqlTypes = - Arrays.stream(schema.getFieldTypes()) - .mapToInt(JdbcTypeUtil::typeInformationToSqlType) - .toArray(); - - return JdbcOutputFormat.builder() - .setOptions(options) - .setFieldNames(schema.getFieldNames()) - .setFlushMaxSize(flushMaxSize) - .setFlushIntervalMills(flushIntervalMills) - .setMaxRetryTimes(maxRetryTime) - .setFieldTypes(jdbcSqlTypes) - .setKeyFields(keyFields) - .build(); - } - - @Override - public DataStreamSink consumeDataStream(DataStream> dataStream) { - return dataStream - .addSink(new GenericJdbcSinkFunction<>(newFormat())) - .setParallelism(dataStream.getParallelism()) - .name( - TableConnectorUtils.generateRuntimeName( - this.getClass(), schema.getFieldNames())); - } - - @Override - public void setKeyFields(String[] keys) { - this.keyFields = keys; - } - - @Override - public void setIsAppendOnly(Boolean isAppendOnly) { - this.isAppendOnly = isAppendOnly; - } - - @Override - public TypeInformation> getOutputType() { - return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); - } - - @Override - public TypeInformation getRecordType() { - return new RowTypeInfo(schema.getFieldTypes(), 
schema.getFieldNames()); - } - - @Override - public String[] getFieldNames() { - return schema.getFieldNames(); - } - - @Override - public TypeInformation[] getFieldTypes() { - return schema.getFieldTypes(); - } - - @Override - public TableSink> configure( - String[] fieldNames, TypeInformation[] fieldTypes) { - if (!Arrays.equals(getFieldNames(), fieldNames) - || !Arrays.equals(getFieldTypes(), fieldTypes)) { - throw new ValidationException( - "Reconfiguration with different fields is not allowed. " - + "Expected: " - + Arrays.toString(getFieldNames()) - + " / " - + Arrays.toString(getFieldTypes()) - + ". " - + "But was: " - + Arrays.toString(fieldNames) - + " / " - + Arrays.toString(fieldTypes)); - } - - JdbcUpsertTableSink copy = - new JdbcUpsertTableSink( - schema, options, flushMaxSize, flushIntervalMills, maxRetryTime); - copy.keyFields = keyFields; - return copy; - } - - public static Builder builder() { - return new Builder(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof JdbcUpsertTableSink) { - JdbcUpsertTableSink sink = (JdbcUpsertTableSink) o; - return Objects.equals(schema, sink.schema) - && Objects.equals(options, sink.options) - && Objects.equals(flushMaxSize, sink.flushMaxSize) - && Objects.equals(flushIntervalMills, sink.flushIntervalMills) - && Objects.equals(maxRetryTime, sink.maxRetryTime) - && Arrays.equals(keyFields, sink.keyFields) - && Objects.equals(isAppendOnly, sink.isAppendOnly); - } else { - return false; - } - } - - /** Builder for a {@link JdbcUpsertTableSink}. */ - public static class Builder { - protected TableSchema schema; - private JdbcConnectorOptions options; - protected int flushMaxSize = JdbcOutputFormat.DEFAULT_FLUSH_MAX_SIZE; - protected long flushIntervalMills = JdbcOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS; - protected int maxRetryTimes = JdbcExecutionOptions.DEFAULT_MAX_RETRY_TIMES; - - /** required, table schema of this table source. */ - public Builder setTableSchema(TableSchema schema) { - this.schema = JdbcTypeUtil.normalizeTableSchema(schema); - return this; - } - - /** required, jdbc options. */ - public Builder setOptions(JdbcConnectorOptions options) { - this.options = options; - return this; - } - - /** - * optional, flush max size (includes all append, upsert and delete records), over this - * number of records, will flush data. - */ - public Builder setFlushMaxSize(int flushMaxSize) { - this.flushMaxSize = flushMaxSize; - return this; - } - - /** optional, flush interval mills, over this time, asynchronous threads will flush data. */ - public Builder setFlushIntervalMills(long flushIntervalMills) { - this.flushIntervalMills = flushIntervalMills; - return this; - } - - /** optional, max retry times for jdbc connector. 
*/ - public Builder setMaxRetryTimes(int maxRetryTimes) { - this.maxRetryTimes = maxRetryTimes; - return this; - } - - public JdbcUpsertTableSink build() { - checkNotNull(schema, "No schema supplied."); - checkNotNull(options, "No options supplied."); - return new JdbcUpsertTableSink( - schema, options, flushMaxSize, flushIntervalMills, maxRetryTimes); - } - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java index c873e684fa021..9c44315823a9a 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtil.java @@ -24,10 +24,6 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.TypeTransformations; -import org.apache.flink.table.types.utils.DataTypeUtils; import java.sql.Types; import java.util.Collections; @@ -101,33 +97,4 @@ public static int typeInformationToSqlType(TypeInformation type) { throw new IllegalArgumentException("Unsupported type: " + type); } } - - public static String getTypeName(int type) { - return SQL_TYPE_NAMES.get(type); - } - - public static String getTypeName(TypeInformation type) { - return SQL_TYPE_NAMES.get(typeInformationToSqlType(type)); - } - - /** - * The original table schema may contain generated columns which shouldn't be produced/consumed - * by TableSource/TableSink. And the original TIMESTAMP/DATE/TIME types uses - * LocalDateTime/LocalDate/LocalTime as the conversion classes, however, JDBC connector uses - * Timestamp/Date/Time classes. So that we bridge them to the expected conversion classes. - */ - public static TableSchema normalizeTableSchema(TableSchema schema) { - TableSchema.Builder physicalSchemaBuilder = TableSchema.builder(); - schema.getTableColumns() - .forEach( - c -> { - if (c.isPhysical()) { - final DataType type = - DataTypeUtils.transform( - c.getType(), TypeTransformations.timeToSqlTypes()); - physicalSchemaBuilder.field(c.getName(), type); - } - }); - return physicalSchemaBuilder.build(); - } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java index 43658288e7ef2..501a7a631374a 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/JdbcUtils.java @@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory; import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; /** Utils for jdbc connectors. 
*/ @@ -162,91 +161,6 @@ public static void setField(PreparedStatement upload, int type, Object field, in } } - public static Object getFieldFromResultSet(int index, int type, ResultSet set) - throws SQLException { - Object ret; - switch (type) { - case java.sql.Types.NULL: - ret = null; - break; - case java.sql.Types.BOOLEAN: - case java.sql.Types.BIT: - ret = set.getBoolean(index + 1); - break; - case java.sql.Types.CHAR: - case java.sql.Types.NCHAR: - case java.sql.Types.VARCHAR: - case java.sql.Types.LONGVARCHAR: - case java.sql.Types.LONGNVARCHAR: - ret = set.getString(index + 1); - break; - case java.sql.Types.TINYINT: - ret = set.getByte(index + 1); - break; - case java.sql.Types.SMALLINT: - ret = set.getShort(index + 1); - break; - case java.sql.Types.INTEGER: - ret = set.getInt(index + 1); - break; - case java.sql.Types.BIGINT: - ret = set.getLong(index + 1); - break; - case java.sql.Types.REAL: - ret = set.getFloat(index + 1); - break; - case java.sql.Types.FLOAT: - case java.sql.Types.DOUBLE: - ret = set.getDouble(index + 1); - break; - case java.sql.Types.DECIMAL: - case java.sql.Types.NUMERIC: - ret = set.getBigDecimal(index + 1); - break; - case java.sql.Types.DATE: - ret = set.getDate(index + 1); - break; - case java.sql.Types.TIME: - ret = set.getTime(index + 1); - break; - case java.sql.Types.TIMESTAMP: - ret = set.getTimestamp(index + 1); - break; - case java.sql.Types.BINARY: - case java.sql.Types.VARBINARY: - case java.sql.Types.LONGVARBINARY: - ret = set.getBytes(index + 1); - break; - default: - ret = set.getObject(index + 1); - LOG.warn( - "Unmanaged sql type ({}) for column {}. Best effort approach to get its value: {}.", - type, - index + 1, - ret); - break; - - // case java.sql.Types.SQLXML - // case java.sql.Types.ARRAY: - // case java.sql.Types.JAVA_OBJECT: - // case java.sql.Types.BLOB: - // case java.sql.Types.CLOB: - // case java.sql.Types.NCLOB: - // case java.sql.Types.DATALINK: - // case java.sql.Types.DISTINCT: - // case java.sql.Types.OTHER: - // case java.sql.Types.REF: - // case java.sql.Types.ROWID: - // case java.sql.Types.STRUC - } - - if (set.wasNull()) { - return null; - } else { - return ret; - } - } - public static Row getPrimaryKey(Row row, int[] pkFields) { Row pkRow = new Row(pkFields.length); for (int i = 0; i < pkFields.length; i++) { diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java deleted file mode 100644 index 2420225ef21fb..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
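The JdbcTypeUtil.normalizeTableSchema method removed above bridged TIMESTAMP/DATE/TIME columns from their default java.time conversion classes to the java.sql classes the JDBC connector works with. A minimal standalone sketch of that bridging, assuming only the DataTypeUtils.transform and TypeTransformations.timeToSqlTypes calls that appeared in the removed code:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.inference.TypeTransformations;
    import org.apache.flink.table.types.utils.DataTypeUtils;

    public class TimeToSqlTypesSketch {
        public static void main(String[] args) {
            // TIMESTAMP(3) defaults to java.time.LocalDateTime as its conversion class.
            DataType timestamp = DataTypes.TIMESTAMP(3);
            System.out.println(timestamp.getConversionClass()); // class java.time.LocalDateTime

            // timeToSqlTypes() rewrites TIMESTAMP/DATE/TIME to be bridged to the
            // corresponding java.sql.* classes, e.g. java.sql.Timestamp here.
            DataType bridged =
                    DataTypeUtils.transform(timestamp, TypeTransformations.timeToSqlTypes());
            System.out.println(bridged.getConversionClass()); // class java.sql.Timestamp
        }
    }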
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.utils.TableSchemaUtils; -import org.apache.flink.util.Preconditions; - -import java.util.Optional; - -import static org.apache.flink.table.descriptors.Schema.SCHEMA; - -/** The validator for JDBC. */ -@Internal -public class JdbcValidator extends ConnectorDescriptorValidator { - - public static final String CONNECTOR_TYPE_VALUE_JDBC = "jdbc"; - - public static final String CONNECTOR_URL = "connector.url"; - public static final String CONNECTOR_TABLE = "connector.table"; - public static final String CONNECTOR_DRIVER = "connector.driver"; - public static final String CONNECTOR_USERNAME = "connector.username"; - public static final String CONNECTOR_PASSWORD = "connector.password"; - public static final String CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT = - "connector.connection.max-retry-timeout"; - - public static final String CONNECTOR_READ_QUERY = "connector.read.query"; - public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column"; - public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = - "connector.read.partition.lower-bound"; - public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = - "connector.read.partition.upper-bound"; - public static final String CONNECTOR_READ_PARTITION_NUM = "connector.read.partition.num"; - public static final String CONNECTOR_READ_FETCH_SIZE = "connector.read.fetch-size"; - - public static final String CONNECTOR_LOOKUP_CACHE_MAX_ROWS = "connector.lookup.cache.max-rows"; - public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl"; - public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries"; - - public static final String CONNECTOR_WRITE_FLUSH_MAX_ROWS = "connector.write.flush.max-rows"; - public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval"; - public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries"; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - validateCommonProperties(properties); - validateReadProperties(properties); - validateLookupProperties(properties); - validateSinkProperties(properties); - } - - private void validateCommonProperties(DescriptorProperties properties) { - properties.validateString(CONNECTOR_URL, false, 1); - properties.validateString(CONNECTOR_TABLE, false, 1); - properties.validateString(CONNECTOR_DRIVER, true); - properties.validateString(CONNECTOR_USERNAME, true); - properties.validateString(CONNECTOR_PASSWORD, true); - properties.validateDuration(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1000); - - final String url = properties.getString(CONNECTOR_URL); - final JdbcDialect dialect = JdbcDialectLoader.load(url); - - TableSchema schema = TableSchemaUtils.getPhysicalSchema(properties.getTableSchema(SCHEMA)); - - DataType dataType = schema.toPhysicalRowDataType(); - if (!(dataType.getLogicalType() instanceof RowType)) { - throw new 
ValidationException("Logical DataType must be a RowType"); - } - - dialect.validate((RowType) dataType.getLogicalType()); - - Optional password = properties.getOptionalString(CONNECTOR_PASSWORD); - if (password.isPresent()) { - Preconditions.checkArgument( - properties.getOptionalString(CONNECTOR_USERNAME).isPresent(), - "Database username must be provided when database password is provided"); - } - } - - private void validateReadProperties(DescriptorProperties properties) { - properties.validateString(CONNECTOR_READ_QUERY, true); - properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true); - properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true); - properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true); - properties.validateInt(CONNECTOR_READ_PARTITION_NUM, true); - properties.validateInt(CONNECTOR_READ_FETCH_SIZE, true); - - Optional lowerBound = - properties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND); - Optional upperBound = - properties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND); - if (lowerBound.isPresent() && upperBound.isPresent()) { - Preconditions.checkArgument( - lowerBound.get() <= upperBound.get(), - CONNECTOR_READ_PARTITION_LOWER_BOUND - + " must not be larger than " - + CONNECTOR_READ_PARTITION_UPPER_BOUND); - } - - checkAllOrNone( - properties, - new String[] { - CONNECTOR_READ_PARTITION_COLUMN, - CONNECTOR_READ_PARTITION_LOWER_BOUND, - CONNECTOR_READ_PARTITION_UPPER_BOUND, - CONNECTOR_READ_PARTITION_NUM - }); - } - - private void validateLookupProperties(DescriptorProperties properties) { - properties.validateLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS, true); - properties.validateDuration(CONNECTOR_LOOKUP_CACHE_TTL, true, 1); - properties.validateInt(CONNECTOR_LOOKUP_MAX_RETRIES, true, 0); - - checkAllOrNone( - properties, - new String[] {CONNECTOR_LOOKUP_CACHE_MAX_ROWS, CONNECTOR_LOOKUP_CACHE_TTL}); - } - - private void validateSinkProperties(DescriptorProperties properties) { - properties.validateInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS, true); - properties.validateDuration(CONNECTOR_WRITE_FLUSH_INTERVAL, true, 1); - properties.validateInt(CONNECTOR_WRITE_MAX_RETRIES, true); - } - - private void checkAllOrNone(DescriptorProperties properties, String[] propertyNames) { - int presentCount = 0; - for (String name : propertyNames) { - if (properties.getOptionalString(name).isPresent()) { - presentCount++; - } - } - Preconditions.checkArgument( - presentCount == 0 || presentCount == propertyNames.length, - "Either all or none of the following properties should be provided:\n" - + String.join("\n", propertyNames)); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory deleted file mode 100644 index 6892b6d738e7f..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java index 6ea3d8c5ba00f..34e75e084d771 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.junit.Assert; @@ -40,11 +40,11 @@ public class JdbcDataTypeTest { "CREATE TABLE T(\n" + "f0 %s\n" + ") WITH (\n" - + " 'connector.type'='jdbc',\n" - + " 'connector.url'='" + + " 'connector'='jdbc',\n" + + " 'url'='" + "jdbc:%s:memory:test" + "',\n" - + " 'connector.table'='myTable'\n" + + " 'table-name'='myTable'\n" + ")"; @Parameterized.Parameters(name = "{index}: {0}") @@ -101,50 +101,46 @@ public static List testData() { createTestItem("postgresql", "ARRAY"), // Unsupported types throws errors. - createTestItem( - "derby", "BINARY", "The Derby dialect doesn't support type: BINARY(1)."), createTestItem( "derby", - "VARBINARY(10)", - "The Derby dialect doesn't support type: VARBINARY(10)."), + "BINARY", + "Unsupported conversion from data type 'BINARY(1)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion."), createTestItem( "derby", - "TIMESTAMP_LTZ(3)", - "The Derby dialect doesn't support type: TIMESTAMP_LTZ(3)."), + "VARBINARY(10)", + "Unsupported conversion from data type 'VARBINARY(10)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion."), + createTestItem("derby", "TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)"), createTestItem( "derby", "DECIMAL(38, 18)", "The precision of field 'f0' is out of the DECIMAL precision range [1, 31] supported by Derby dialect."), createTestItem( - "mysql", "BINARY", "The MySQL dialect doesn't support type: BINARY(1)."), + "mysql", + "BINARY", + "Unsupported conversion from data type 'BINARY(1)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion."), createTestItem( "mysql", "VARBINARY(10)", - "The MySQL dialect doesn't support type: VARBINARY(10)."), + "Unsupported conversion from data type 'VARBINARY(10)' (conversion class: [B) to type information. 
Only data types that originated from type information fully support a reverse conversion."), createTestItem( "mysql", "TIMESTAMP(9) WITHOUT TIME ZONE", "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by MySQL dialect."), - createTestItem( - "mysql", - "TIMESTAMP_LTZ(3)", - "The MySQL dialect doesn't support type: TIMESTAMP_LTZ(3)."), + createTestItem("mysql", "TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)"), createTestItem( "postgresql", "BINARY", - "The PostgreSQL dialect doesn't support type: BINARY(1)."), + "Unsupported conversion from data type 'BINARY(1)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion."), createTestItem( "postgresql", "VARBINARY(10)", - "The PostgreSQL dialect doesn't support type: VARBINARY(10)."), + "Unsupported conversion from data type 'VARBINARY(10)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion."), createTestItem( "postgresql", "TIMESTAMP(9) WITHOUT TIME ZONE", "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by PostgreSQL dialect."), createTestItem( - "postgresql", - "TIMESTAMP_LTZ(3)", - "The PostgreSQL dialect doesn't support type: TIMESTAMP_LTZ(3).")); + "postgresql", "TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); } private static TestItem createTestItem(Object... args) { @@ -171,8 +167,10 @@ public void testDataTypeValidate() { try { tEnv.sqlQuery("SELECT * FROM T"); } catch (Exception ex) { - Assert.assertTrue(ex.getCause() instanceof ValidationException); - Assert.assertEquals(testItem.expectError, ex.getCause().getMessage()); + Assert.assertTrue( + ex instanceof TableException + || ex instanceof UnsupportedOperationException); + Assert.assertEquals(testItem.expectError, ex.getMessage()); } } else { tEnv.sqlQuery("SELECT * FROM T"); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcLookupFunctionTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcLookupFunctionTest.java deleted file mode 100644 index 4f7afbf06203f..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcLookupFunctionTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
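The new expected errors in JdbcDataTypeTest above ("Unsupported conversion from data type ... to type information") surface when a data type without a legacy TypeInformation counterpart is converted back. A minimal sketch of that failure mode, assuming it is reproducible directly through TypeConversions.fromDataTypeToLegacyInfo:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.utils.TypeConversions;

    public class BinaryTypeInfoConversionSketch {
        public static void main(String[] args) {
            try {
                // BINARY(1) did not originate from a legacy TypeInformation,
                // so the reverse conversion is rejected.
                TypeInformation<?> info =
                        TypeConversions.fromDataTypeToLegacyInfo(DataTypes.BINARY(1));
                System.out.println(info);
            } catch (RuntimeException e) {
                // Expected message along the lines of:
                // "Unsupported conversion from data type 'BINARY(1)' (conversion class: [B)
                //  to type information. ..."
                System.out.println(e.getMessage());
            }
        }
    }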
- */ - -package org.apache.flink.connector.jdbc; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; -import org.apache.flink.connector.jdbc.table.JdbcLookupFunction; -import org.apache.flink.connector.jdbc.table.JdbcLookupTestBase; -import org.apache.flink.types.Row; -import org.apache.flink.util.Collector; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertEquals; - -/** Test suite for {@link JdbcLookupFunction}. */ -public class JdbcLookupFunctionTest extends JdbcLookupTestBase { - - public static final String DB_URL = "jdbc:derby:memory:lookup"; - public static final String LOOKUP_TABLE = "lookup_table"; - public static final String DB_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver"; - - private static String[] fieldNames = new String[] {"id1", "id2", "comment1", "comment2"}; - private static TypeInformation[] fieldTypes = - new TypeInformation[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO - }; - - private static String[] lookupKeys = new String[] {"id1", "id2"}; - - @Test - public void testEval() throws Exception { - - JdbcLookupFunction lookupFunction = buildLookupFunction(); - ListOutputCollector collector = new ListOutputCollector(); - lookupFunction.setCollector(collector); - - lookupFunction.open(null); - - lookupFunction.eval(1, "1"); - - // close connection - lookupFunction.getDbConnection().close(); - - lookupFunction.eval(2, "3"); - - List result = - new ArrayList<>(collector.getOutputs()) - .stream().map(Row::toString).sorted().collect(Collectors.toList()); - - List expected = new ArrayList<>(); - expected.add("+I[1, 1, 11-c1-v1, 11-c2-v1]"); - expected.add("+I[1, 1, 11-c1-v2, 11-c2-v2]"); - expected.add("+I[2, 3, null, 23-c2]"); - Collections.sort(expected); - - assertEquals(expected, result); - } - - private JdbcLookupFunction buildLookupFunction() { - JdbcConnectorOptions jdbcOptions = - JdbcConnectorOptions.builder() - .setDriverName(DB_DRIVER) - .setDBUrl(DB_URL) - .setTableName(LOOKUP_TABLE) - .build(); - - JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder().build(); - - JdbcLookupFunction lookupFunction = - JdbcLookupFunction.builder() - .setOptions(jdbcOptions) - .setLookupOptions(lookupOptions) - .setFieldTypes(fieldTypes) - .setFieldNames(fieldNames) - .setKeyNames(lookupKeys) - .build(); - - return lookupFunction; - } - - private static final class ListOutputCollector implements Collector { - - private final List output = new ArrayList<>(); - - @Override - public void collect(Row row) { - this.output.add(row); - } - - @Override - public void close() {} - - public List getOutputs() { - return output; - } - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java index 10abf5c9f5af1..226eabb2a35f7 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java +++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -124,29 +124,29 @@ public void testGetTables_TableNotExistException_NoDb() throws TableNotExistExce public void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException { // test postgres.public.user1 - TableSchema schema = getSimpleTable().schema; + Schema schema = getSimpleTable().schema; CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1)); - assertEquals(schema, table.getSchema()); + assertEquals(schema, table.getUnresolvedSchema()); table = catalog.getTable(new ObjectPath("postgres", "public.t1")); - assertEquals(schema, table.getSchema()); + assertEquals(schema, table.getUnresolvedSchema()); // test testdb.public.user2 table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2)); - assertEquals(schema, table.getSchema()); + assertEquals(schema, table.getUnresolvedSchema()); table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); - assertEquals(schema, table.getSchema()); + assertEquals(schema, table.getUnresolvedSchema()); // test testdb.testschema.user2 table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3")); - assertEquals(schema, table.getSchema()); + assertEquals(schema, table.getUnresolvedSchema()); } @Test @@ -155,7 +155,7 @@ public void testPrimitiveDataTypes() throws TableNotExistException { catalog.getTable( new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE)); - assertEquals(getPrimitiveTable().schema, table.getSchema()); + assertEquals(getPrimitiveTable().schema, table.getUnresolvedSchema()); } @Test @@ -164,7 +164,7 @@ public void tesArrayDataTypes() throws TableNotExistException { catalog.getTable( new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE)); - assertEquals(getArrayTable().schema, table.getSchema()); + assertEquals(getArrayTable().schema, table.getUnresolvedSchema()); } @Test @@ -173,6 +173,6 @@ public void testSerialDataTypes() throws TableNotExistException { catalog.getTable( new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_SERIAL_TYPE)); - assertEquals(getSerialTable().schema, table.getSchema()); + assertEquals(getSerialTable().schema, table.getUnresolvedSchema()); } } diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java index d063396c44dbc..8e3482a9a0bdf 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.catalog; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.types.logical.DecimalType; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; @@ -166,11 +166,11 @@ public static void executeSQL(String db, 
String sql) throws SQLException { /** Object holding schema and corresponding sql. */ public static class TestTable { - TableSchema schema; + Schema schema; String pgSchemaSql; String values; - public TestTable(TableSchema schema, String pgSchemaSql, String values) { + public TestTable(Schema schema, String pgSchemaSql, String values) { this.schema = schema; this.pgSchemaSql = pgSchemaSql; this.values = values; @@ -179,7 +179,7 @@ public TestTable(TableSchema schema, String pgSchemaSql, String values) { public static TestTable getSimpleTable() { return new TestTable( - TableSchema.builder().field("id", DataTypes.INT()).build(), "id integer", "1"); + Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1"); } // posgres doesn't support to use the same primary key name across different tables, @@ -196,26 +196,26 @@ public static TestTable getPrimitiveTable() { // reverse conversion. public static TestTable getPrimitiveTable(String primaryKeyName) { return new TestTable( - TableSchema.builder() - .field("int", DataTypes.INT().notNull()) - .field("bytea", DataTypes.BYTES()) - .field("short", DataTypes.SMALLINT().notNull()) - .field("long", DataTypes.BIGINT()) - .field("real", DataTypes.FLOAT()) - .field("double_precision", DataTypes.DOUBLE()) - .field("numeric", DataTypes.DECIMAL(10, 5)) - .field("decimal", DataTypes.DECIMAL(10, 1)) - .field("boolean", DataTypes.BOOLEAN()) - .field("text", DataTypes.STRING()) - .field("char", DataTypes.CHAR(1)) - .field("character", DataTypes.CHAR(3)) - .field("character_varying", DataTypes.VARCHAR(20)) - .field("timestamp", DataTypes.TIMESTAMP(5)) + Schema.newBuilder() + .column("int", DataTypes.INT().notNull()) + .column("bytea", DataTypes.BYTES()) + .column("short", DataTypes.SMALLINT().notNull()) + .column("long", DataTypes.BIGINT()) + .column("real", DataTypes.FLOAT()) + .column("double_precision", DataTypes.DOUBLE()) + .column("numeric", DataTypes.DECIMAL(10, 5)) + .column("decimal", DataTypes.DECIMAL(10, 1)) + .column("boolean", DataTypes.BOOLEAN()) + .column("text", DataTypes.STRING()) + .column("char", DataTypes.CHAR(1)) + .column("character", DataTypes.CHAR(3)) + .column("character_varying", DataTypes.VARCHAR(20)) + .column("timestamp", DataTypes.TIMESTAMP(5)) // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) - .field("date", DataTypes.DATE()) - .field("time", DataTypes.TIME(0)) - .field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) - .primaryKey(primaryKeyName, new String[] {"short", "int"}) + .column("date", DataTypes.DATE()) + .column("time", DataTypes.TIME(0)) + .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) + .primaryKeyNamed(primaryKeyName, "short", "int") .build(), "int integer, " + "bytea bytea, " @@ -263,30 +263,30 @@ public static TestTable getPrimitiveTable(String primaryKeyName) { // TODO: add back timestamptz once planner supports timestamp with timezone public static TestTable getArrayTable() { return new TestTable( - TableSchema.builder() - .field("int_arr", DataTypes.ARRAY(DataTypes.INT())) - .field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) - .field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT())) - .field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT())) - .field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) - .field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) - .field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) - .field( + Schema.newBuilder() + .column("int_arr", DataTypes.ARRAY(DataTypes.INT())) + 
.column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT())) + .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT())) + .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) + .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) + .column( "numeric_arr_default", DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))) - .field("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))) - .field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) - .field("text_arr", DataTypes.ARRAY(DataTypes.STRING())) - .field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) - .field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) - .field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) - .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) + .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))) + .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) + .column("text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) + .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) + .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) + .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) // .field("timestamptz_arr", // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))) - .field("date_arr", DataTypes.ARRAY(DataTypes.DATE())) - .field("time_arr", DataTypes.ARRAY(DataTypes.TIME(0))) - .field("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) - .field("null_text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .column("date_arr", DataTypes.ARRAY(DataTypes.DATE())) + .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0))) + .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING())) .build(), "int_arr integer[], " + "bytea_arr bytea[], " @@ -335,14 +335,14 @@ public static TestTable getArrayTable() { public static TestTable getSerialTable() { return new TestTable( - TableSchema.builder() + Schema.newBuilder() // serial fields are returned as not null by ResultSetMetaData.columnNoNulls - .field("f0", DataTypes.SMALLINT().notNull()) - .field("f1", DataTypes.INT().notNull()) - .field("f2", DataTypes.SMALLINT().notNull()) - .field("f3", DataTypes.INT().notNull()) - .field("f4", DataTypes.BIGINT().notNull()) - .field("f5", DataTypes.BIGINT().notNull()) + .column("f0", DataTypes.SMALLINT().notNull()) + .column("f1", DataTypes.INT().notNull()) + .column("f2", DataTypes.SMALLINT().notNull()) + .column("f3", DataTypes.INT().notNull()) + .column("f4", DataTypes.BIGINT().notNull()) + .column("f5", DataTypes.BIGINT().notNull()) .build(), "f0 smallserial, " + "f1 serial, " diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java index 68f44c334c5aa..86d51f6897a9e 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; import 
org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -46,8 +45,8 @@ import static org.junit.Assert.fail; /** - * Test for {@link JdbcTableSource} and {@link JdbcUpsertTableSink} created by {@link - * JdbcTableSourceSinkFactory}. + * Test for {@link JdbcDynamicTableSource} and {@link JdbcDynamicTableSink} created by {@link + * JdbcDynamicTableFactory}. */ public class JdbcDynamicTableFactoryTest { @@ -92,7 +91,7 @@ public void testJdbcCommonProperties() { options, JdbcReadOptions.builder().build(), lookupOptions, - TableSchema.fromResolvedSchema(SCHEMA)); + SCHEMA.toPhysicalRowDataType()); assertEquals(expectedSource, actualSource); // validation for sink @@ -113,10 +112,7 @@ public void testJdbcCommonProperties() { .build(); JdbcDynamicTableSink expectedSink = new JdbcDynamicTableSink( - options, - executionOptions, - dmlOptions, - TableSchema.fromResolvedSchema(SCHEMA)); + options, executionOptions, dmlOptions, SCHEMA.toPhysicalRowDataType()); assertEquals(expectedSink, actualSink); } @@ -154,10 +150,7 @@ public void testJdbcReadProperties() { .build(); JdbcDynamicTableSource expected = new JdbcDynamicTableSource( - options, - readOptions, - lookupOptions, - TableSchema.fromResolvedSchema(SCHEMA)); + options, readOptions, lookupOptions, SCHEMA.toPhysicalRowDataType()); assertEquals(expected, actual); } @@ -187,7 +180,7 @@ public void testJdbcLookupProperties() { options, JdbcReadOptions.builder().build(), lookupOptions, - TableSchema.fromResolvedSchema(SCHEMA)); + SCHEMA.toPhysicalRowDataType()); assertEquals(expected, actual); } @@ -222,10 +215,7 @@ public void testJdbcSinkProperties() { JdbcDynamicTableSink expected = new JdbcDynamicTableSink( - options, - executionOptions, - dmlOptions, - TableSchema.fromResolvedSchema(SCHEMA)); + options, executionOptions, dmlOptions, SCHEMA.toPhysicalRowDataType()); assertEquals(expected, actual); } @@ -259,10 +249,7 @@ public void testJDBCSinkWithParallelism() { JdbcDynamicTableSink expected = new JdbcDynamicTableSink( - options, - executionOptions, - dmlOptions, - TableSchema.fromResolvedSchema(SCHEMA)); + options, executionOptions, dmlOptions, SCHEMA.toPhysicalRowDataType()); assertEquals(expected, actual); } diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java deleted file mode 100644 index e85dc26a88c71..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.types.DataType; -import org.apache.flink.types.Row; -import org.apache.flink.util.CollectionUtil; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.flink.table.api.Expressions.$; -import static org.junit.Assert.assertEquals; - -/** IT case for lookup source of JDBC connector. */ -@RunWith(Parameterized.class) -public class JdbcLookupTableITCase extends JdbcLookupTestBase { - - private final String tableFactory; - private final boolean useCache; - - public JdbcLookupTableITCase(String tableFactory, boolean useCache) { - this.useCache = useCache; - this.tableFactory = tableFactory; - } - - @Parameterized.Parameters(name = "Table factory = {0}, use cache {1}") - @SuppressWarnings("unchecked,rawtypes") - public static Collection useCache() { - return Arrays.asList( - new Object[][] { - {"legacyFactory", true}, - {"legacyFactory", false}, - {"dynamicFactory", true}, - {"dynamicFactory", false} - }); - } - - @Test - public void testLookup() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - Iterator collected; - if ("legacyFactory".equals(tableFactory)) { - collected = useLegacyTableFactory(env, tEnv); - } else { - collected = useDynamicTableFactory(env, tEnv); - } - List result = - CollectionUtil.iteratorToList(collected).stream() - .map(Row::toString) - .sorted() - .collect(Collectors.toList()); - - List expected = new ArrayList<>(); - expected.add("+I[1, 1, 11-c1-v1, 11-c2-v1]"); - expected.add("+I[1, 1, 11-c1-v1, 11-c2-v1]"); - expected.add("+I[1, 1, 11-c1-v2, 11-c2-v2]"); - expected.add("+I[1, 1, 11-c1-v2, 11-c2-v2]"); - expected.add("+I[2, 3, null, 23-c2]"); - expected.add("+I[2, 5, 25-c1, 25-c2]"); - expected.add("+I[3, 8, 38-c1, 38-c2]"); - Collections.sort(expected); - - assertEquals(expected, result); - } - - private Iterator useLegacyTableFactory( - StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { - Table t = - tEnv.fromDataStream( - env.fromCollection( - Arrays.asList( - new Tuple2<>(1, "1"), - new Tuple2<>(1, "1"), - new Tuple2<>(2, "3"), - new Tuple2<>(2, "5"), - new Tuple2<>(3, "5"), - new Tuple2<>(3, "8"))), - $("id1"), - $("id2")); - - tEnv.registerTable("T", t); - 
JdbcTableSource.Builder builder = - JdbcTableSource.builder() - .setOptions( - JdbcConnectorOptions.builder() - .setDBUrl(DB_URL) - .setTableName(LOOKUP_TABLE) - .build()) - .setSchema( - TableSchema.builder() - .fields( - new String[] {"id1", "comment1", "comment2", "id2"}, - new DataType[] { - DataTypes.INT(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING() - }) - .build()); - JdbcLookupOptions.Builder lookupOptionsBuilder = - JdbcLookupOptions.builder().setMaxRetryTimes(0); - if (useCache) { - lookupOptionsBuilder.setCacheMaxSize(1000).setCacheExpireMs(1000 * 1000); - } - builder.setLookupOptions(lookupOptionsBuilder.build()); - tEnv.registerFunction( - "jdbcLookup", builder.build().getLookupFunction(t.getSchema().getFieldNames())); - - // do not use the first N fields as lookup keys for better coverage - String sqlQuery = - "SELECT id1, id2, comment1, comment2 FROM T, " - + "LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, comment1, comment2, l_id2)"; - return tEnv.executeSql(sqlQuery).collect(); - } - - private Iterator useDynamicTableFactory( - StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { - Table t = - tEnv.fromDataStream( - env.fromCollection( - Arrays.asList( - new Tuple2<>(1, "1"), - new Tuple2<>(1, "1"), - new Tuple2<>(2, "3"), - new Tuple2<>(2, "5"), - new Tuple2<>(3, "5"), - new Tuple2<>(3, "8"))), - $("id1"), - $("id2"), - $("proctime").proctime()); - - tEnv.createTemporaryView("T", t); - - String cacheConfig = ", 'lookup.cache.max-rows'='4', 'lookup.cache.ttl'='10000'"; - tEnv.executeSql( - String.format( - "create table lookup (" - + " id1 INT," - + " comment1 VARCHAR," - + " comment2 VARCHAR," - + " id2 VARCHAR" - + ") with(" - + " 'connector'='jdbc'," - + " 'url'='" - + DB_URL - + "'," - + " 'table-name'='" - + LOOKUP_TABLE - + "'," - + " 'lookup.max-retries' = '0'" - + " %s)", - useCache ? cacheConfig : "")); - - // do not use the first N fields as lookup keys for better coverage - String sqlQuery = - "SELECT source.id1, source.id2, L.comment1, L.comment2 FROM T AS source " - + "JOIN lookup for system_time as of source.proctime AS L " - + "ON source.id1 = L.id1 and source.id2 = L.id2"; - return tEnv.executeSql(sqlQuery).collect(); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java deleted file mode 100644 index 5666ed9eb0cda..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.connector.jdbc.JdbcTestBase; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.types.Row; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Iterator; -import java.util.List; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; - -/** ITCase for {@link JdbcTableSource}. */ -public class JdbcTableSourceITCase extends AbstractTestBase { - - public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; - public static final String DB_URL = "jdbc:derby:memory:test"; - public static final String INPUT_TABLE = "jdbcSource"; - - @Before - public void before() throws ClassNotFoundException, SQLException { - System.setProperty( - "derby.stream.error.field", JdbcTestBase.class.getCanonicalName() + ".DEV_NULL"); - Class.forName(DRIVER_CLASS); - - try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); - Statement statement = conn.createStatement()) { - statement.executeUpdate( - "CREATE TABLE " - + INPUT_TABLE - + " (" - + "id BIGINT NOT NULL," - + "timestamp6_col TIMESTAMP, " - + "timestamp9_col TIMESTAMP, " - + "time_col TIME, " - + "real_col FLOAT(23), " - + // A precision of 23 or less makes FLOAT equivalent to REAL. - "double_col FLOAT(24)," - + // A precision of 24 or greater makes FLOAT equivalent to DOUBLE - // PRECISION. 
- "decimal_col DECIMAL(10, 4))"); - statement.executeUpdate( - "INSERT INTO " - + INPUT_TABLE - + " VALUES (" - + "1, TIMESTAMP('2020-01-01 15:35:00.123456'), TIMESTAMP('2020-01-01 15:35:00.123456789'), " - + "TIME('15:35:00'), 1.175E-37, 1.79769E+308, 100.1234)"); - statement.executeUpdate( - "INSERT INTO " - + INPUT_TABLE - + " VALUES (" - + "2, TIMESTAMP('2020-01-01 15:36:01.123456'), TIMESTAMP('2020-01-01 15:36:01.123456789'), " - + "TIME('15:36:01'), -1.175E-37, -1.79769E+308, 101.1234)"); - } - } - - @After - public void clearOutputTable() throws Exception { - Class.forName(DRIVER_CLASS); - try (Connection conn = DriverManager.getConnection(DB_URL); - Statement stat = conn.createStatement()) { - stat.executeUpdate("DROP TABLE " + INPUT_TABLE); - } - } - - @Test - public void testJdbcSource() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - tEnv.executeSql( - "CREATE TABLE " - + INPUT_TABLE - + "(" - + "id BIGINT," - + "timestamp6_col TIMESTAMP(6)," - + "timestamp9_col TIMESTAMP(9)," - + "time_col TIME," - + "real_col FLOAT," - + "double_col DOUBLE," - + "decimal_col DECIMAL(10, 4)" - + ") WITH (" - + " 'connector.type'='jdbc'," - + " 'connector.url'='" - + DB_URL - + "'," - + " 'connector.table'='" - + INPUT_TABLE - + "'" - + ")"); - - TableResult tableResult = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE); - - List results = manifestResults(tableResult); - - assertThat( - results, - containsInAnyOrder( - "+I[1, 2020-01-01T15:35:00.123456, 2020-01-01T15:35:00.123456789, 15:35, 1.175E-37, 1.79769E308, 100.1234]", - "+I[2, 2020-01-01T15:36:01.123456, 2020-01-01T15:36:01.123456789, 15:36:01, -1.175E-37, -1.79769E308, 101.1234]")); - } - - @Test - public void testProjectableJdbcSource() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - tEnv.executeSql( - "CREATE TABLE " - + INPUT_TABLE - + "(" - + "id BIGINT," - + "timestamp6_col TIMESTAMP(6)," - + "timestamp9_col TIMESTAMP(9)," - + "time_col TIME," - + "real_col FLOAT," - + "decimal_col DECIMAL(10, 4)" - + ") WITH (" - + " 'connector.type'='jdbc'," - + " 'connector.url'='" - + DB_URL - + "'," - + " 'connector.table'='" - + INPUT_TABLE - + "'" - + ")"); - - TableResult tableResult = - tEnv.executeSql("SELECT timestamp6_col, decimal_col FROM " + INPUT_TABLE); - - List results = manifestResults(tableResult); - - assertThat( - results, - containsInAnyOrder( - "+I[2020-01-01T15:35:00.123456, 100.1234]", - "+I[2020-01-01T15:36:01.123456, 101.1234]")); - } - - @Test - public void testScanQueryJDBCSource() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - final String testQuery = "SELECT id FROM " + INPUT_TABLE; - tEnv.executeSql( - "CREATE TABLE test(" - + "id BIGINT" - + ") WITH (" - + " 'connector.type'='jdbc'," - + " 'connector.url'='" - + DB_URL - + "'," - + " 'connector.table'='whatever'," - + " 'connector.read.query'='" - + testQuery - + "'" - + ")"); - - TableResult tableResult = tEnv.executeSql("SELECT id FROM test"); - - List results = manifestResults(tableResult); - - assertThat(results, containsInAnyOrder("+I[1]", "+I[2]")); - } - - private static List manifestResults(TableResult result) { - Iterator resultIterator = result.collect(); - return StreamSupport.stream( - 
Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), - false) - .map(Row::toString) - .collect(Collectors.toList()); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java deleted file mode 100644 index 5cd7e0010e264..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; -import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.factories.StreamTableSinkFactory; -import org.apache.flink.table.factories.StreamTableSourceFactory; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.sinks.StreamTableSink; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.table.sources.TableSourceValidation; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; - -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Test for {@link JdbcTableSource} and {@link JdbcUpsertTableSink} created by {@link - * JdbcTableSourceSinkFactory}. 
- */ -public class JdbcTableSourceSinkFactoryTest { - - private static final TableSchema schema = - TableSchema.builder() - .field("aaa", DataTypes.INT()) - .field("bbb", DataTypes.STRING()) - .field("ccc", DataTypes.DOUBLE()) - .field("ddd", DataTypes.DECIMAL(31, 18)) - .field("eee", DataTypes.TIMESTAMP(3)) - .build(); - - @Test - public void testJdbcCommonProperties() { - Map properties = getBasicProperties(); - properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver"); - properties.put("connector.username", "user"); - properties.put("connector.password", "pass"); - properties.put("connector.connection.max-retry-timeout", "120s"); - - final StreamTableSource actual = - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - - final JdbcConnectorOptions options = - JdbcConnectorOptions.builder() - .setDBUrl("jdbc:derby:memory:mydb") - .setTableName("mytable") - .setDriverName("org.apache.derby.jdbc.EmbeddedDriver") - .setUsername("user") - .setPassword("pass") - .setConnectionCheckTimeoutSeconds(120) - .build(); - final JdbcTableSource expected = - JdbcTableSource.builder().setOptions(options).setSchema(schema).build(); - - TableSourceValidation.validateTableSource(expected, schema); - TableSourceValidation.validateTableSource(actual, schema); - assertEquals(expected, actual); - } - - @Test - public void testJdbcReadProperties() { - Map properties = getBasicProperties(); - properties.put("connector.read.query", "SELECT aaa FROM mytable"); - properties.put("connector.read.partition.column", "aaa"); - properties.put("connector.read.partition.lower-bound", "-10"); - properties.put("connector.read.partition.upper-bound", "100"); - properties.put("connector.read.partition.num", "10"); - properties.put("connector.read.fetch-size", "20"); - - final StreamTableSource actual = - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - - final JdbcConnectorOptions options = - JdbcConnectorOptions.builder() - .setDBUrl("jdbc:derby:memory:mydb") - .setTableName("mytable") - .build(); - final JdbcReadOptions readOptions = - JdbcReadOptions.builder() - .setQuery("SELECT aaa FROM mytable") - .setPartitionColumnName("aaa") - .setPartitionLowerBound(-10) - .setPartitionUpperBound(100) - .setNumPartitions(10) - .setFetchSize(20) - .build(); - final JdbcTableSource expected = - JdbcTableSource.builder() - .setOptions(options) - .setReadOptions(readOptions) - .setSchema(schema) - .build(); - - assertEquals(expected, actual); - } - - @Test - public void testJdbcLookupProperties() { - Map properties = getBasicProperties(); - properties.put("connector.lookup.cache.max-rows", "1000"); - properties.put("connector.lookup.cache.ttl", "10s"); - properties.put("connector.lookup.max-retries", "10"); - - final StreamTableSource actual = - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - - final JdbcConnectorOptions options = - JdbcConnectorOptions.builder() - .setDBUrl("jdbc:derby:memory:mydb") - .setTableName("mytable") - .build(); - final JdbcLookupOptions lookupOptions = - JdbcLookupOptions.builder() - .setCacheMaxSize(1000) - .setCacheExpireMs(10_000) - .setMaxRetryTimes(10) - .build(); - final JdbcTableSource expected = - JdbcTableSource.builder() - .setOptions(options) - .setLookupOptions(lookupOptions) - .setSchema(schema) - .build(); - - assertEquals(expected, actual); - } - - @Test - public void testJdbcSinkProperties() { - 
Map properties = getBasicProperties(); - properties.put("connector.write.flush.max-rows", "1000"); - properties.put("connector.write.flush.interval", "2min"); - properties.put("connector.write.max-retries", "5"); - - final StreamTableSink actual = - TableFactoryService.find(StreamTableSinkFactory.class, properties) - .createStreamTableSink(properties); - - final JdbcConnectorOptions options = - JdbcConnectorOptions.builder() - .setDBUrl("jdbc:derby:memory:mydb") - .setTableName("mytable") - .build(); - final JdbcUpsertTableSink expected = - JdbcUpsertTableSink.builder() - .setOptions(options) - .setTableSchema(schema) - .setFlushMaxSize(1000) - .setFlushIntervalMills(120_000) - .setMaxRetryTimes(5) - .build(); - - assertEquals(expected, actual); - } - - @Test - public void testJdbcFieldsProjection() { - Map properties = getBasicProperties(); - properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver"); - properties.put("connector.username", "user"); - properties.put("connector.password", "pass"); - - final TableSource actual = - ((JdbcTableSource) - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties)) - .projectFields(new int[] {0, 2}); - - List projectedFields = actual.getProducedDataType().getChildren(); - assertEquals(Arrays.asList(DataTypes.INT(), DataTypes.DOUBLE()), projectedFields); - - // test jdbc table source description - List fieldNames = - ((RowType) actual.getProducedDataType().getLogicalType()).getFieldNames(); - String expectedSourceDescription = - actual.getClass().getSimpleName() - + "(" - + String.join(", ", fieldNames.stream().toArray(String[]::new)) - + ")"; - assertEquals(expectedSourceDescription, actual.explainSource()); - } - - @Test - public void testJdbcValidation() { - // only password, no username - try { - Map properties = getBasicProperties(); - properties.put("connector.password", "pass"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (IllegalArgumentException ignored) { - } - - // read partition properties not complete - try { - Map properties = getBasicProperties(); - properties.put("connector.read.partition.column", "aaa"); - properties.put("connector.read.partition.lower-bound", "-10"); - properties.put("connector.read.partition.upper-bound", "100"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (IllegalArgumentException ignored) { - } - - // read partition lower-bound > upper-bound - try { - Map properties = getBasicProperties(); - properties.put("connector.read.partition.column", "aaa"); - properties.put("connector.read.partition.lower-bound", "100"); - properties.put("connector.read.partition.upper-bound", "-10"); - properties.put("connector.read.partition.num", "10"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (IllegalArgumentException ignored) { - } - - // lookup cache properties not complete - try { - Map properties = getBasicProperties(); - properties.put("connector.lookup.cache.max-rows", "10"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (IllegalArgumentException ignored) { - } - - // lookup cache properties not complete - try { - Map properties = 
getBasicProperties(); - properties.put("connector.lookup.cache.ttl", "1s"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (IllegalArgumentException ignored) { - } - - // lookup max-retries property less than zero - try { - Map properties = getBasicProperties(); - properties.put("connector.lookup.max-retries", "-1"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (ValidationException ignored) { - } - - // connection.max-retry-timeout property is smaller than 1 second - try { - Map properties = getBasicProperties(); - properties.put("connector.connection.max-retry-timeout", "100ms"); - - TableFactoryService.find(StreamTableSourceFactory.class, properties) - .createStreamTableSource(properties); - fail("exception expected"); - } catch (ValidationException ignored) { - } - } - - private Map getBasicProperties() { - Map properties = new HashMap<>(); - - properties.put("connector.type", "jdbc"); - properties.put("connector.property-version", "1"); - - properties.put("connector.url", "jdbc:derby:memory:mydb"); - properties.put("connector.table", "mytable"); - - DescriptorProperties descriptorProperties = new DescriptorProperties(); - descriptorProperties.putProperties(properties); - descriptorProperties.putTableSchema("schema", schema); - - return new HashMap<>(descriptorProperties.asMap()); - } -} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcUpsertTableSinkITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcUpsertTableSinkITCase.java deleted file mode 100644 index c68b9d0157d50..0000000000000 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcUpsertTableSinkITCase.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.jdbc.table; - -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.connector.jdbc.JdbcTestFixture; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.types.Row; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB; -import static org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest.check; -import static org.apache.flink.table.api.Expressions.$; - -/** IT case for {@link JdbcUpsertTableSink}. */ -public class JdbcUpsertTableSinkITCase extends AbstractTestBase { - - public static final String DB_URL = "jdbc:derby:memory:upsert"; - public static final String OUTPUT_TABLE1 = "upsertSink"; - public static final String OUTPUT_TABLE2 = "appendSink"; - public static final String OUTPUT_TABLE3 = "batchSink"; - - @Before - public void before() throws ClassNotFoundException, SQLException { - System.setProperty( - "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL"); - - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); - Statement stat = conn.createStatement()) { - stat.executeUpdate( - "CREATE TABLE " - + OUTPUT_TABLE1 - + " (" - + "cnt BIGINT NOT NULL DEFAULT 0," - + "lencnt BIGINT NOT NULL DEFAULT 0," - + "cTag INT NOT NULL DEFAULT 0," - + "ts TIMESTAMP," - + "PRIMARY KEY (cnt, cTag))"); - - stat.executeUpdate( - "CREATE TABLE " - + OUTPUT_TABLE2 - + " (" - + "id INT NOT NULL DEFAULT 0," - + "num BIGINT NOT NULL DEFAULT 0," - + "ts TIMESTAMP)"); - - stat.executeUpdate( - "CREATE TABLE " - + OUTPUT_TABLE3 - + " (" - + "NAME VARCHAR(20) NOT NULL," - + "SCORE BIGINT NOT NULL DEFAULT 0)"); - - stat.executeUpdate("CREATE TABLE REAL_TABLE (real_data REAL)"); - } - } - - @After - public void clearOutputTable() throws Exception { - Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass()); - try (Connection conn = DriverManager.getConnection(DB_URL); - Statement stat = conn.createStatement()) { - stat.execute("DROP TABLE " + OUTPUT_TABLE1); - stat.execute("DROP TABLE " + OUTPUT_TABLE2); - stat.execute("DROP TABLE " + OUTPUT_TABLE3); - stat.execute("DROP TABLE REAL_TABLE"); - } - } - - public static DataStream> get4TupleDataStream( - StreamExecutionEnvironment env) { - List> data = new ArrayList<>(); - data.add(new Tuple4<>(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001"))); - data.add(new Tuple4<>(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002"))); - data.add(new Tuple4<>(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003"))); - data.add( - new Tuple4<>( - 4, - 3L, - "Hello world, how are you?", - Timestamp.valueOf("1970-01-01 00:00:00.004"))); - data.add(new Tuple4<>(5, 3L, "I am 
fine.", Timestamp.valueOf("1970-01-01 00:00:00.005"))); - data.add( - new Tuple4<>( - 6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006"))); - data.add(new Tuple4<>(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007"))); - data.add(new Tuple4<>(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008"))); - data.add(new Tuple4<>(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009"))); - data.add(new Tuple4<>(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010"))); - data.add(new Tuple4<>(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011"))); - data.add(new Tuple4<>(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012"))); - data.add(new Tuple4<>(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013"))); - data.add(new Tuple4<>(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014"))); - data.add(new Tuple4<>(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015"))); - data.add(new Tuple4<>(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016"))); - data.add(new Tuple4<>(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017"))); - data.add(new Tuple4<>(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018"))); - data.add(new Tuple4<>(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019"))); - data.add(new Tuple4<>(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020"))); - data.add(new Tuple4<>(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 00:00:00.021"))); - - Collections.shuffle(data); - return env.fromCollection(data); - } - - @Test - public void testReal() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - tEnv.executeSql( - "CREATE TABLE upsertSink (" - + " real_data float" - + ") WITH (" - + " 'connector.type'='jdbc'," - + " 'connector.url'='" - + DB_URL - + "'," - + " 'connector.table'='REAL_TABLE'" - + ")"); - - tEnv.executeSql("INSERT INTO upsertSink SELECT CAST(1.0 as FLOAT)").await(); - check(new Row[] {Row.of(1.0f)}, DB_URL, "REAL_TABLE", new String[] {"real_data"}); - } - - @Test - public void testUpsert() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - Table t = - tEnv.fromDataStream( - get4TupleDataStream(env) - .assignTimestampsAndWatermarks( - new AscendingTimestampExtractor< - Tuple4>() { - @Override - public long extractAscendingTimestamp( - Tuple4 - element) { - return element.f0; - } - }), - $("id"), - $("num"), - $("text"), - $("ts")); - - tEnv.createTemporaryView("T", t); - tEnv.executeSql( - "CREATE TABLE upsertSink (" - + " cnt BIGINT," - + " lencnt BIGINT," - + " cTag INT," - + " ts TIMESTAMP(3)" - + ") WITH (" - + " 'connector.type'='jdbc'," - + " 'connector.url'='" - + DB_URL - + "'," - + " 'connector.table'='" - + OUTPUT_TABLE1 - + "'" - + ")"); - - tEnv.executeSql( - "INSERT INTO upsertSink \n" - + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts\n" - + "FROM (\n" - + " SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts\n" - + " FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n" - + " GROUP BY len, cTag\n" - + ")\n" - + "GROUP BY cnt, cTag") - .await(); - check( - new Row[] { - Row.of(1, 5, 1, 
Timestamp.valueOf("1970-01-01 00:00:00.006")), - Row.of(7, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.021")), - Row.of(9, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.015")) - }, - DB_URL, - OUTPUT_TABLE1, - new String[] {"cnt", "lencnt", "cTag", "ts"}); - } - - @Test - public void testAppend() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - env.getConfig().setParallelism(1); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - Table t = - tEnv.fromDataStream( - get4TupleDataStream(env), $("id"), $("num"), $("text"), $("ts")); - - tEnv.registerTable("T", t); - - tEnv.executeSql( - "CREATE TABLE upsertSink (" - + " id INT," - + " num BIGINT," - + " ts TIMESTAMP(3)" - + ") WITH (" - + " 'connector.type'='jdbc'," - + " 'connector.url'='" - + DB_URL - + "'," - + " 'connector.table'='" - + OUTPUT_TABLE2 - + "'" - + ")"); - - tEnv.executeSql("INSERT INTO upsertSink SELECT id, num, ts FROM T WHERE id IN (2, 10, 20)") - .await(); - check( - new Row[] { - Row.of(2, 2, Timestamp.valueOf("1970-01-01 00:00:00.002")), - Row.of(10, 4, Timestamp.valueOf("1970-01-01 00:00:00.01")), - Row.of(20, 6, Timestamp.valueOf("1970-01-01 00:00:00.02")) - }, - DB_URL, - OUTPUT_TABLE2, - new String[] {"id", "num", "ts"}); - } - - @Test - public void testBatchSink() throws Exception { - TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); - - tEnv.executeSql( - "CREATE TABLE USER_RESULT(" - + "NAME VARCHAR," - + "SCORE BIGINT" - + ") WITH ( " - + "'connector.type' = 'jdbc'," - + "'connector.url'='" - + DB_URL - + "'," - + "'connector.table' = '" - + OUTPUT_TABLE3 - + "'" - + ")"); - - tEnv.executeSql( - "INSERT INTO USER_RESULT\n" - + "SELECT user_name, score " - + "FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), " - + "(42, 'Kim'), (1, 'Bob')) " - + "AS UserCountTable(score, user_name)") - .await(); - - check( - new Row[] { - Row.of("Bob", 1), - Row.of("Tom", 22), - Row.of("Kim", 42), - Row.of("Kim", 42), - Row.of("Bob", 1) - }, - DB_URL, - OUTPUT_TABLE3, - new String[] {"NAME", "SCORE"}); - } -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java index c8d8baeee7c03..0ab5fc5a6f43e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java @@ -26,16 +26,12 @@ import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; -import org.apache.flink.table.types.FieldsDataType; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; import java.util.List; import java.util.Optional; -import static org.apache.flink.util.Preconditions.checkArgument; - /** Utilities to {@link TableSchema}. */ @Internal public class TableSchemaUtils { @@ -70,33 +66,6 @@ public static TableSchema getPhysicalSchema(TableSchema tableSchema) { return builder.build(); } - /** - * Creates a new {@link TableSchema} with the projected fields from another {@link TableSchema}. - * The new {@link TableSchema} doesn't contain any primary key or watermark information. - * - *
<p>When extracting the fields from the origin schema, the fields may get name conflicts in
-     * the new schema. Considering that the path to the fields is unique in schema, use the path as
-     * the new name to resolve the name conflicts in the new schema. If name conflicts still exists,
-     * it will add postfix in the fashion "_$%d" to resolve.
-     *
-     * @see org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
-     */
-    public static TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
-        checkArgument(
-                containsPhysicalColumnsOnly(tableSchema),
-                "Projection is only supported for physical columns.");
-        TableSchema.Builder builder = TableSchema.builder();
-
-        FieldsDataType fields =
-                (FieldsDataType)
-                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
-        RowType topFields = (RowType) fields.getLogicalType();
-        for (int i = 0; i < topFields.getFieldCount(); i++) {
-            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
-        }
-        return builder.build();
-    }
-
     /** Returns true if there are only physical columns in the given {@link TableSchema}. */
     public static boolean containsPhysicalColumnsOnly(TableSchema schema) {
         Preconditions.checkNotNull(schema);
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index 638e0ef30f250..55161aeb115e3 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -74,59 +74,4 @@ public void testDropConstraint() {
         exceptionRule.expectMessage("Constraint ct2 to drop does not exist");
         TableSchemaUtils.dropConstraint(originalSchema, "ct2");
     }
-
-    @Test
-    public void testInvalidProjectSchema() {
-        TableSchema schema =
-                TableSchema.builder()
-                        .field("a", DataTypes.INT().notNull())
-                        .field("b", DataTypes.STRING())
-                        .field("c", DataTypes.INT(), "a + 1")
-                        .field("t", DataTypes.TIMESTAMP(3))
-                        .primaryKey("ct1", new String[] {"a"})
-                        .watermark("t", "t", DataTypes.TIMESTAMP(3))
-                        .build();
-        exceptionRule.expect(IllegalArgumentException.class);
-        exceptionRule.expectMessage("Projection is only supported for physical columns.");
-        int[][] projectedFields = {{1}};
-        TableSchemaUtils.projectSchema(schema, projectedFields);
-    }
-
-    @Test
-    public void testProjectSchema() {
-        TableSchema schema =
-                TableSchema.builder()
-                        .field("a", DataTypes.INT().notNull())
-                        .field("b", DataTypes.STRING())
-                        .field("t", DataTypes.TIMESTAMP(3))
-                        .primaryKey("a")
-                        .watermark("t", "t", DataTypes.TIMESTAMP(3))
-                        .build();
-
-        int[][] projectedFields = {{2}, {0}};
-        TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields);
-        TableSchema expected =
-                TableSchema.builder()
-                        .field("t", DataTypes.TIMESTAMP(3))
-                        .field("a", DataTypes.INT().notNull())
-                        .build();
-        assertEquals(expected, projected);
-    }
-
-    @Test
-    public void testProjectSchemaWithNameConflict() {
-        TableSchema schema =
-                TableSchema.builder()
-                        .field("a", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING())))
-                        .field("f0", DataTypes.STRING())
-                        .build();
-        int[][] projectedFields = {{0, 0}, {1}};
-        TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields);
-        TableSchema expected =
-                TableSchema.builder()
-                        .field("a_f0", DataTypes.STRING())
-                        .field("f0", DataTypes.STRING())
-                        .build();
-        assertEquals(expected, projected);
-    }
 }
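
The removed TableSchemaUtils.projectSchema delegated to DataTypeUtils.projectRow and only copied the projected row fields back into a TableSchema. A minimal sketch of expressing the same projection directly on a row DataType follows; the class name ProjectRowSketch and the field names are illustrative, and it assumes DataTypeUtils.projectRow performs the path-based renaming exercised by the removed testProjectSchemaWithNameConflict (the nested path a.f0 becomes a_f0).

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

public class ProjectRowSketch {
    public static void main(String[] args) {
        // A row with a nested field a.f0 and a top-level field f0 that clash after projection.
        DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("a", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING()))),
                        DataTypes.FIELD("f0", DataTypes.STRING()),
                        DataTypes.FIELD("t", DataTypes.TIMESTAMP(3)));

        // Project the nested path a.f0 and the top-level column f0, as in the removed test.
        int[][] projectedFields = {{0, 0}, {1}};
        DataType projected = DataTypeUtils.projectRow(rowType, projectedFields);

        // Expected field names after conflict resolution: a_f0, f0.
        System.out.println(projected);
    }
}

The deleted JdbcUpsertTableSinkITCase declared its sinks with the legacy "connector.type"/"connector.url" properties. A hedged sketch of the same upsert sink declared with the current option style is shown below; the option keys ('connector', 'url', 'table-name', 'sink.buffer-flush.max-rows', 'sink.buffer-flush.interval', 'sink.max-retries') follow the documented JDBC SQL connector and should be verified against the Flink version in use, and the class name NewStyleJdbcSinkSketch is illustrative.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NewStyleJdbcSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Same Derby table as the removed OUTPUT_TABLE1, declared with new-style options.
        tEnv.executeSql(
                "CREATE TABLE upsertSink ("
                        + "  cnt BIGINT,"
                        + "  lencnt BIGINT,"
                        + "  cTag INT,"
                        + "  ts TIMESTAMP(3),"
                        + "  PRIMARY KEY (cnt, cTag) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:derby:memory:upsert',"
                        + "  'table-name' = 'upsertSink',"
                        + "  'sink.buffer-flush.max-rows' = '1000',"
                        + "  'sink.buffer-flush.interval' = '2min',"
                        + "  'sink.max-retries' = '5'"
                        + ")");
    }
}

With the dynamic-table factory, upsert versus append behaviour is derived from the declared PRIMARY KEY rather than from the query plan, so the NOT ENFORCED key above is what makes this sink behave like the removed JdbcUpsertTableSink.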