Skip to content

Commit

Permalink
DBZ-543 Removing a few unused methods
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling authored and jpechane committed Jan 15, 2018
1 parent 5c88431 commit b99bdf7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 127 deletions.
Expand Up @@ -5,10 +5,7 @@
*/
package io.debezium.relational;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -26,7 +23,6 @@
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.data.SchemaUtil;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;

Expand Down Expand Up @@ -64,34 +60,6 @@ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Functio
this.valueConverterProvider = valueConverterProvider;
}

/**
* Create a {@link TableSchema} from the given JDBC {@link ResultSet}. The resulting TableSchema will have no primary key,
* and its {@link TableSchema#valueSchema()} will contain fields for each column in the result set.
*
* @param resultSet the result set for a query; may not be null
* @param name the name of the value schema; may not be null
* @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null
* @throws SQLException if an error occurs while using the result set's metadata
*/
public TableSchema create(ResultSet resultSet, String name) throws SQLException {
// Determine the columns that make up the result set ...
List<Column> columns = new ArrayList<>();
JdbcConnection.columnsFor(resultSet, columns::add);

// Create a schema that represents these columns ...
String schemaName = schemaNameValidator.apply(name);
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(schemaName);
columns.forEach(column -> addField(schemaBuilder, column, null));
Schema valueSchema = schemaBuilder.build();

// And a generator that can be used to create values from rows in the result set ...
TableId id = new TableId(null, null, name);
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null);

// Finally create our result object with no primary key or key generator ...
return new TableSchema(null, null, valueSchema, valueGenerator);
}

/**
* Create a {@link TableSchema} from the given {@link Table table definition}. The resulting TableSchema will have a
* {@link TableSchema#keySchema() key schema} that contains all of the columns that make up the table's primary key,
Expand Down
91 changes: 16 additions & 75 deletions debezium-core/src/main/java/io/debezium/relational/Tables.java
Expand Up @@ -5,7 +5,6 @@
*/
package io.debezium.relational;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -22,7 +21,7 @@

/**
* Structural definitions for a set of tables in a JDBC database.
*
*
* @author Randall Hauch
*/
@ThreadSafe
Expand All @@ -48,7 +47,7 @@ public static TableNameFilter filterFor( Predicate<TableId> predicate) {
public static interface TableNameFilter {
/**
* Determine whether the named table should be included.
*
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
Expand All @@ -66,7 +65,7 @@ public static interface TableNameFilter {
public static interface ColumnNameFilter {
/**
* Determine whether the named column should be included in the table's {@link Schema} definition.
*
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
Expand All @@ -87,19 +86,19 @@ public static interface ColumnNameFilter {
*/
public Tables() {
}

protected Tables( Tables other) {
this.tablesByTableId.putAll(other.tablesByTableId);
}

@Override
public Tables clone() {
return new Tables(this);
}

/**
* Get the number of tables that are in this object.
*
*
* @return the table count
*/
public int size() {
Expand All @@ -116,7 +115,7 @@ public Set<TableId> drainChanges() {

/**
* Add or update the definition for the identified table.
*
*
* @param tableId the identifier of the table
* @param columnDefs the list of column definitions; may not be null or empty
* @param primaryKeyColumnNames the list of the column names that make up the primary key; may be null or empty
Expand All @@ -139,7 +138,7 @@ public Table overwriteTable(TableId tableId, List<Column> columnDefs, List<Strin

/**
* Add or update the definition for the identified table.
*
*
* @param table the definition for the table; may not be null
* @return the previous table definition, or null if there was no prior table definition
*/
Expand All @@ -156,7 +155,7 @@ public Table overwriteTable(Table table) {

/**
* Rename an existing table.
*
*
* @param existingTableId the identifier of the existing table to be renamed; may not be null
* @param newTableId the new identifier for the table; may not be null
* @return the previous table definition, or null if there was no prior table definition
Expand All @@ -179,7 +178,7 @@ public Table renameTable(TableId existingTableId, TableId newTableId) {

/**
* Add or update the definition for the identified table.
*
*
* @param tableId the identifier of the table
* @param changer the function that accepts the current {@link Table} and returns either the same or an updated
* {@link Table}; may not be null
Expand All @@ -198,34 +197,9 @@ public Table updateTable(TableId tableId, Function<Table, Table> changer) {
});
}

/**
* Add or update the definition for the identified table.
*
* @param tableId the identifier of the table
* @param changer the function that accepts and changes the mutable ordered list of column definitions and the mutable set of
* column names that make up the primary key; may not be null
* @return the previous table definition, or null if there was no prior table definition
*/
public Table updateTable(TableId tableId, TableChanger changer) {
return lock.write(() -> {
TableImpl existing = tablesByTableId.get(tableId);
List<Column> columns = new ArrayList<>(existing.columns());
List<String> pkColumnNames = new ArrayList<>(existing.primaryKeyColumnNames());
changer.rewrite(columns, pkColumnNames);
TableImpl updated = new TableImpl(tableId, columns, pkColumnNames, existing.defaultCharsetName());
tablesByTableId.put(tableId, updated);
changes.add(tableId);
return existing;
});
}

public static interface TableChanger {
void rewrite(List<Column> columnDefinitions, List<String> primaryKeyNames);
}

/**
* Remove the definition of the identified table.
*
*
* @param tableId the identifier of the table
* @return the existing table definition that was removed, or null if there was no prior table definition
*/
Expand All @@ -238,7 +212,7 @@ public Table removeTable(TableId tableId) {

/**
* Obtain the definition of the identified table.
*
*
* @param tableId the identifier of the table
* @return the table definition, or null if there was no definition for the identified table
*/
Expand All @@ -248,7 +222,7 @@ public Table forTable(TableId tableId) {

/**
* Obtain the definition of the identified table.
*
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
Expand All @@ -262,7 +236,7 @@ public Table forTable(String catalogName, String schemaName, String tableName) {

/**
* Get the set of {@link TableId}s for which there is a {@link Schema}.
*
*
* @return the immutable set of table identifiers; never null
*/
public Set<TableId> tableIds() {
Expand All @@ -273,7 +247,7 @@ public Set<TableId> tableIds() {
* Obtain an editor for the table with the given ID. This method does not lock the set of table definitions, so use
* with caution. The resulting editor can be used to modify the table definition, but when completed the new {@link Table}
* needs to be added back to this object via {@link #overwriteTable(Table)}.
*
*
* @param tableId the identifier of the table
* @return the editor for the table, or null if there is no table with the specified ID
*/
Expand All @@ -282,27 +256,11 @@ public TableEditor editTable(TableId tableId) {
return table == null ? null : table.edit();
}

/**
* Obtain an editor for the identified table. This method does not lock the set of table definitions, so use
* with caution. The resulting editor can be used to modify the table definition, but when completed the new {@link Table}
* needs to be added back to this object via {@link #overwriteTable(Table)}.
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param tableName the name of the table
* @return the editor for the table, or null if there is no table with the specified ID
*/
public TableEditor editTable(String catalogName, String schemaName, String tableName) {
return editTable(new TableId(catalogName, schemaName, tableName));
}

/**
* Obtain an editor for the table with the given ID. This method does not lock or modify the set of table definitions, so use
* with caution. The resulting editor can be used to modify the table definition, but when completed the new {@link Table}
* needs to be added back to this object via {@link #overwriteTable(Table)}.
*
*
* @param tableId the identifier of the table
* @return the editor for the table, or null if there is no table with the specified ID
*/
Expand All @@ -311,23 +269,6 @@ public TableEditor editOrCreateTable(TableId tableId) {
return table == null ? Table.editor().tableId(tableId) : table.edit();
}

/**
* Obtain an editor for the identified table or, if there is no such table, create an editor with the specified ID.
* This method does not lock or modify the set of table definitions, so use with caution. The resulting editor can be used to
* modify the table definition, but when completed the new {@link Table} needs to be added back to this object via
* {@link #overwriteTable(Table)}.
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param tableName the name of the table
* @return the editor for the table, or null if there is no table with the specified ID
*/
public TableEditor editOrCreateTable(String catalogName, String schemaName, String tableName) {
return editOrCreateTable(new TableId(catalogName, schemaName, tableName));
}

@Override
public int hashCode() {
return tablesByTableId.hashCode();
Expand Down
32 changes: 12 additions & 20 deletions debezium-core/src/test/java/io/debezium/data/KeyValueStore.java
Expand Up @@ -28,7 +28,7 @@
/**
* A test utility for accumulating the {@link SourceRecord}s that represent change events on rows. This store applies the
* changes and maintains the current state of the rows.
*
*
* @author Randall Hauch
*/
public class KeyValueStore {
Expand All @@ -55,7 +55,7 @@ protected static Function<String, TableId> fromRegex(String regex, int groupNumb
/**
* Create a KeyValueStore that uses the supplied regular expression and group number to extract the {@link TableId} from
* the topic name.
*
*
* @param regex the regular expression that identifies the table ID within the topic name; may not be null
* @param groupNumber the group number in the regex for the table ID string
* @return the key value store
Expand All @@ -66,7 +66,7 @@ public static KeyValueStore createForTopicsMatching(String regex, int groupNumbe

/**
* Create a KeyValueStore that removes from the topic names the supplied prefix to obtain the {@link TableId}.
*
*
* @param prefix the prefix after which all of the topic name forms the table ID; may not be null
* @return the key value store
*/
Expand All @@ -89,11 +89,11 @@ public void add(SourceRecord record) {
getOrCreate(tableId).add(record);
}
}

public List<SourceRecord> sourceRecords() {
return sourceRecords;
}

public Collection collection(String fullyQualifiedName) {
return collection(TableId.parse(fullyQualifiedName));
}
Expand All @@ -118,14 +118,6 @@ public int collectionCount() {
return collectionsByTableId.size();
}

public Collection getOrCreate(String fullyQualifiedName) {
return getOrCreate(TableId.parse(fullyQualifiedName));
}

public Collection getOrCreate(String catalog, String tableName) {
return getOrCreate(new TableId(catalog, null, tableName));
}

public Collection getOrCreate(TableId tableId) {
return collectionsByTableId.computeIfAbsent(tableId, Collection::new);
}
Expand All @@ -149,7 +141,7 @@ public TableId tableId() {

/**
* Get the number of changes to the key schema for events in this collection.
*
*
* @return the count; never negative
*/
public long numberOfKeySchemaChanges() {
Expand All @@ -158,7 +150,7 @@ public long numberOfKeySchemaChanges() {

/**
* Get the number of changes to the key schema for events in this collection.
*
*
* @return the count; never negative
*/
public long numberOfValueSchemaChanges() {
Expand All @@ -167,7 +159,7 @@ public long numberOfValueSchemaChanges() {

/**
* Get the number of {@link Operation#CREATE CREATE} records {@link #add(SourceRecord) added} to this collection.
*
*
* @return the count; never negative
*/
public long numberOfCreates() {
Expand All @@ -176,7 +168,7 @@ public long numberOfCreates() {

/**
* Get the number of {@link Operation#DELETE DELETE} records {@link #add(SourceRecord) added} to this collection.
*
*
* @return the count; never negative
*/
public long numberOfDeletes() {
Expand All @@ -185,7 +177,7 @@ public long numberOfDeletes() {

/**
* Get the number of {@link Operation#READ READ} records {@link #add(SourceRecord) added} to this collection.
*
*
* @return the count; never negative
*/
public long numberOfReads() {
Expand All @@ -194,7 +186,7 @@ public long numberOfReads() {

/**
* Get the number of {@link Operation#UPDATE UPDATE} records {@link #add(SourceRecord) added} to this collection.
*
*
* @return the count; never negative
*/
public long numberOfUpdates() {
Expand All @@ -203,7 +195,7 @@ public long numberOfUpdates() {

/**
* Get the number of tombstone records that were {@link #add(SourceRecord) added} to this collection.
*
*
* @return the count; never negative
*/
public long numberOfTombstones() {
Expand Down

0 comments on commit b99bdf7

Please sign in to comment.