-
Notifications
You must be signed in to change notification settings - Fork 954
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CC-2116: Fix offsets compatibility #436
Conversation
It looks like @mageshn hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot |
@rhauch I have not cleaned up the code yet and haven't added specific test yet. But wanted to get some initial feedback on the approach before spending time on those. Between, the test that was system test that was failing passes with this fix. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few suggestions. Biggest questions are:
- why are we assigning multiple source partitions per table to the partitions map?
- why not just read each source partition and offset given whatever the protocol, and then always write new partitions using the latest protocol?
@@ -126,6 +126,7 @@ public void start(Map<String, String> properties) { | |||
throw new ConnectException("Unknown query mode: " + queryMode); | |||
} | |||
offsets = context.offsetStorageReader().offsets(partitions); | |||
log.trace("The partition offsets are {}", offsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be nice at DEBUG level
break; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is looking for a single protocol amongst all of the topics, yet it is breaking as soon as it finds the first. This might be easier to understand if we put that into a separate method, with JavaDoc that explains what it is doing and the assumptions it is making.
I'm not sure that the assumption about protocol versions might be valid. Imagine a connector runs with table A and B, and offsets are both stored. Then the connector is changed to use table A only and restarted. The offsets for table A and B both use protocol V0. Then the connector is upgraded to use protocol V1, and this updates the offsets for table A. Then the connector is changed to also consume from table B, but now the offsets for table A are V1 while the offsets for table B are still V1.
Is it not sufficient to always be able to read the offsets for each partition using that partition's protocol version? Obviously all future offsets would be written with the latest protocol version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, maybe this is simply trying to find the latest protocol version to use for writes? We definitely need to extract this to a separate method and have good JavaDoc.
Why not always write using the most recent protocol version?
toCheckPartition.getOrDefault( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ZERO | ||
)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to have the TRACE log message after getting the protocol version, and then including the protocol version in the log message.
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE) | ||
; | ||
partitions.add(partitionWithFqn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we adding both the single table name and FQN as separate partitions in the map?
String name = tableId.tableName(); // backward compatible | ||
partition = Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, name); | ||
topic = topicPrefix + name; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like all of this could be extracted into a strategy function to compute the topic given the table ID (e.g., topicName(TableId)
), and a second method to compute the partitions for a table ID (e.g., sourcePartitions(TableId)
). We could have separate strategy implementations for each protocol version, and a particular strategy implementation can be passed into this class.
SINGLE_TABLE_NAME); | ||
SINGLE_TABLE_PARTITION_WITH_VERSION.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION, | ||
"1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should all be moved into the PartitionProtocols class, which can have static instances of the strategy for each protocol.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style nit: the );
should be on the next line.
silly question: what is the problem we are solving in this PR? looks like it is related to CC-2116. Might be good to add a description above. thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments. Thanks, @mageshn!
partitionWithFqn.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE) | ||
; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the )
must be on the line with the semicolon.
SINGLE_TABLE_NAME); | ||
SINGLE_TABLE_PARTITION_WITH_VERSION.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION, | ||
"1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style nit: the );
should be on the next line.
expectInitializeNoOffsets(Arrays.asList(SINGLE_TABLE_PARTITION)); | ||
expectInitializeNoOffsets(Arrays.asList( | ||
SINGLE_TABLE_PARTITION, | ||
SINGLE_TABLE_PARTITION_WITH_VERSION)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the );
should be on the next line. multiple occurrences in this file.
@wicknicks thanks for your comments but you got too early to the PR before it was GA :) |
@rhauch I'm going to be adding a few more unit tests. But the latest changes gave me a green run for all the Mysql System tests |
@rhauch @wicknicks Do you want to take a pass at this now? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. A few comments/suggestions.
@@ -20,4 +20,6 @@ | |||
public static final String TABLE_NAME_KEY = "table"; | |||
public static final String QUERY_NAME_KEY = "query"; | |||
public static final String QUERY_NAME_VALUE = "query"; | |||
public static final String OFFSET_PROTOCOL_VERSION_KEY = "version"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this should be "protocol" rather than "version", since the latter might be mistaken as the Connect version.
@@ -105,6 +109,8 @@ public void start(Map<String, String> properties) { | |||
? Collections.singletonList(query) : tables; | |||
|
|||
String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG); | |||
//used only in table mode | |||
Map<String, List<Map<String, String>>> tableToPartitions = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think partitionsByTableFqn
would be a better name.
Map<String, String> partition = | ||
Collections.singletonMap(JdbcSourceConnectorConstants.TABLE_NAME_KEY, table); | ||
partitions.add(partition); | ||
List<Map<String, String>> tablePartitions = possibleTablePartitions(table); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a comment here that describes what we're doing:
// Find possible partition maps for different offset protocols ...
break; | ||
case QUERY: | ||
partition = Collections.singletonMap( | ||
JdbcSourceConnectorConstants.QUERY_NAME_KEY, | ||
JdbcSourceConnectorConstants.QUERY_NAME_VALUE | ||
); | ||
tablePartitionsToCheck = new ArrayList<>(); | ||
tablePartitionsToCheck.add(partition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion to replace these two lines with:
tablePartitionsToCheck = Collections.singleton(partition);
Map<String, Object> offset = offsets == null ? null : offsets.get(partition); | ||
|
||
//select the first matching offset | ||
// This helps in handling any backward compatible offset changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a better comment:
// The partition map varies by offset protocol. Since we don't know which protocol each table's offsets
// are keyed by, we need to use the different possible partitions (which are in newest protocol version first)
// to find the actual offsets for each table.
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding a comment:
// Define source partition map for original offset protocol (V0) that uses the table name
partition.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we talk about putting this logic into a helper method? Maybe an OffsetProtocols
class with static methods:
Map<String, String> sourcePartitionForProtocolV1(TableId table) { ... }
Map<String, String> sourcePartitionForProtocolV0(TableId table) { ... }
Map<String, String> partition = Collections.singletonMap( | ||
JdbcSourceConnectorConstants.TABLE_NAME_KEY, | ||
tableId.tableName() | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should call:
Map<String, String> partition = OffsetProtocols.sourcePartitionForProtocolV0(tableId)`;
partitionWithFqn.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should call:
Map<String, String> partition = OffsetProtocols.sourcePartitionForProtocolV1(tableId)`;
SINGLE_TABLE_PARTITION_WITH_VERSION.put( | ||
JdbcSourceConnectorConstants.OFFSET_PROTOCOL_VERSION_KEY, | ||
JdbcSourceConnectorConstants.PROTOCOL_VERSION_ONE | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could these could call OffsetProtocols
methods?
@rhauch I have incorporated the comments. Would you like to take another pass at it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. A few more nits.
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class OffsetProtocols { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JavaDoc please?
|
||
public class OffsetProtocols { | ||
|
||
public static Map<String, String> sourcePartitionForProtocolV1(TableId tableId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JavaDoc please?
return partitionForV1; | ||
} | ||
|
||
public static Map<String, String> sourcePartitionForProtocolV0(TableId tableId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JavaDoc please?
topic = topicPrefix + name; | ||
String name = tableId.tableName(); | ||
topic = topicPrefix + name;// backward compatible | ||
partition = OffsetProtocols.sourcePartitionForProtocolV1(tableId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this constant for this querier instance? Can we initialize this once in the constructor and then simply reuse the same map over and over?
} | ||
|
||
@Test | ||
public void testTimestampRestore() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is testing when we have offsets with both protocols. Is that worth having the method include that information? E.g., testTimestampRestoreOffsetsWithMultipleProtocol()
Collections.singletonMap(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()) | ||
); | ||
} | ||
|
||
@Test | ||
public void testTimestampAndIncrementingRestoreOffset() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is testing when we have offsets with both protocols. Is that worth having the method include that information? E.g., testTimestampAndIncrementingRestoreOffsetWithMultipleProtocol()
Collections.singletonMap(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()) | ||
); | ||
} | ||
|
||
@Test | ||
public void testAutoincrementRestoreOffset() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is testing when we have offsets with both protocols. Is that worth having the method include that information? E.g., testAutoincrementRestoreOffsetsWithMultipleProtocol()
Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<>(); | ||
offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()); | ||
offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap()); | ||
testTimestampRestoreOffset(offsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about a comment here to say that we want to always use the offset with the latest protocol found?
Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<>(); | ||
offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()); | ||
offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap()); | ||
testTimestampAndIncrementingRestoreOffset(offsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about a comment here to say that we want to always use the offset with the latest protocol found?
Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<>(); | ||
offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()); | ||
offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap()); | ||
testAutoincrementRestoreOffset(offsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about a comment here to say that we want to always use the offset with the latest protocol found?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the fix!
With the dialect changes introduced in 5.0.x, the JDBCSourceTask always gets the table names as fully qualified table names. In the offset management, the connector was using the unqualified table name while writing the offsets and fully qualified table name for reading the offsets. Because of this when a connector restarts, it never finds the stored offsets and hence always starts from the beginning. This issue occurs only for TABLE mode.
The solution is to include a backward compatible offset management. We do the following
This PR doesn't handle any changes to topic names. The topic names will continue to be unqualified table name to be backward compatible. If there are multiple tables, matching the intent is to error out and will be implemented in a separate PR.