-
Notifications
You must be signed in to change notification settings - Fork 954
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CC-2116: Fix offsets compatibility #436
Changes from 11 commits
27ea46c
919599c
f290f3f
b87191e
ede90f6
d4aeddd
59b75c6
bd3e97d
0fe9cbb
31bb6c7
41b99d5
2f66bc8
a61a819
f23d2f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,9 @@ | |
import java.sql.Connection; | ||
import java.sql.SQLException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Locale; | ||
|
@@ -42,6 +44,8 @@ | |
import io.confluent.connect.jdbc.util.CachedConnectionProvider; | ||
import io.confluent.connect.jdbc.util.ColumnDefinition; | ||
import io.confluent.connect.jdbc.util.ColumnId; | ||
import io.confluent.connect.jdbc.util.ExpressionBuilder; | ||
import io.confluent.connect.jdbc.util.TableId; | ||
import io.confluent.connect.jdbc.util.Version; | ||
|
||
/** | ||
|
@@ -105,6 +109,8 @@ public void start(Map<String, String> properties) { | |
? Collections.singletonList(query) : tables; | ||
|
||
String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG); | ||
//used only in table mode | ||
Map<String, List<Map<String, String>>> tableToPartitions = new HashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
Map<Map<String, String>, Map<String, Object>> offsets = null; | ||
if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING) | ||
|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP) | ||
|
@@ -113,9 +119,9 @@ public void start(Map<String, String> properties) { | |
switch (queryMode) { | ||
case TABLE: | ||
for (String table : tables) { | ||
Map<String, String> partition = | ||
Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, table); | ||
partitions.add(partition); | ||
List<Map<String, String>> tablePartitions = possibleTablePartitions(table); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need a comment here that describes what we're doing:
|
||
partitions.addAll(tablePartitions); | ||
tableToPartitions.put(table, tablePartitions); | ||
} | ||
break; | ||
case QUERY: | ||
|
@@ -126,6 +132,7 @@ public void start(Map<String, String> properties) { | |
throw new ConnectException("Unknown query mode: " + queryMode); | ||
} | ||
offsets = context.offsetStorageReader().offsets(partitions); | ||
log.trace("The partition offsets are {}", offsets); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be nice at DEBUG level |
||
} | ||
|
||
String incrementingColumn | ||
|
@@ -138,6 +145,7 @@ public void start(Map<String, String> properties) { | |
= config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG); | ||
|
||
for (String tableOrQuery : tablesOrQuery) { | ||
final List<Map<String, String>> tablePartitionsToCheck; | ||
final Map<String, String> partition; | ||
switch (queryMode) { | ||
case TABLE: | ||
|
@@ -149,21 +157,32 @@ public void start(Map<String, String> properties) { | |
timestampColumns | ||
); | ||
} | ||
partition = Collections.singletonMap( | ||
JdbcSourceConnectorConstants.TABLE_NAME_KEY, | ||
tableOrQuery | ||
); | ||
tablePartitionsToCheck = tableToPartitions.get(tableOrQuery); | ||
break; | ||
case QUERY: | ||
partition = Collections.singletonMap( | ||
JdbcSourceConnectorConstants.QUERY_NAME_KEY, | ||
JdbcSourceConnectorConstants.QUERY_NAME_VALUE | ||
); | ||
tablePartitionsToCheck = new ArrayList<>(); | ||
tablePartitionsToCheck.add(partition); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion to replace these two lines with:
|
||
break; | ||
default: | ||
throw new ConnectException("Unexpected query mode: " + queryMode); | ||
} | ||
Map<String, Object> offset = offsets == null ? null : offsets.get(partition); | ||
|
||
//select the first matching offset | ||
// This helps in handling any backward compatible offset changes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a better comment:
|
||
Map<String, Object> offset = null; | ||
if (offsets != null) { | ||
for (Map<String, String> toCheckPartition : tablePartitionsToCheck) { | ||
offset = offsets.get(toCheckPartition); | ||
if (offset != null) { | ||
log.debug("Found offset {} for partition {}", offsets, toCheckPartition); | ||
break; | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is looking for a single protocol amongst all of the topics, yet it is breaking as soon as it finds the first. This might be easier to understand if we put that into a separate method, with JavaDoc that explains what it is doing and the assumptions it is making. I'm not sure that the assumption about protocol versions might be valid. Imagine a connector runs with table A and B, and offsets are both stored. Then the connector is changed to use table A only and restarted. The offsets for table A and B both use protocol V0. Then the connector is upgraded to use protocol V1, and this updates the offsets for table A. Then the connector is changed to also consume from table B, but now the offsets for table A are V1 while the offsets for table B are still V1. Is it not sufficient to always be able to read the offsets for each partition using that partition's protocol version? Obviously all future offsets would be written with the latest protocol version. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, maybe this is simply trying to find the latest protocol version to use for writes? We definitely need to extract this to a separate method and have good JavaDoc. Why not always write using the most recent protocol version? |
||
|
||
String topicPrefix = config.getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG); | ||
|
||
|
@@ -216,6 +235,24 @@ public void start(Map<String, String> properties) { | |
running.set(true); | ||
} | ||
|
||
private List<Map<String, String>> possibleTablePartitions(String table) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add JavaDoc so that we better explain what this method is doing, especially around what the returned list means/contains. |
||
TableId tableId = dialect.parseTableIdentifier(table); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding a comment:
|
||
String fqn = ExpressionBuilder.create().append(tableId, false).toString(); | ||
Map<String, String> partitionWithFqn = new HashMap<>(); | ||
partitionWithFqn.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY, fqn); | ||
partitionWithFqn.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should call:
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding a comment:
|
||
Map<String, String> partition = Collections.singletonMap( | ||
JdbcSourceConnectorConstants.TABLE_NAME_KEY, | ||
tableId.tableName() | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should call:
|
||
return Arrays.asList(partitionWithFqn, partition); | ||
} | ||
|
||
@Override | ||
public void stop() throws ConnectException { | ||
try { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import java.sql.Timestamp; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
|
@@ -171,9 +172,15 @@ public SourceRecord extractRecord() throws SQLException { | |
final Map<String, String> partition; | ||
switch (mode) { | ||
case TABLE: | ||
String name = tableId.tableName(); // backward compatible | ||
partition = Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, name); | ||
topic = topicPrefix + name; | ||
String name = tableId.tableName(); | ||
topic = topicPrefix + name;// backward compatible | ||
String fqn = ExpressionBuilder.create().append(tableId, false).toString(); | ||
partition = new HashMap<>(); | ||
partition.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY, fqn); | ||
partition.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't we talk about putting this logic into a helper method? Maybe an
|
||
break; | ||
case QUERY: | ||
partition = Collections.singletonMap(JdbcSourceConnectorConstants.QUERY_NAME_KEY, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,9 +34,18 @@ public class JdbcSourceTaskTestBase { | |
|
||
protected static String SINGLE_TABLE_NAME = "test"; | ||
protected static Map<String, Object> SINGLE_TABLE_PARTITION = new HashMap<>(); | ||
protected static Map<String, Object> SINGLE_TABLE_PARTITION_WITH_VERSION = new HashMap<>(); | ||
|
||
static { | ||
SINGLE_TABLE_PARTITION.put(JdbcSourceConnectorConstants.TABLE_NAME_KEY, SINGLE_TABLE_NAME); | ||
SINGLE_TABLE_PARTITION_WITH_VERSION.put( | ||
JdbcSourceConnectorConstants.TABLE_NAME_KEY, | ||
SINGLE_TABLE_NAME | ||
); | ||
SINGLE_TABLE_PARTITION_WITH_VERSION.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could these could call |
||
} | ||
|
||
protected static EmbeddedDerby.TableName SINGLE_TABLE | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this should be "protocol" rather than "version", since the latter might be mistaken as the Connect version.