Extend offset support for other streams #5359

Closed
mcvsubbu opened this issue May 10, 2020 · 4 comments · Fixed by #5549
Comments

@mcvsubbu
Contributor

Pinot code stopped referencing Kafka directly back in the 0.1.0 days. HLC can support pretty much any stream. LLC still uses one property of streams in its raw form in the code -- the offset of a stream message within a partition.

This offset is assumed to be a long (8 bytes). It appears as such in segment ZK metadata, is maintained as a long in the stream consumers, and is expected to be a primitive long in all the consuming interfaces.

This works fine with Kafka, EventHub, and the like, but not with some of the other streams.

We need to extend the code to support more generic offsets. This has to be done somewhat carefully, since it can break backward compatibility and cause a production outage. It is better to proceed in smaller steps, making sure that we are not breaking anything.

Offsets are NOT stored in on-disk segment metadata (good!)

Broadly, the usage of offset is in these areas:

  1. The controller queries the stream's metadata to get the offset in each partition of the stream. The controller writes this offset into segment metadata as the starting offset of each realtime segment. Further, the controller also writes the offset into zk segment metadata when the segment completes.
  2. The server uses the offset to request that the stream partition return messages starting with that offset.
  3. The server and controller exchange the offset value (as a long) in the segment completion protocol.

The broad set of steps is as follows (but the devil is in the details, and we will know better as we move along):

  1. Change long into a class (StreamPartitionMsgOffset? -- must be Comparable and Serializable) in all places except Kafka-specific areas. For now, use LongOffset as the subclass implementing this interface; see the sketch after this list. Don't change any persistent code yet.
  2. Change the stream consumer interface to support the StreamPartitionMsgOffset class instead of a long (both the metadata fetcher and data consumer interfaces).
  3. Change the Segment Completion Protocol to add an additional serialized element into the protocol. Both the controller and the server will pay attention to the new element if it is present. Since we will be serializing the LongOffset class, it should work well. The sender should include both the raw form and the serialized form in the protocol. The receiver chooses the serialized form if available, and falls back to the raw form if not.
  4. Change the segment metadata in zk to include the serialized offset (in a new field). Deserialization will pick the serialized form if available, and otherwise use the long offset.
  5. Over time, remove the use of long in persistent data.
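
As a starting point for step 1, a minimal sketch of what the interface and its Kafka-facing implementation could look like (the method set, and the use of toString() as the serialized form, are illustrative assumptions, not final code):

```java
// Sketch only: the proposed interface and a Kafka-style long-backed implementation.
public interface StreamPartitionMsgOffset extends Comparable<StreamPartitionMsgOffset> {
  // The serialized form, used in the completion protocol and, later, zk metadata.
  String toString();
}

class LongOffset implements StreamPartitionMsgOffset {
  private final long _offset;

  LongOffset(long offset) {
    _offset = offset;
  }

  @Override
  public int compareTo(StreamPartitionMsgOffset other) {
    return Long.compare(_offset, ((LongOffset) other)._offset);
  }

  @Override
  public String toString() {
    return Long.toString(_offset);
  }
}
```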
@mcvsubbu mcvsubbu changed the title Extend support for other streams Extend offset support for other streams May 11, 2020
mcvsubbu added a commit that referenced this issue Jun 3, 2020
Changed the segment commit protocol to send/receive streamPartitionMsgOffset (#5486)

* Changed the segment commit protocol to send/receive streamPartitionMsgOffset

Updated the segment commit protocol so that a new element streamPartitionMsgOffset
is populated in requests (as a request parameter) and in responses (as a JSON string element).

The server side has been modified to send the 'streamPartitionMsgOffset' as well as the
'offset' parameter to the controller. The controller looks for and prefers streamPartitionMsgOffset,
but falls back to offset if streamPartitionMsgOffset is not there.

The controller side, in the response message, populates both elements, and the server on
the receiving side does likewise -- preferring streamPartitionMsgOffset.

All callers into the protocol module have been modified to NOT set the offset field; instead,
they set only the streamPartitionMsgOffset field. The 'offset' value is derived from
streamPartitionMsgOffset.

Added a test to make sure that the controller always generates both elements. Such a test was not
possible on the server side at this time, so that was verified manually.

Manually ran LLCClusterIntegrationTest with population of `streamPartitionMsgOffset` disabled on the
server side (old server/new controller) and on the controller response side (new server/old controller)

Issue #5359

* Addressed review comments
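
A minimal sketch of the receiver-side preference described in the commit message above (OffsetCompat.resolve is a hypothetical helper name; at this stage both fields encode a LongOffset):

```java
// Hypothetical helper illustrating the preference logic: use
// streamPartitionMsgOffset when the peer sent it, otherwise fall back to the
// legacy long 'offset'.
final class OffsetCompat {
  static StreamPartitionMsgOffset resolve(String streamPartitionMsgOffset, long offset) {
    if (streamPartitionMsgOffset != null) {
      // New senders populate the serialized form.
      return new LongOffset(Long.parseLong(streamPartitionMsgOffset));
    }
    // Old senders only populate the raw long.
    return new LongOffset(offset);
  }
}
```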
mcvsubbu added a commit that referenced this issue Jun 9, 2020
* Moved StreamPartitionMsgOffset to be an interface

- Changed StreamPartitionMsgOffset class to be an interface.
- Introduced a LongMsgOffsetFactory class that can be used for Kafka. Other
  streams will need to provide their own factory to create offsets of
  various types. Keeping LongMsgOffsetFactory in spi so we can also use
  it in tests; the alternative was to introduce one for each test, plus
  one in kafka that does the same thing.
- Introduced a new config 'stream.<type>.partition.offset.factory.class.name'.
  Stream providers need to set the offset factory class with this config key.
- All classes now use StreamPartitionMsgOffset instead of 'long', except for
  cases where the offset is received from the stream or the offset is being
  read from or written to persistent zk metadata.
- Marked TODOs explicitly on items still to be done to complete the implementation
  of generic offsets.

Issue #5359

* Enabled parsing stream partition msg offset in protocol

Also added a backward compatibility test

* Fix unit test failure
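
As a rough illustration of how the new config key might be consumed (the reflection-based loading and the helper class below are assumptions, not the actual Pinot plumbing):

```java
import java.util.Map;

// Hypothetical sketch: instantiate the offset factory named by
// stream.<type>.partition.offset.factory.class.name via reflection.
final class OffsetFactoryLoader {
  static StreamPartitionMsgOffsetFactory load(Map<String, String> streamConfigs, String streamType)
      throws Exception {
    String key = "stream." + streamType + ".partition.offset.factory.class.name";
    String className = streamConfigs.get(key); // e.g. LongMsgOffsetFactory for Kafka
    return (StreamPartitionMsgOffsetFactory) Class.forName(className)
        .getDeclaredConstructor().newInstance();
  }
}
```

A Kafka table would then, for example, point 'stream.kafka.partition.offset.factory.class.name' at the LongMsgOffsetFactory class.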
mcvsubbu added a commit that referenced this issue Jun 11, 2020
Changed the interface exposed by streams to return StreamPartitionMsgOffset
instead of long.

Also changed the LLCRealtimeSegmentZKMetadata class to return String instead
of long offsets, since the offset is stored as a String anyway. Luckily, zk metadata does
not generate java objects from json automatically (instead, it parses the
fields one by one via custom code).

The segment endOffset can now be null in LLCRealtimeSegmentZKMetadata. For
backward compatibility, we store Long.toString(Long.MAX_VALUE) in zookeeper
so that readers of the metadata do not get an NPE if they are still running an old
version. We will remove this workaround after we release 0.5.0.

The segment completion protocol for realtime LLC segment completion will
continue to use both the 'offset' and 'streamPartitionMsgOffset' elements for
one more release, and will move to use the latter completely in 0.6.0.

The interfaces PartitionLevelConsumer and StreamMetadataProvider (marked STABLE)
now deprecate the calls that use long offsets. Instead, new calls have
been added that use StreamPartitionMsgOffset. The Kafka plugins have been updated
to use the new calls, but the old methods are preserved to make sure that any
other Kafka implementations have time to move over. The deprecated calls will be removed
in 0.6.0.

The metric that showed the highest consumed offset has been removed.

Testing done:
Manually verified that LLCRealtimeClusterIntegrationTest completes segments correctly with
an old server and a new controller, as well as with a new server and an old controller, using
the command-line starter for the controller and server to start a component at a specific
revision after the 0.4.0 release.

This PR concludes the work for this Issue

Issue #5359
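
To illustrate the deprecation pattern described in the commit message above, a sketch along these lines (the actual PartitionLevelConsumer signatures in Pinot may differ; the default delegation shown assumes long-backed offsets):

```java
// Sketch of the deprecation pattern; not the exact interface from the codebase.
public interface PartitionLevelConsumer {
  @Deprecated
  MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis);

  // New variant using the generic offset; a default implementation delegates
  // to the old method so existing long-based plugins keep working. This
  // assumes the offsets serialize to numeric strings (LongOffset-style).
  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
      StreamPartitionMsgOffset endOffset, int timeoutMillis) {
    long start = Long.parseLong(startOffset.toString());
    long end = (endOffset == null) ? Long.MAX_VALUE : Long.parseLong(endOffset.toString());
    return fetchMessages(start, end, timeoutMillis);
  }
}
```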
@mcvsubbu mcvsubbu linked a pull request Jun 12, 2020 that will close this issue
@mcvsubbu mcvsubbu reopened this Jun 12, 2020
@mcvsubbu
Contributor Author

Keeping the issue open until @KKcorps verifies things

@KKcorps
Contributor

KKcorps commented Jun 16, 2020

@mcvsubbu The StreamPartitionMsgOffsetFactory interface expects a createMaxOffset method to be implemented. But for Kinesis, the offsets are strings, and they can go out of bounds for a long or any other numeric type.
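
(For context, a string-backed offset along these lines is what Kinesis would need. The sketch below is hypothetical -- StringMsgOffset is an illustrative name, not code from the PR -- and it shows why a "max" offset is not meaningful for such streams:)

```java
import java.math.BigInteger;

// Hypothetical string-backed offset for a stream like Kinesis, whose sequence
// numbers are numeric strings too large for a long. There is no sensible
// maximum value to construct, which is what made createMaxOffset problematic.
public class StringMsgOffset implements StreamPartitionMsgOffset {
  private final String _sequenceNumber;

  public StringMsgOffset(String sequenceNumber) {
    _sequenceNumber = sequenceNumber;
  }

  @Override
  public int compareTo(StreamPartitionMsgOffset other) {
    return new BigInteger(_sequenceNumber)
        .compareTo(new BigInteger(((StringMsgOffset) other)._sequenceNumber));
  }

  @Override
  public String toString() {
    return _sequenceNumber;
  }
}
```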

@mcvsubbu
Contributor Author

@KKcorps I have removed that requirement. Please pull the latest code.

@KKcorps
Contributor

KKcorps commented Jun 16, 2020

Checked it just now. Looks good to me!

npawar added a commit that referenced this issue Apr 7, 2021
Part 2 of the Kinesis implementation changes. #6518 is the main PR, which is being split into 3 parts to reduce the scope of review.

In this PR, an abstraction has been introduced for a group of partitions/shards, and each consumer is now responsible for a PartitionGroup instead of just a partitionId (though in the first iteration, a PartitionGroup will contain only 1 partition/shard). It includes corresponding additions to interfaces and some new interfaces; see the sketch after this note.

The Kafka stream should function as-is, with no changes needed.

Note that before upgrading to this change, installations should upgrade to at least the release that includes #5359 (https://docs.pinot.apache.org/basics/releases/0.5.0). An installation on an older version may break during the upgrade.
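
A rough sketch of the grouping idea described above (the class shape and names here are illustrative assumptions, not the exact interfaces introduced in the PR):

```java
import java.util.List;

// Illustrative only: a consumer now owns a group of partitions/shards rather
// than a single partitionId; in the first iteration each group holds exactly one.
public class PartitionGroup {
  private final int _partitionGroupId;
  private final List<String> _shardIds; // e.g. Kinesis shard ids
  private final StreamPartitionMsgOffset _startOffset;

  public PartitionGroup(int partitionGroupId, List<String> shardIds,
      StreamPartitionMsgOffset startOffset) {
    _partitionGroupId = partitionGroupId;
    _shardIds = shardIds;
    _startOffset = startOffset;
  }
}
```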