diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index 665277e92ae..3a251d98180 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -5,10 +5,7 @@ */ package io.debezium.relational; -import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Types; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -26,7 +23,6 @@ import io.debezium.annotation.Immutable; import io.debezium.annotation.ThreadSafe; import io.debezium.data.SchemaUtil; -import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.mapping.ColumnMapper; import io.debezium.relational.mapping.ColumnMappers; @@ -64,34 +60,6 @@ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Functio this.valueConverterProvider = valueConverterProvider; } - /** - * Create a {@link TableSchema} from the given JDBC {@link ResultSet}. The resulting TableSchema will have no primary key, - * and its {@link TableSchema#valueSchema()} will contain fields for each column in the result set. - * - * @param resultSet the result set for a query; may not be null - * @param name the name of the value schema; may not be null - * @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null - * @throws SQLException if an error occurs while using the result set's metadata - */ - public TableSchema create(ResultSet resultSet, String name) throws SQLException { - // Determine the columns that make up the result set ... - List columns = new ArrayList<>(); - JdbcConnection.columnsFor(resultSet, columns::add); - - // Create a schema that represents these columns ... - String schemaName = schemaNameValidator.apply(name); - SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(schemaName); - columns.forEach(column -> addField(schemaBuilder, column, null)); - Schema valueSchema = schemaBuilder.build(); - - // And a generator that can be used to create values from rows in the result set ... - TableId id = new TableId(null, null, name); - Function valueGenerator = createValueGenerator(valueSchema, id, columns, null, null); - - // Finally create our result object with no primary key or key generator ... - return new TableSchema(null, null, valueSchema, valueGenerator); - } - /** * Create a {@link TableSchema} from the given {@link Table table definition}. The resulting TableSchema will have a * {@link TableSchema#keySchema() key schema} that contains all of the columns that make up the table's primary key, diff --git a/debezium-core/src/main/java/io/debezium/relational/Tables.java b/debezium-core/src/main/java/io/debezium/relational/Tables.java index adb26b03b4d..15d7b31aae8 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Tables.java +++ b/debezium-core/src/main/java/io/debezium/relational/Tables.java @@ -5,7 +5,6 @@ */ package io.debezium.relational; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -22,7 +21,7 @@ /** * Structural definitions for a set of tables in a JDBC database. - * + * * @author Randall Hauch */ @ThreadSafe @@ -48,7 +47,7 @@ public static TableNameFilter filterFor( Predicate predicate) { public static interface TableNameFilter { /** * Determine whether the named table should be included. - * + * * @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not * show a schema for this table * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not @@ -66,7 +65,7 @@ public static interface TableNameFilter { public static interface ColumnNameFilter { /** * Determine whether the named column should be included in the table's {@link Schema} definition. - * + * * @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not * show a schema for this table * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not @@ -87,11 +86,11 @@ public static interface ColumnNameFilter { */ public Tables() { } - + protected Tables( Tables other) { this.tablesByTableId.putAll(other.tablesByTableId); } - + @Override public Tables clone() { return new Tables(this); @@ -99,7 +98,7 @@ public Tables clone() { /** * Get the number of tables that are in this object. - * + * * @return the table count */ public int size() { @@ -116,7 +115,7 @@ public Set drainChanges() { /** * Add or update the definition for the identified table. - * + * * @param tableId the identifier of the table * @param columnDefs the list of column definitions; may not be null or empty * @param primaryKeyColumnNames the list of the column names that make up the primary key; may be null or empty @@ -139,7 +138,7 @@ public Table overwriteTable(TableId tableId, List columnDefs, List changer) { }); } - /** - * Add or update the definition for the identified table. - * - * @param tableId the identifier of the table - * @param changer the function that accepts and changes the mutable ordered list of column definitions and the mutable set of - * column names that make up the primary key; may not be null - * @return the previous table definition, or null if there was no prior table definition - */ - public Table updateTable(TableId tableId, TableChanger changer) { - return lock.write(() -> { - TableImpl existing = tablesByTableId.get(tableId); - List columns = new ArrayList<>(existing.columns()); - List pkColumnNames = new ArrayList<>(existing.primaryKeyColumnNames()); - changer.rewrite(columns, pkColumnNames); - TableImpl updated = new TableImpl(tableId, columns, pkColumnNames, existing.defaultCharsetName()); - tablesByTableId.put(tableId, updated); - changes.add(tableId); - return existing; - }); - } - - public static interface TableChanger { - void rewrite(List columnDefinitions, List primaryKeyNames); - } - /** * Remove the definition of the identified table. - * + * * @param tableId the identifier of the table * @return the existing table definition that was removed, or null if there was no prior table definition */ @@ -238,7 +212,7 @@ public Table removeTable(TableId tableId) { /** * Obtain the definition of the identified table. - * + * * @param tableId the identifier of the table * @return the table definition, or null if there was no definition for the identified table */ @@ -248,7 +222,7 @@ public Table forTable(TableId tableId) { /** * Obtain the definition of the identified table. - * + * * @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not * show a schema for this table * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not @@ -262,7 +236,7 @@ public Table forTable(String catalogName, String schemaName, String tableName) { /** * Get the set of {@link TableId}s for which there is a {@link Schema}. - * + * * @return the immutable set of table identifiers; never null */ public Set tableIds() { @@ -273,7 +247,7 @@ public Set tableIds() { * Obtain an editor for the table with the given ID. This method does not lock the set of table definitions, so use * with caution. The resulting editor can be used to modify the table definition, but when completed the new {@link Table} * needs to be added back to this object via {@link #overwriteTable(Table)}. - * + * * @param tableId the identifier of the table * @return the editor for the table, or null if there is no table with the specified ID */ @@ -282,27 +256,11 @@ public TableEditor editTable(TableId tableId) { return table == null ? null : table.edit(); } - /** - * Obtain an editor for the identified table. This method does not lock the set of table definitions, so use - * with caution. The resulting editor can be used to modify the table definition, but when completed the new {@link Table} - * needs to be added back to this object via {@link #overwriteTable(Table)}. - * - * @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not - * show a schema for this table - * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not - * show a schema for this table - * @param tableName the name of the table - * @return the editor for the table, or null if there is no table with the specified ID - */ - public TableEditor editTable(String catalogName, String schemaName, String tableName) { - return editTable(new TableId(catalogName, schemaName, tableName)); - } - /** * Obtain an editor for the table with the given ID. This method does not lock or modify the set of table definitions, so use * with caution. The resulting editor can be used to modify the table definition, but when completed the new {@link Table} * needs to be added back to this object via {@link #overwriteTable(Table)}. - * + * * @param tableId the identifier of the table * @return the editor for the table, or null if there is no table with the specified ID */ @@ -311,23 +269,6 @@ public TableEditor editOrCreateTable(TableId tableId) { return table == null ? Table.editor().tableId(tableId) : table.edit(); } - /** - * Obtain an editor for the identified table or, if there is no such table, create an editor with the specified ID. - * This method does not lock or modify the set of table definitions, so use with caution. The resulting editor can be used to - * modify the table definition, but when completed the new {@link Table} needs to be added back to this object via - * {@link #overwriteTable(Table)}. - * - * @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not - * show a schema for this table - * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not - * show a schema for this table - * @param tableName the name of the table - * @return the editor for the table, or null if there is no table with the specified ID - */ - public TableEditor editOrCreateTable(String catalogName, String schemaName, String tableName) { - return editOrCreateTable(new TableId(catalogName, schemaName, tableName)); - } - @Override public int hashCode() { return tablesByTableId.hashCode(); diff --git a/debezium-core/src/test/java/io/debezium/data/KeyValueStore.java b/debezium-core/src/test/java/io/debezium/data/KeyValueStore.java index 6f788527639..837d23a551b 100644 --- a/debezium-core/src/test/java/io/debezium/data/KeyValueStore.java +++ b/debezium-core/src/test/java/io/debezium/data/KeyValueStore.java @@ -28,7 +28,7 @@ /** * A test utility for accumulating the {@link SourceRecord}s that represent change events on rows. This store applies the * changes and maintains the current state of the rows. - * + * * @author Randall Hauch */ public class KeyValueStore { @@ -55,7 +55,7 @@ protected static Function fromRegex(String regex, int groupNumb /** * Create a KeyValueStore that uses the supplied regular expression and group number to extract the {@link TableId} from * the topic name. - * + * * @param regex the regular expression that identifies the table ID within the topic name; may not be null * @param groupNumber the group number in the regex for the table ID string * @return the key value store @@ -66,7 +66,7 @@ public static KeyValueStore createForTopicsMatching(String regex, int groupNumbe /** * Create a KeyValueStore that removes from the topic names the supplied prefix to obtain the {@link TableId}. - * + * * @param prefix the prefix after which all of the topic name forms the table ID; may not be null * @return the key value store */ @@ -89,11 +89,11 @@ public void add(SourceRecord record) { getOrCreate(tableId).add(record); } } - + public List sourceRecords() { return sourceRecords; } - + public Collection collection(String fullyQualifiedName) { return collection(TableId.parse(fullyQualifiedName)); } @@ -118,14 +118,6 @@ public int collectionCount() { return collectionsByTableId.size(); } - public Collection getOrCreate(String fullyQualifiedName) { - return getOrCreate(TableId.parse(fullyQualifiedName)); - } - - public Collection getOrCreate(String catalog, String tableName) { - return getOrCreate(new TableId(catalog, null, tableName)); - } - public Collection getOrCreate(TableId tableId) { return collectionsByTableId.computeIfAbsent(tableId, Collection::new); } @@ -149,7 +141,7 @@ public TableId tableId() { /** * Get the number of changes to the key schema for events in this collection. - * + * * @return the count; never negative */ public long numberOfKeySchemaChanges() { @@ -158,7 +150,7 @@ public long numberOfKeySchemaChanges() { /** * Get the number of changes to the key schema for events in this collection. - * + * * @return the count; never negative */ public long numberOfValueSchemaChanges() { @@ -167,7 +159,7 @@ public long numberOfValueSchemaChanges() { /** * Get the number of {@link Operation#CREATE CREATE} records {@link #add(SourceRecord) added} to this collection. - * + * * @return the count; never negative */ public long numberOfCreates() { @@ -176,7 +168,7 @@ public long numberOfCreates() { /** * Get the number of {@link Operation#DELETE DELETE} records {@link #add(SourceRecord) added} to this collection. - * + * * @return the count; never negative */ public long numberOfDeletes() { @@ -185,7 +177,7 @@ public long numberOfDeletes() { /** * Get the number of {@link Operation#READ READ} records {@link #add(SourceRecord) added} to this collection. - * + * * @return the count; never negative */ public long numberOfReads() { @@ -194,7 +186,7 @@ public long numberOfReads() { /** * Get the number of {@link Operation#UPDATE UPDATE} records {@link #add(SourceRecord) added} to this collection. - * + * * @return the count; never negative */ public long numberOfUpdates() { @@ -203,7 +195,7 @@ public long numberOfUpdates() { /** * Get the number of tombstone records that were {@link #add(SourceRecord) added} to this collection. - * + * * @return the count; never negative */ public long numberOfTombstones() {