CC-2116: Fix offsets compatibility #436

Merged
merged 14 commits on Jun 28, 2018
checkstyle/suppressions.xml (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@
files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>

<suppress checks="MethodLength"
files="(DataConverter|GenericDatabaseDialect).java"/>
files="(DataConverter|GenericDatabaseDialect|JdbcSourceTask).java"/>

<suppress checks="ParameterNumber"
files="(ColumnDefinition|GenericDatabaseDialect).java"/>
JdbcSourceConnectorConstants.java
@@ -20,4 +20,7 @@ public class JdbcSourceConnectorConstants {
public static final String TABLE_NAME_KEY = "table";
public static final String QUERY_NAME_KEY = "query";
public static final String QUERY_NAME_VALUE = "query";
+ public static final String OFFSET_PROTOCOL_VERSION = "version";
+ public static final String PROTOCOL_VERSION_ZERO = "0";
+ public static final String PROTOCOL_VERSION_ONE = "1";
}
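
For illustration (not part of the diff), the source-partition maps these constants describe have the following shapes; the table names here are invented:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class PartitionShapes {
  // Protocol V0 (legacy): keyed by the bare table name, with no version entry
  static Map<String, String> v0Partition() {
    return Collections.singletonMap("table", "users"); // TABLE_NAME_KEY -> table name
  }

  // Protocol V1: keyed by the fully qualified table name, plus an explicit version entry
  static Map<String, String> v1Partition() {
    Map<String, String> partition = new HashMap<>();
    partition.put("table", "mydb.dbo.users"); // TABLE_NAME_KEY -> hypothetical FQN
    partition.put("version", "1");            // OFFSET_PROTOCOL_VERSION -> PROTOCOL_VERSION_ONE
    return partition;
  }
}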
JdbcSourceTask.java
@@ -16,6 +16,14 @@

package io.confluent.connect.jdbc.source;

+ import io.confluent.connect.jdbc.dialect.DatabaseDialect;
+ import io.confluent.connect.jdbc.dialect.DatabaseDialects;
+ import io.confluent.connect.jdbc.util.CachedConnectionProvider;
+ import io.confluent.connect.jdbc.util.ColumnDefinition;
+ import io.confluent.connect.jdbc.util.ColumnId;
+ import io.confluent.connect.jdbc.util.ExpressionBuilder;
+ import io.confluent.connect.jdbc.util.TableId;
+ import io.confluent.connect.jdbc.util.Version;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -29,6 +37,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -37,13 +46,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

- import io.confluent.connect.jdbc.dialect.DatabaseDialect;
- import io.confluent.connect.jdbc.dialect.DatabaseDialects;
- import io.confluent.connect.jdbc.util.CachedConnectionProvider;
- import io.confluent.connect.jdbc.util.ColumnDefinition;
- import io.confluent.connect.jdbc.util.ColumnId;
- import io.confluent.connect.jdbc.util.Version;

/**
* JdbcSourceTask is a Kafka Connect SourceTask implementation that reads from JDBC databases and
* generates Kafka Connect records.
@@ -113,9 +115,7 @@ public void start(Map<String, String> properties) {
switch (queryMode) {
case TABLE:
for (String table : tables) {
-         Map<String, String> partition =
-             Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, table);
-         partitions.add(partition);
+         partitionsForTable(partitions, table);
}
break;
case QUERY:
@@ -126,6 +126,7 @@
throw new ConnectException("Unknown query mode: " + queryMode);
}
offsets = context.offsetStorageReader().offsets(partitions);
+     log.trace("The partition offsets are {}", offsets);
Review comment (Member): This might be nice at DEBUG level.

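A minimal sketch of that suggestion, reusing the logger and message from the line above:

log.debug("The partition offsets are {}", offsets);
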
}

String incrementingColumn
@@ -138,6 +139,7 @@ public void start(Map<String, String> properties) {
= config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG);

for (String tableOrQuery : tablesOrQuery) {
+     final List<Map<String, String>> partitions = new ArrayList<>();
final Map<String, String> partition;
switch (queryMode) {
case TABLE:
@@ -149,21 +151,35 @@
timestampColumns
);
}
-         partition = Collections.singletonMap(
-             JdbcSourceConnectorConstants.TABLE_NAME_KEY,
-             tableOrQuery
-         );
+         partitionsForTable(partitions, tableOrQuery);
break;
case QUERY:
partition = Collections.singletonMap(
JdbcSourceConnectorConstants.QUERY_NAME_KEY,
JdbcSourceConnectorConstants.QUERY_NAME_VALUE
);
+         partitions.add(partition);
break;
default:
throw new ConnectException("Unexpected query mode: " + queryMode);
}
-     Map<String, Object> offset = offsets == null ? null : offsets.get(partition);

+     Map<String, Object> offset = null;
+     int offsetProtocolVersion = 0;
+     if (offsets != null) {
+       for (Map<String, String> toCheckPartition : partitions) {
+         offset = offsets.get(toCheckPartition);
+         if (offset != null) {
+           log.trace("Found non-null offset {} for partition {}", offset, toCheckPartition);
+           offsetProtocolVersion = Integer.parseInt(
+               toCheckPartition.getOrDefault(
+                   JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION,
+                   JdbcSourceConnectorConstants.PROTOCOL_VERSION_ZERO
+               ));
Review comment (Member): It would be good to have the TRACE log message after getting the protocol version, and then include the protocol version in the log message.

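A sketch of the reordering this suggests, with the version read first and then included in a single TRACE line (all names as in the surrounding diff):

offsetProtocolVersion = Integer.parseInt(
    toCheckPartition.getOrDefault(
        JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION,
        JdbcSourceConnectorConstants.PROTOCOL_VERSION_ZERO
    ));
log.trace("Found non-null offset {} with protocol version {} for partition {}",
    offset, offsetProtocolVersion, toCheckPartition);
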
+           break;
+         }
+       }
+     }
Review comment (Member): This logic is looking for a single protocol version amongst all of the partitions, yet it breaks as soon as it finds the first. This might be easier to understand if we put it into a separate method, with JavaDoc that explains what it is doing and the assumptions it is making.

I'm not sure the assumption about protocol versions is valid. Imagine a connector runs with tables A and B, and offsets are stored for both. Then the connector is changed to use table A only and restarted. The offsets for tables A and B both use protocol V0. Then the connector is upgraded to use protocol V1, which updates the offsets for table A. Then the connector is changed to also consume from table B again, but now the offsets for table A are V1 while the offsets for table B are still V0.

Is it not sufficient to always read the offsets for each partition using that partition's protocol version? Obviously all future offsets would be written with the latest protocol version.

Review comment (Member): Oh, maybe this is simply trying to find the latest protocol version to use for writes? We definitely need to extract this to a separate method and have good JavaDoc.

Why not always write using the most recent protocol version?

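One way the suggested extraction could look; this is a sketch only, and the method name, the JavaDoc wording, and the single-version assumption it documents are exactly the points raised in the comments above:

/**
 * Returns the offset protocol version recorded with the first candidate
 * partition that has a stored offset, or 0 (the legacy protocol) if no
 * stored offset is found. Assumes all stored partitions for this task were
 * written with the same protocol version, which is why the scan stops at
 * the first match.
 */
private int offsetProtocolVersionFor(
    Map<Map<String, String>, Map<String, Object>> offsets,
    List<Map<String, String>> partitions) {
  if (offsets == null) {
    return 0;
  }
  for (Map<String, String> partition : partitions) {
    if (offsets.get(partition) != null) {
      return Integer.parseInt(partition.getOrDefault(
          JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION,
          JdbcSourceConnectorConstants.PROTOCOL_VERSION_ZERO));
    }
  }
  return 0;
}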

String topicPrefix = config.getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG);

@@ -181,7 +197,8 @@ public void start(Map<String, String> properties) {
null,
incrementingColumn,
offset,
-               timestampDelayInterval
+               timestampDelayInterval,
+               offsetProtocolVersion
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
@@ -194,7 +211,8 @@ public void start(Map<String, String> properties) {
timestampColumns,
null,
offset,
-               timestampDelayInterval
+               timestampDelayInterval,
+               offsetProtocolVersion
)
);
} else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
@@ -207,7 +225,8 @@
timestampColumns,
incrementingColumn,
offset,
-               timestampDelayInterval
+               timestampDelayInterval,
+               offsetProtocolVersion
)
);
}
@@ -216,6 +235,24 @@ public void start(Map<String, String> properties) {
running.set(true);
}

+ private void partitionsForTable(List<Map<String, String>> partitions, String table) {
+   TableId tableId = dialect.parseTableIdentifier(table);
+   Map<String, String> partition = Collections.singletonMap(
+       JdbcSourceConnectorConstants.TABLE_NAME_KEY,
+       tableId.tableName()
+   );
+   partitions.add(partition);

Review comment (Member): How about adding a comment:

// Define source partition map for offset protocol V1 that uses the table FQN

+   String fqn = ExpressionBuilder.create().append(tableId, false).toString();
+   Map<String, String> partitionWithFqn = new HashMap<>();
+   partitionWithFqn.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY, fqn);
+   partitionWithFqn.put(
+       JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION,
+       JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE)
+   ;
Review comment (Member): nit: the ) must be on the line with the semicolon.

+   partitions.add(partitionWithFqn);
Review comment (Member): Why are we adding both the single table name and the FQN as separate partitions in the map?

+ }

@Override
public void stop() throws ConnectException {
try {
TimestampIncrementingTableQuerier.java
@@ -16,6 +16,13 @@

package io.confluent.connect.jdbc.source;

+ import io.confluent.connect.jdbc.dialect.DatabaseDialect;
+ import io.confluent.connect.jdbc.source.SchemaMapping.FieldSetter;
+ import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.CriteriaValues;
+ import io.confluent.connect.jdbc.util.ColumnDefinition;
+ import io.confluent.connect.jdbc.util.ColumnId;
+ import io.confluent.connect.jdbc.util.DateTimeUtils;
+ import io.confluent.connect.jdbc.util.ExpressionBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -29,17 +36,10 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

- import io.confluent.connect.jdbc.dialect.DatabaseDialect;
- import io.confluent.connect.jdbc.source.SchemaMapping.FieldSetter;
- import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.CriteriaValues;
- import io.confluent.connect.jdbc.util.ColumnDefinition;
- import io.confluent.connect.jdbc.util.ColumnId;
- import io.confluent.connect.jdbc.util.DateTimeUtils;
- import io.confluent.connect.jdbc.util.ExpressionBuilder;

/**
* <p>
* TimestampIncrementingTableQuerier performs incremental loading of data using two mechanisms: a
@@ -68,19 +68,22 @@ public class TimestampIncrementingTableQuerier extends TableQuerier implements C
private long timestampDelay;
private TimestampIncrementingOffset offset;
private TimestampIncrementingCriteria criteria;
+ private boolean useFqn;

public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode, String name,
String topicPrefix,
List<String> timestampColumnNames,
String incrementingColumnName,
-                                          Map<String, Object> offsetMap, Long timestampDelay) {
+                                          Map<String, Object> offsetMap, Long timestampDelay,
+                                          int offsetProtocolVersion) {
super(dialect, mode, name, topicPrefix);
this.incrementingColumnName = incrementingColumnName;
this.timestampColumnNames = timestampColumnNames != null
? timestampColumnNames : Collections.<String>emptyList();
this.timestampDelay = timestampDelay;
this.offset = TimestampIncrementingOffset.fromMap(offsetMap);

+   // an unknown offset means a new table, which can use the FQN scheme
+   this.useFqn = offsetProtocolVersion > 0 || offsetMap == null || offsetMap.isEmpty();
this.timestampColumns = new ArrayList<>();
for (String timestampColumn : this.timestampColumnNames) {
if (timestampColumn != null && !timestampColumn.isEmpty()) {
@@ -171,9 +174,17 @@ public SourceRecord extractRecord() throws SQLException {
final Map<String, String> partition;
switch (mode) {
case TABLE:
-       String name = tableId.tableName(); // backward compatible
-       partition = Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, name);
-       topic = topicPrefix + name;
+       if (useFqn) {
+         String fqn = ExpressionBuilder.create().append(tableId, false).toString();
+         partition = new HashMap<>();
+         partition.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY, fqn);
+         partition.put(JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION, "1");
+         topic = topicPrefix + fqn;
+       } else {
+         String name = tableId.tableName(); // backward compatible
+         partition = Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, name);
+         topic = topicPrefix + name;
+       }
Review comment (Member): Seems like all of this could be extracted into a strategy function to compute the topic given the table ID (e.g., topicName(TableId)), and a second method to compute the partitions for a table ID (e.g., sourcePartitions(TableId)). We could have separate strategy implementations for each protocol version, and a particular strategy implementation can be passed into this class.

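A sketch of the strategy shape being proposed here; every name is hypothetical, and TableId and ExpressionBuilder are used as they are elsewhere in this diff:

interface OffsetProtocol {
  // Topic name for records produced from this table
  String topicName(String topicPrefix, TableId tableId);

  // Source partition map used to store and look up offsets for this table
  Map<String, String> sourcePartition(TableId tableId);
}

// Protocol V0: bare table name, no version marker (backward compatible)
class V0Protocol implements OffsetProtocol {
  public String topicName(String topicPrefix, TableId tableId) {
    return topicPrefix + tableId.tableName();
  }
  public Map<String, String> sourcePartition(TableId tableId) {
    return Collections.singletonMap(
        JdbcSourceConnectorConstants.TABLE_NAME_KEY, tableId.tableName());
  }
}

// Protocol V1: fully qualified table name plus a version marker
class V1Protocol implements OffsetProtocol {
  public String topicName(String topicPrefix, TableId tableId) {
    return topicPrefix + ExpressionBuilder.create().append(tableId, false).toString();
  }
  public Map<String, String> sourcePartition(TableId tableId) {
    Map<String, String> partition = new HashMap<>();
    partition.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY,
        ExpressionBuilder.create().append(tableId, false).toString());
    partition.put(JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION,
        JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE);
    return partition;
  }
}
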
break;
case QUERY:
partition = Collections.singletonMap(JdbcSourceConnectorConstants.QUERY_NAME_KEY,
JdbcSourceTaskTestBase.java
@@ -34,9 +34,16 @@ public class JdbcSourceTaskTestBase {

protected static String SINGLE_TABLE_NAME = "test";
protected static Map<String, Object> SINGLE_TABLE_PARTITION = new HashMap<>();
+ protected static Map<String, Object> SINGLE_TABLE_PARTITION_WITH_VERSION = new HashMap<>();

static {
SINGLE_TABLE_PARTITION.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY, SINGLE_TABLE_NAME);
+   SINGLE_TABLE_PARTITION_WITH_VERSION.put(
+       JdbcSourceConnectorConstants.TABLE_NAME_KEY,
+       SINGLE_TABLE_NAME);
+   SINGLE_TABLE_PARTITION_WITH_VERSION.put(
+       JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION,
+       "1");
Review comment (Member): This should all be moved into the PartitionProtocols class, which can have static instances of the strategy for each protocol.

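Building on the strategy sketch above, the PartitionProtocols holder mentioned here might simply expose one shared instance per protocol version (again, all names are hypothetical):

class PartitionProtocols {
  static final OffsetProtocol V0 = new V0Protocol();
  static final OffsetProtocol V1 = new V1Protocol();
}

A test could then build its expected partition with PartitionProtocols.V1.sourcePartition(tableId) rather than assembling the map by hand.
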
Review comment (Member): style nit: the ); should be on the next line.

}

protected static EmbeddedDerby.TableName SINGLE_TABLE