KAFKA-14491: [3/N] Add logical key value segments #13143
Conversation
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
```java
open = false;
closeOpenIterators();
```
If the store was never open, it seems it's still safe to call `closeOpenIterators()`, and it should just be an empty list? -- Could we inline the code into `close()` directly?
Sure, I don't feel strongly, so I made the change. Besides guarding against closing a segment which was never opened, the usage of `open` also guarded against closing the same segment twice. I've inlined `closeOpenIterators()` and accounted for this by clearing `openIterators` after it's copied.
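For illustration, a hypothetical sketch (not the actual Kafka code; all names here are made up) of the copy-then-clear pattern described above: the copy protects against iterators deregistering themselves during close, and clearing the set makes a second `close()` call a harmless no-op.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the inlined close(): copy the open iterators
// before closing them, then clear the set so a second close() is a no-op.
class SegmentSketch {
    interface OpenIterator { void close(); }

    final Set<OpenIterator> openIterators = new HashSet<>();

    void close() {
        // copy first: an iterator's close() may deregister itself from the set
        final List<OpenIterator> copy = new ArrayList<>(openIterators);
        // clearing after the copy makes close() safe to call twice
        openIterators.clear();
        for (final OpenIterator it : copy) {
            it.close();
        }
    }
}
```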
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
```java
Bytes getPrefix() {
    return Bytes.wrap(prefix);
}
```
How large is the overhead to call `wrap()`? (Besides creating a new object, what does it do?) We pass in `Bytes prefix` in the constructor, and it seems if we keep a reference, we could just return it (without the need to unwrap in the constructor and re-wrap here)?
`wrap()` just creates the new object (after performing a null check), so it's very lightweight. It's more convenient to keep `prefix` as `byte[]` than `Bytes` because all the other operations require `byte[]` rather than `Bytes`. If we really wanted, we could keep both (one copy as `byte[]` and another as `Bytes`), but that feels like overkill.
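A stripped-down stand-in (hypothetical classes, not Kafka's actual `Bytes`) showing why wrapping on demand in the getter is cheap: a wrap of this style only null-checks and stores the array reference, with no copy.

```java
// Minimal stand-in for a Bytes-style wrapper: wrap() null-checks and
// stores the reference; no array copy is made.
final class BytesSketch {
    private final byte[] bytes;

    private BytesSketch(final byte[] bytes) {
        this.bytes = bytes;
    }

    static BytesSketch wrap(final byte[] bytes) {
        return bytes == null ? null : new BytesSketch(bytes);
    }

    byte[] get() {
        return bytes;
    }
}

// Keeping the prefix as byte[] internally (what the other operations need)
// and re-wrapping only in the getter:
class PrefixHolder {
    private final byte[] prefix;

    PrefixHolder(final byte[] prefix) {
        this.prefix = prefix;
    }

    BytesSketch getPrefix() {
        return BytesSketch.wrap(prefix);
    }
}
```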
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
Thanks @mjsax for the speedy review! Addressed your comments in the latest commit.
Seems some of your newly added tests fail. Can you have a look?

Thanks. Needed to update the test for the latest changes which now set

Checkstyle error:

Merged the other PR -- can you rebase this one?
Force-pushed 82f47a6 to 9aff136.
Just some minor follow-up on tests.
```java
segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));

assertEquals("a", getAndDeserialize(segment1, "1"));
```
Should we also `get()` on the physical store to see if the logic works as expected? (Also for other tests.)
I was on the fence about this because it requires testing the internals of the class (i.e., specifically how the segment prefixes are serialized) rather than just the public-facing methods. In the end I opted to test indirectly instead, by inserting the same keys into different segments and checking that their values do not collide.
If you prefer checking the contents of the physical store itself, I can make the update.
I see your point, but the test does not really achieve this, as we put the same data into both segments? To test "segment isolation" we would need to put 4 different records (2 per segment) and test both positive (put on s1 allows us to get on s1) and negative (put on s1 does not allow get on s2 to see the data)?

Might apply to other tests, too?
Ah, good point. That's definitely a gap in `shouldPut()` and `shouldPutAll()`. All of the other tests are already set up in a way that they fail if segments are not properly isolated from each other. Just pushed a fix to the two tests which didn't ensure that, and some minor cleanup to a few of the other tests.
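The isolation check discussed here can be sketched with a toy map-backed model (hypothetical names; only the test shape is meaningful: 4 distinct records, with both positive and negative lookups):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: two logical segments over one map, keyed by segment id + key.
class IsolationSketch {
    private final Map<String, String> physical = new HashMap<>();

    void put(final int segmentId, final String key, final String value) {
        physical.put(segmentId + "." + key, value);
    }

    String get(final int segmentId, final String key) {
        return physical.get(segmentId + "." + key);
    }
}
```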
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java
```java
expectedContents.add(kv0);
expectedContents.add(kv1);

try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(null, new Bytes(STRING_SERIALIZER.serialize(null, "1")))) {
```
Should we test different ranges? All lower and upper bound null/not-null combinations?
Heh, this additional test coverage caught a bug. Pushed a fix in the latest commit.
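For reference, the four from/to null combinations suggested above can be sketched with a `TreeMap` stand-in (a hypothetical helper, not the store's actual range implementation; here a null bound means unbounded, and both bounds are inclusive):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: exercise all four null/not-null (from, to) combinations of a
// range query. null means unbounded; both bounds are inclusive.
class RangeSketch {
    final TreeMap<String, String> store = new TreeMap<>();

    NavigableMap<String, String> range(final String from, final String to) {
        if (from == null && to == null) {
            return store;                         // full scan
        } else if (from == null) {
            return store.headMap(to, true);       // upper bound only
        } else if (to == null) {
            return store.tailMap(from, true);     // lower bound only
        } else {
            return store.subMap(from, true, to, true);
        }
    }
}
```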
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
```java
@Test
public void shouldCreateSegments() {
    final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
```
Should we call `getOrCreateSegment` instead? Otherwise we mainly test the logic of `AbstractSegments`?
See above.
```java
@Test
public void shouldCleanupSegmentsThatHaveExpired() {
```
Sounds like we test `AbstractSegments` logic here -- do we need to do this?
You're right that these tests are testing logic from AbstractSegments and not anything specific about LogicalKeyValueSegments. The thing is, AbstractSegments doesn't have its own test file at the moment (I assume because it's abstract). If you think it's worth it, I can remove these tests from here and also from KeyValueSegmentsTest.java, and create a dummy AbstractSegments implementation to add an AbstractSegmentsTest.java. I'd like to do that as a follow-up PR instead of as part of this change, though.
(Also, for this specific test, I would like to have it here because I plan to refactor the cleanup logic in AbstractSegments in a follow-up PR. The current approach (cleanup as part of `getOrCreateSegmentIfLive()`) is not very efficient for the versioned store use case because this method is called multiple times during a single put operation. It will be better to only perform cleanup once per put.)
> If you think it's worth it, I can remove these tests from here and also from KeyValueSegmentsTest.java, and create a dummy AbstractSegments implementation to add an AbstractSegmentsTest.java. I'd like to do that as a follow-up PR instead of as part of this change, though.

Sounds cleaner to me. And yes, a follow-up PR is preferable.
```java
@Test
public void shouldGetSegmentForTimestamp() {
```
As above.
```java
@Test
public void shouldGetSegmentsWithinTimeRange() {
```
As above.
Thanks for your review, @mjsax! Responded to your comments inline. Will push a commit with the latest changes soon.
Today's KeyValueSegments create a new RocksDB instance for each KeyValueSegment. This PR introduces an analogous LogicalKeyValueSegments implementation, with corresponding LogicalKeyValueSegment, which shares a single physical RocksDB instance across all "logical" segments. This will be used for the RocksDB versioned store implementation proposed in KIP-889.
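The key idea can be illustrated with a toy sketch (a `TreeMap` standing in for the single shared RocksDB instance; the names and the fixed-width prefix scheme below are illustrative, not the actual implementation):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.TreeMap;

// Toy model: many "logical" segments share one physical sorted store by
// prefixing every key with the segment id (fixed width, so each segment's
// keys form a contiguous range in the shared keyspace).
class LogicalSegmentsSketch {
    private final TreeMap<byte[], byte[]> physical =
            new TreeMap<>(Arrays::compare);  // stand-in for the shared RocksDB

    private static byte[] prefixed(final long segmentId, final byte[] key) {
        return ByteBuffer.allocate(Long.BYTES + key.length)
                .putLong(segmentId)
                .put(key)
                .array();
    }

    void put(final long segmentId, final byte[] key, final byte[] value) {
        physical.put(prefixed(segmentId, key), value);
    }

    byte[] get(final long segmentId, final byte[] key) {
        return physical.get(prefixed(segmentId, key));
    }
}
```

The actual PR of course layers this on RocksDB with proper prefix serialization and range queries; the sketch only shows the shared-instance, prefixed-key idea.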