KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector #13945
Conversation
…eConnector and the FileStreamSinkConnector
Hey thanks for handling this, Yash!
I had one comment about validating filenames from previous configurations.
// This connector makes use of a single source partition which represents the file that it is configured to read from
if (offsets.size() > 1) {
    throw new ConnectException("The " + FileStreamSourceConnector.class.getSimpleName() + " supports only a single source partition / file");
Suppose that:
- The connector is configured with one file a
- The task commits offsets for a
- The connector is reconfigured for a different file b
- The task commits offsets for b
- A user calls GET /offsets and sees both a and b
- A user calls DELETE /offsets with a, or a and b in the same request.

I think in this situation, it is reasonable for the delete to succeed; the user clearly intends to delete the old offset, and we shouldn't force them to revert the configuration to contain a in order to delete that offset. Also, a PATCH /offsets which clears a and updates b in the same request should succeed.

I think it is reasonable to fail a request which updates a, since that is not an offset which the current configuration could produce.

So in other words: if the value is non-null, then we should validate the key. If the value is null, then any filename should be allowed, since it might have been from a previous configuration.
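The rule proposed here could be sketched as a small helper. This is a hypothetical illustration of the validation logic being discussed, not the actual connector code; the class and method names are made up:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the validation rule described above: a null offset
// value is a delete and may name any file (possibly one from a previous
// configuration), while a non-null offset value must name the currently
// configured file, since no other file could produce new offsets.
public class AlterOffsetsValidation {

    public static boolean entryIsAcceptable(String partitionFilename,
                                            Map<String, ?> offsetValue,
                                            String configuredFilename) {
        if (offsetValue == null) {
            // Tombstone: allow deletes for any filename
            return true;
        }
        // Non-null offsets are only valid for the currently configured file
        return Objects.equals(partitionFilename, configuredFilename);
    }
}
```

Under this rule, a delete for the old file a succeeds even when the connector is configured with b, while an update for a is rejected.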
Oh wow, that's a really good catch, thanks! One point to note is that in a DELETE /offsets request for a source connector, users don't specify the partitions / offsets (we write null offsets for every known partition).

Thinking about this a bit more, we shouldn't fail the request even if there are multiple partitions with non-null values. Taking the same example you used, if a user calls GET /offsets and copies the response body as the request body for a PATCH /offsets request while only modifying the offset value for file b, I don't think it makes sense to fail that request. Requests that don't even include the current source partition are also fairly harmless. I've relaxed the validation criteria in the latest patch.
…for multiple source partitions
Thanks Yash! Left some thoughts.
BTW, have you also filed a ticket to implement these APIs in MM2? If not, no worries; I'd be happy to take that work on if you don't have the cycles.
Thanks Chris, I've addressed your review comments and created this ticket for MM2 (but haven't self-assigned it).
Thanks Yash! Still have some thoughts. Also wondering if maybe it's time we add integration tests for these connectors, given possible interactions between the Connect runtime and the file Connector and Task classes that are tricky to test. Definitely not a blocker for this PR but interested in your take.
// The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
try {
    long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));
Isn't this too permissive? Based on the testAlterOffsetsOffsetPositionValues test case, this would allow values of both "10" (string) and 10 (number) for the offset. But it looks like the source task class would only work with numbers.
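The asymmetry can be shown in a couple of lines. This is a standalone sketch, not the actual connector or task code; both values pass the lenient parse, but only a genuine Long would pass the kind of instanceof check the task uses:

```java
public class PositionParsingDemo {

    // Both a JSON string "10" and a JSON number 10 pass this lenient parse,
    // which is the style of check used during offset validation
    public static long lenientParse(Object raw) {
        return Long.parseLong(String.valueOf(raw));
    }

    // ...but an instanceof-Long check (the style used by the task at startup)
    // rejects everything except an actual boxed Long
    public static boolean strictCheck(Object raw) {
        return raw instanceof Long;
    }
}
```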
Ah, good catch, thanks. I think it might be a bit friendlier if we update the task class instead to do similar parsing, WDYT? I'm okay either way, since the most common use case would be copy-pasting the output from GET /offsets and modifying it, in which case users would end up using a number rather than a string anyway.
I'd prefer to leave the task parsing the same; less work on our part, and less risk of a regression in existing parts of the code base.
I found something interesting when experimenting with this. When passing a request body to the PATCH /offsets endpoint like:

{
  "offsets": [
    {
      "partition": {
        "filename": "/path/to/filename"
      },
      "offset": {
        "position": 20
      }
    }
  ]
}

the position 20 is deserialized to an Integer by Jackson (the JSON library we use in the REST layer for Connect), which seems fine because JSON doesn't have separate types for 32 bit and 64 bit numbers. So, the offsets map that is passed to FileStreamSourceConnector::alterOffsets by the runtime also contains 20 as an Integer value. I initially thought that this would cause the FileStreamSourceTask to fail at startup because it uses an instanceof Long check here (and an Integer value is obviously not an instance of Long). However, interestingly, the task did not fail, and some debugging revealed that after the offsets are serialized and deserialized by the JsonConverter in OffsetStorageWriter (in Worker::modifySourceConnectorOffsets) and OffsetStorageReader respectively, the offsets map that is retrieved by the task on startup through its context contains the position 20 as a Long value.
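A minimal sketch of the Jackson behavior described here, assuming jackson-databind is on the classpath. This is not Connect code, just a demonstration of the default deserialization of integral JSON numbers:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class JacksonNumericTypes {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // By default, Jackson deserializes an integral JSON number into an Integer
    // when it fits in 32 bits, and into a Long when it does not
    public static Object positionType(String json) {
        try {
            Map<String, Object> parsed =
                    MAPPER.readValue(json, new TypeReference<Map<String, Object>>() { });
            return parsed.get("position");
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

So the small position in the request body above arrives as an Integer, even though the same value read back through the converter round trip comes out as a Long.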
While this particular case is easily handled by simply accepting Integer values as valid in the FileStreamSourceConnector::alterOffsets method, I'm thinking we probably need to make some changes so that the offsets map passed to source connectors in their alterOffsets method is the same as the offsets map that connectors / tasks will retrieve via the OffsetStorageReader from their context (otherwise, this could lead to some hard to debug issues in other connectors implementing the SourceConnector::alterOffsets method). The easiest way off the top of my head would probably be to serialize and deserialize the offsets map using the JsonConverter before invoking SourceConnector::alterOffsets. WDYT?

Furthermore, just checking whether the offset position is an instance of Long (Jackson uses a Long if the number doesn't fit in an Integer) or Integer in the FileStreamSourceConnector::alterOffsets method seems sub-optimal because:
- To someone just reading through the FileStreamSourceConnector and FileStreamSourceTask classes, accepting Integer instances during validation but requiring Long instances in the actual task would look like a bug, since the serialization + deserialization aspect isn't transparent.
- It's an extremely unlikely scenario, but any changes in Jackson's deserialization logic could break things here - for instance, if smaller numbers were deserialized into Shorts instead of Integers. Parsing using a combination of String::valueOf and Long::parseLong seems a lot more robust.
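That parsing approach is insensitive to whichever boxed numeric type the deserializer happens to produce. A sketch with a hypothetical helper (names are illustrative, not the actual connector code); the non-negative requirement comes from the comment in the snippet quoted earlier:

```java
import java.util.OptionalLong;

public class RobustPositionParsing {

    // Parses a raw offset position regardless of boxed numeric type
    // (Short, Integer, Long) or a numeric String; returns empty for
    // anything non-numeric or negative
    public static OptionalLong parsePosition(Object raw) {
        try {
            // String.valueOf(null) yields "null", which fails to parse below
            long position = Long.parseLong(String.valueOf(raw));
            return position >= 0 ? OptionalLong.of(position) : OptionalLong.empty();
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
```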
Ah, nice catch! I noticed the discrepancy in numeric types while working on KAFKA-15177 but hadn't even considered the possibility of aligning the types across invocations of alterOffsets and OffsetStorageReader::offset / OffsetStorageReader::offsets.

I think re-deserializing the offsets before passing them to alterOffsets is a great idea. Unless the request body is gigantic there shouldn't be serious performance concerns, and it also acts as a nice preflight check to ensure that the offsets can be successfully propagated to the connector's tasks through the offsets topic.

I still don't love permitting string types for the connector's position offset values; it doesn't seem like a great endorsement of our API if we have to implement workarounds in the file connectors, which are the first example of the connector API that many developers see.
Since the type alignment issue seemed like a broader one (i.e. not scoped to the file connectors being touched here), I've created a separate ticket and PR for it.

> it doesn't seem like a great endorsement of our API if we have to implement workarounds in the file connectors, which are the first example of the connector API that many developers see.

I'd argue that it isn't really a workaround and that the current check itself is bad. If the (de)serialization happened to use Integer for values that fit in a 32 bit signed type (which would be perfectly valid and is exactly what we see currently before the values are passed through the converter), the current check in the FileStreamSourceTask would cause it to bail.
Hmmm... wouldn't that be a pretty serious breaking change if we accidentally switched up how the JSON converter deserializes integer types? Not just for the file source connector, but for plenty of others. It feels like it might be a better use of our time to make note of this possibility and ensure that we have sufficient unit testing in place to prevent that kind of regression (I suspect we already do but haven't verified this yet).

Of course, because things aren't interesting enough already: it turns out that there are actually two different scenarios in which tasks observe offsets for their connector. The first, which we're all familiar with, is when they query them using an OffsetStorageReader, which in distributed mode reflects the contents of the offsets topic. The second is when SourceTask::commitRecord is invoked, which carries with it the just-ack'd SourceRecord instance originally provided by the task, including the original in-memory source partition and source offset, which may use types that get lost when written to and read back from the offsets topic.

I don't know if this significantly changes the conversation but it seems subtle and counterintuitive enough to bring up so that we can avoid accidentally breaking connector code that relies on this behavior.
> Hmmm... wouldn't that be a pretty serious breaking change if we accidentally switched up how the JSON converter deserializes integer types? Not just for the file source connector, but for plenty of others.

Okay, that's fair enough. I've changed the check in FileStreamSourceConnector::alterOffsets to mirror the one made in the task at startup for consistency (and avoided making changes in the existing task logic). This does mean that this PR should be merged after #14003 (assuming that that approach is acceptable).

> I don't know if this significantly changes the conversation but it seems subtle and counterintuitive enough to bring up so that we can avoid accidentally breaking connector code that relies on this behavior.

Hm yeah, that's definitely another interesting one to bring up. However, I'd contend that that one kinda makes sense since we're passing the SourceRecord itself - tasks already deal with SourceRecords and their offsets (and associated types) in their regular lifecycle. It would be highly confusing if the SourceRecord that they get in commitRecord doesn't match the one they dispatched to the framework via poll. Of course, ideally, the offsets that they read via an OffsetStorageReader should also not have any type mismatches compared to the SourceRecord ones, but I don't think we'd want to (or safely could) change that at this point.

Since the offsets being altered externally would correspond to the ones that the connector / tasks read at startup, I think it makes sense to align the types across invocations of SourceConnector::alterOffsets and offsets queried from an OffsetStorageReader (with an implicit separate alignment between the SourceRecord's offset types).
💯 Sounds good, thanks for thinking this through.
I was toying with the idea of adding ITs for these file connectors in this very PR earlier; but after a bit of tinkering, left it for later 😄 I'd be happy to file a separate ticket for that and assign it to myself.
Thanks Yash! LGTM, will merge once #14003 lands.
…eConnector and the FileStreamSinkConnector (apache#13945) Reviewers: Chris Egerton <chrise@aiven.io>