
Realtime servers using Low Level Kafka Consumers #178

Closed
mcvsubbu opened this issue Jun 9, 2016 · 12 comments

@mcvsubbu
Contributor

mcvsubbu commented Jun 9, 2016

Motivations:

  • We can improve scalability for tables that have a high ingestion rate.
  • We avoid storing multiple segments containing more or less the same data.
  • We ease capacity expansion.
@mcvsubbu mcvsubbu self-assigned this Jun 9, 2016
@xiangfu0
Contributor

One thing from the ops side: we need to handle upstream Kafka topic expansion automatically.

@mcvsubbu
Contributor Author

@fx19880617 do you mean an increase in the number of partitions in Kafka?

This will not be handled automatically; it will require downtime, an update of a record in PROPERTYSTORE, and then a restart.

In any case, changing the number of partitions in Kafka dynamically requires downtime AFAIK, and the Pinot cluster downtime should coincide with the Kafka downtime.

@mcvsubbu
Contributor Author

Design outline:

We will add a new Helix state, CONSUMING. The controller will place realtime segments in the CONSUMING state first; this state indicates that the segment is not yet completed on the server side.

With multiple replicas in the CONSUMING state (set to consume either for a fixed time or a fixed number of records), the controller coordinates among the consumers so that one of them commits the segment (by uploading it), and the others either keep what they have (if they are at the same offset), catch up to the correct offset, or discard their data and download the committed segment from the controller.
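
Purely as an illustration of the replica-side choices described above (the class and method names here are hypothetical, not the actual Pinot code):

```java
// Illustrative sketch of what a consuming replica might do once the
// controller has chosen a commit offset; names and structure are made up.
public final class ReplicaCompletionSketch {

  public enum CompletionAction { COMMIT, KEEP, CATCH_UP, DOWNLOAD }

  /** Decide what this replica should do for the current segment. */
  public static CompletionAction decide(long myOffset, long commitOffset, boolean chosenAsCommitter) {
    if (chosenAsCommitter) {
      return CompletionAction.COMMIT;   // build the segment and upload it to the controller
    }
    if (myOffset == commitOffset) {
      return CompletionAction.KEEP;     // already at the committed offset: keep the local copy
    }
    if (myOffset < commitOffset) {
      return CompletionAction.CATCH_UP; // consume up to the committed offset, then keep
    }
    return CompletionAction.DOWNLOAD;   // ahead of the committed offset: discard and download the committed segment
  }
}
```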

The replica instances for each segment are stored in PROPERTYSTORE as a map when the table is provisioned, and the map is updated when the cluster is expanded (the update requires no downtime).

When a segment is committed, the controller marks the segment state in IDEALSTATE as ONLINE and starts another segment in the CONSUMING state.
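
A rough sketch of the controller-side step just described; the Helix and property-store wrappers below are placeholders invented for illustration, not real Pinot classes:

```java
// Illustrative-only sketch of the controller's reaction to a segment commit.
public final class ControllerCommitSketch {

  interface HelixAdminFacade {
    void setSegmentState(String tableName, String segmentName, String state); // e.g. "ONLINE", "CONSUMING"
    void addSegment(String tableName, String segmentName, String state);
  }

  interface PropertyStoreFacade {
    void recordStartOffset(String tableName, String segmentName, long startOffset);
  }

  private final HelixAdminFacade helix;
  private final PropertyStoreFacade propertyStore;

  ControllerCommitSketch(HelixAdminFacade helix, PropertyStoreFacade propertyStore) {
    this.helix = helix;
    this.propertyStore = propertyStore;
  }

  /** Called after the committer has uploaded the completed segment. */
  void onSegmentCommitted(String tableName, String committedSegment, String nextSegment, long nextStartOffset) {
    // 1. The committed segment becomes ONLINE in IDEALSTATE.
    helix.setSegmentState(tableName, committedSegment, "ONLINE");
    // 2. Record where the next segment should start consuming (assumed detail for this sketch).
    propertyStore.recordStartOffset(tableName, nextSegment, nextStartOffset);
    // 3. Create the next segment in CONSUMING state so replicas resume consumption.
    helix.addSegment(tableName, nextSegment, "CONSUMING");
  }
}
```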

The protocol has been validated with a prototype in the branch https://github.com/linkedin/pinot/tree/LLConsumerPrototype

The Broker will be modified to introduce a routing table that routes to the new low-level consumer segments (differentiated by segment naming convention).
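
For illustration only, assuming (hypothetically) that low-level consumer segment names carry a distinguishing marker such as a double underscore, the broker-side differentiation could look roughly like this:

```java
// Illustrative sketch: split segments into LLC vs HLC routing tables based on
// an assumed naming convention; the "__" marker is an assumption, not the real format.
import java.util.ArrayList;
import java.util.List;

public final class SegmentRoutingSplitSketch {

  /** Assumed convention: low-level consumer segment names contain "__". */
  static boolean isLowLevelConsumerSegment(String segmentName) {
    return segmentName.contains("__");
  }

  /** Build the list of segments to be served by the LLC routing table. */
  static List<String> selectLowLevel(List<String> allSegments) {
    List<String> llcSegments = new ArrayList<>();
    for (String segment : allSegments) {
      if (isLowLevelConsumerSegment(segment)) {
        llcSegments.add(segment);
      }
    }
    return llcSegments;
  }
}
```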

Transitioning a cluster from high-level consumers to low-level consumers will not require downtime. Extra hardware may be required during the transition phase.

@xiangfu0
Contributor

@mcvsubbu, Kafka can handle topic partition expansion without downtime, but not partition shrinking.
Since I have multiple realtime tables on the same host, it may be hard to do this maintenance.

@jfim
Member

jfim commented Jun 16, 2016

It should be doable to have this expansion handled on the Pinot side. Depending on whether or not there is a way to be notified when that expansion happens, we might be able to do the expansion automatically, or it might have to be done manually (i.e. through some API). I haven't checked the newer Kafka APIs, though I think they're a lot better at this than the old ones.
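
If the newer consumer client is available, a periodic check along these lines could notice a partition-count change; this is only a sketch, and the comparison against a recorded count is an assumed detail:

```java
// Sketch only: query the topic's current partition count with the newer Kafka
// consumer API and compare it to a previously recorded count.
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class PartitionCountWatcherSketch {

  /** Ask the Kafka cluster how many partitions the topic currently has. */
  public static int currentPartitionCount(String bootstrapServers, String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      List<PartitionInfo> partitions = consumer.partitionsFor(topic);
      return partitions == null ? 0 : partitions.size();
    }
  }

  /** A periodic task could compare against the count recorded when the table was provisioned. */
  public static boolean hasExpanded(int recordedCount, int observedCount) {
    return observedCount > recordedCount;
  }
}
```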

@xiangfu0
Contributor

Also, since we are using the simple consumer, it would be better to expose an API to reset the consumer offset to a particular position, so we can skip some bad data or re-consume some old data.

@mcvsubbu
Contributor Author

As we move forward,

I discovered that the table config information for realtime tables has a "replication" field. For high-level consumers, this value must match the number of instances. For low-level consumers, this value is independent of the number of instances.

I am considering introducing another config variable called replicasPerPartition.

Only realtime table code will pay attention to this value, when low-level consumers are enabled.

It doesn't look pretty, but I don't see an alternative other than introducing a new config variable; the old one is already called "replication" ...
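
A hedged sketch of how such a config could be resolved: prefer the proposed replicasPerPartition value when low-level consumers are enabled, and fall back to the existing "replication" value otherwise. The class and accessor names are illustrative, not the final schema:

```java
// Illustrative sketch of resolving the replica count for a realtime table.
// Only the "replication" / "replicasPerPartition" field names come from the discussion above.
public final class ReplicationConfigSketch {

  /** Minimal stand-in for the relevant part of a realtime table config. */
  public static final class RealtimeTableConfig {
    final int replication;              // existing field: for HLC, must match the number of instances
    final Integer replicasPerPartition; // proposed field: replicas per Kafka partition for LLC (may be absent)

    RealtimeTableConfig(int replication, Integer replicasPerPartition) {
      this.replication = replication;
      this.replicasPerPartition = replicasPerPartition;
    }
  }

  /** LLC path: use replicasPerPartition if set, otherwise fall back to replication. */
  public static int numReplicasPerPartition(RealtimeTableConfig config) {
    return config.replicasPerPartition != null ? config.replicasPerPartition : config.replication;
  }
}
```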

@kishoreg
Member

How about changing the existing code to always use the number of instances, or using a new variable numConsumerGroups that falls back to replication if it does not exist?


@mcvsubbu
Contributor Author

I like the idea of changing the existing code to use the number of instances, but the exception thrown right now says "Number of instance in current tenant should be an integer multiples of the number of replications", and there is code (@fx19880617) that balances the same Kafka consumer group across multiple instances. Can this even work? I thought Kafka cannot guarantee exactly-once delivery when high-level consumers are split across hosts.

Otherwise, numConsumerGroups (essentially the same as my proposal of 'replicasPerPartition') works.

@fx19880617, can you comment?

@mcvsubbu
Contributor Author

mcvsubbu commented Aug 3, 2016

The plan to migrate deployments from high-level consumers to simple consumers is to start servers (either existing or new) with simple consumers consuming and completing realtime segments. We will change the broker to route a percentage of requests to the simple-consumer segments, and the rest to the high-level-consumer segments. Over time, we will increase this percentage to 100, and then stop the old high-level consumers.
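
A rough sketch of the percentage-based split described above; none of these class or field names come from the actual HelixExternalViewBasedRouting code:

```java
// Illustrative-only sketch of percentage-based routing between low-level-consumer
// (LLC) and high-level-consumer (HLC) segments during migration.
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public final class MigrationRoutingSketch {

  private final List<String> llcSegments;  // segments produced by low-level consumers
  private final List<String> hlcSegments;  // segments from the existing high-level consumers
  private volatile int llcQueryPercentage; // raised gradually from 0 to 100 during migration

  public MigrationRoutingSketch(List<String> llcSegments, List<String> hlcSegments, int llcQueryPercentage) {
    this.llcSegments = llcSegments;
    this.hlcSegments = hlcSegments;
    this.llcQueryPercentage = llcQueryPercentage;
  }

  public void setLlcQueryPercentage(int percentage) {
    this.llcQueryPercentage = percentage;
  }

  /** Pick which set of segments serves this query. */
  public List<String> segmentsForQuery() {
    int roll = ThreadLocalRandom.current().nextInt(100); // 0..99
    return roll < llcQueryPercentage ? llcSegments : hlcSegments;
  }
}
```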

In order for us to do this, the code in HelixExternalViewBasedRouting needs to be modified to be aware of LLC segments.

Modifying this class with the existing logic (of using routing table builder classes provided via configuration) is proving to result in a very messy piece of code.

We are getting rid of the configurations that allow Pinot deployers to specify their own routing table builders.

@fx19880617 has already confirmed that customized routing is not used in their environment.

@mcvsubbu
Contributor Author

@fx19880617 we now support automatic handling of changes in the number of Kafka partitions, as well as changes in capacity or the number of replicas. See #903

@mcvsubbu
Contributor Author

@fx19880617 we don't have a feature to set offsets individually on a per-partition basis. We can open that as a separate issue.
