[pinot]Support for pausing the realtime consumption without disabling the table. #6302

Closed
Aka-shi opened this issue Dec 1, 2020 · 38 comments

Comments

@Aka-shi
Contributor

Aka-shi commented Dec 1, 2020

As of now, to pause the realtime consumption from Kafka, we have to disable the table. This also makes the table unavailable for querying.

It would be helpful if there were support for stopping only the realtime consumption while keeping the table available for querying.

@Aka-shi Aka-shi changed the title Support for pausing the realtime consumption without disabling the table. [pinot]Support for pausing the realtime consumption without disabling the table. Dec 1, 2020
@mcvsubbu
Contributor

A few questions, @Aka-shi:

  1. After a pause, if the server is restarted, how do you desire the server should come back up? Should it consume up to the exact same paused place again? Should it simply not consume after the last completed/committed point? Should it forget that it was paused?
  2. If there are multiple replicas, then the above question becomes even harder since the behavior has to be co-ordinated across all replicas. I suppose it is not OK for one replica to forget the pause, and the other replica to still be paused.
  3. Further if there are multiple replicas, do you want them all to pause at the same place after a pause command?

@mcvsubbu
Contributor

@snleee , @sajjad-moradi if you have other questions/thoughts please chime in

@mcvsubbu
Contributor

Other questions:

  1. Would you want all partitions of the stream to pause, or just some specific partitions?
  2. Would you want all partitions/tables in a given server to pause?

@Aka-shi
Contributor Author

Aka-shi commented Feb 15, 2021

@mcvsubbu

Would you want all partitions of the stream to pause, or just some specific partitions?

For all partitions. The consumption for the table itself should be stopped. I was thinking something like this.

  1. User pauses stream -> Pinot server commits the current consuming segments (for all partitions) along with their offsets.
  2. User resumes stream -> Pinot restarts consumption for all partitions from the previously committed offsets.

Would you want all partitions/tables in a given server to pause?

I was expecting the pause option to be at a table level, because of #6555. If the pause and reset APIs are available at a table level, then the user can pause the current stream (which would commit the current segments and pause consumption), reset the offsets, and resume consumption from the earliest/latest offsets as per the config.

After a pause, if the server is restarted, how do you desire the server should come back up? Should it consume up to the exact same paused place again? Should it simply not consume after the last completed/committed point? Should it forget that it was paused?

If a table is paused and a server restarts afterwards, then considering that the previous consuming segments were already committed when the table was paused, I would expect the server not to consume after the restart either. My understanding is that when we pause the stream, we are changing the state of the table, and it should not start consuming again until the user explicitly resumes the stream.

Further if there are multiple replicas, do you want them all to pause at the same place after a pause command?

Yes. That's what I would expect when I pause the stream: no replica of any partition consumer should be active.

PS: Just putting it out here. When we pause the table, the table should still be available for querying the already consumed data. If not, this would be more or less like the enable/disable API.

@mcvsubbu
Contributor

Thanks for clarifying. So, you are not really looking for a "pause" (which I assumed to mean pause without committing segments). You are looking for a "commit now and do not consume until further instructions".

Last question (should have been the first). What is the use case for this?

Question for the community: Are there others who need this feature?

@Eywek

Eywek commented Mar 10, 2021

Question for the community: Are there others who need this feature?

👋  yes. We're planning to use Pinot to store some data from our customers. We currently pull data from some APIs for our customers, transform it, and store it. As this data can be quite large, we need to stream it, and we need to be able to push this transformed data as a stream into Pinot. To do this we plan to use a REALTIME table, because we don't want to use batch ingestion: it means we need to rely on an S3 bucket and we won't be able to know when the data is available for queries.
But once we've finished pulling data from those APIs, we won't update the table anymore, so being able to stop the ingestion while keeping the table queryable would be really useful to avoid putting unnecessary pressure on our Apache Pulsar cluster (we use KoP to be able to use the Kafka ingestion plugin).

@mcvsubbu
Contributor

@Eywek this seems to be a one-time operation as you describe it? Am I right? Once the data is loaded into pinot, you intend to shut off consumption and just query the data -- i.e. you do not expect more data to arrive in the realtime pipeline. Is that right?

@Eywek

Eywek commented Mar 10, 2021

yep exactly

@Eywek

Eywek commented Aug 3, 2021

Hi 👋

Do you have any news on this kind of feature?

@kishoreg
Member

kishoreg commented Aug 3, 2021

@mcvsubbu @npawar we should definitely consider implementing this. I remember discussing a simple approach for this, but I don't seem to recall the solution.

@mcvsubbu
Contributor

mcvsubbu commented Aug 3, 2021

@Eywek just clarifying if your request to "pause" is the same as that of @Aka-shi -- "Commit all data you have and do not consume any more until operator resumes". Also, if you don't expect to touch the table or send any more data ever again, then why bother pausing? You can still serve all the queries, the data in the consuming segment will be committed eventually, and everything should be good.

@Aka-shi please state your use case as well (i.e. why do you want to "pause" the stream? How long do you expect the pause to last? Will you also need a resume, or just pause and finish the table?)

@Eywek

Eywek commented Aug 3, 2021

@Eywek just clarifying if your request to "pause" is the same as that of @Aka-shi -- "Commit all data you have and do not consume any more until operator resumes"

In my use case, I don't need to be able to resume, only to be able to pause and query the data.

why bother pausing?

Because we don't want to put unnecessary pressure on our Apache Pulsar cluster (e.g. creating consumers, reading data...), and we won't be able to delete the topic if we have consumers attached to it.
I also think it's preferable to commit the segment when we pause, so Pinot doesn't have to re-compute the segment if a server is restarted and the data won't be lost.

@mcvsubbu
Contributor

mcvsubbu commented Aug 3, 2021

@Eywek just clarifying if your request to "pause" is the same as that of @Aka-shi -- "Commit all data you have and do not consume any more until operator resumes"

In my use case, I don't need to be able to resume, only to be able to pause and query the data.

why bother pausing?

Because we don't want to put unnecessary pressure on our Apache Pulsar cluster (e.g. creating consumers, reading data...), and we won't be able to delete the topic if we have consumers attached to it.
I also think it's preferable to commit the segment when we pause, so Pinot doesn't have to re-compute the segment if a server is restarted and the data won't be lost.

Good point.

Will it help if the realtime segments were moved to an offline table, and you just drop the realtime table when the segments are moved completely? We do have that feature now.

@Eywek

Eywek commented Aug 3, 2021

Will it help if the realtime segments were moved to an offline table, and you just drop the realtime table when the segments are moved completely? We do have that feature now.

Good idea. But can we trigger a segment move (from REALTIME to OFFLINE) via the API? And know when this is done?
And can we drop the REALTIME table (of a hybrid table) without breaking anything?

@mcvsubbu
Contributor

mcvsubbu commented Aug 3, 2021

Will it help if the realtime segments were moved to an offline table, and you just drop the realtime table when the segments are moved completely? We do have that feature now.

Good idea. But can we trigger a segment move (from REALTIME to OFFLINE) via the API? And know when this is done?
And can we drop the REALTIME table (of a hybrid table) without breaking anything?

No, it is done automatically in a periodic manner. Also, you will need to deploy minions for it.

Of course, you can always do this manually by downloading the segments and uploading them to the offline table. APIs are available for it. A small script can change the segment name. This is a hack (as of now, since segment name change is not a supported feature), but if, for example, we have an API to add a segment to a table under a different name (not a bad ask, IMO), your problem can be completely solved with APIs.
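
A hedged sketch of that manual route in Python against the controller's REST API (the controller address, table names, segment name, endpoint paths, and parameter names are assumptions for illustration; the renaming step is deliberately left out since it is the unsupported "hack" part):

    import requests

    CONTROLLER = "http://localhost:9000"            # assumed controller address
    REALTIME_TABLE = "events_REALTIME"              # illustrative table names
    OFFLINE_TABLE = "events"
    SEGMENT = "events__0__0__20210803T0000Z"        # an already-completed segment

    # Download the completed segment tarball from the controller (path assumed).
    resp = requests.get(f"{CONTROLLER}/segments/{REALTIME_TABLE}/{SEGMENT}", stream=True)
    resp.raise_for_status()
    tar_path = f"/tmp/{SEGMENT}.tar.gz"
    with open(tar_path, "wb") as out:
        for chunk in resp.iter_content(chunk_size=1 << 20):
            out.write(chunk)

    # A real migration would unpack the tarball, rewrite the segment name in its
    # metadata (the "hack" mentioned above), and repack it before uploading.

    # Re-upload the tarball to the OFFLINE table (multipart upload path assumed).
    with open(tar_path, "rb") as f:
        upload = requests.post(
            f"{CONTROLLER}/v2/segments",
            params={"tableName": OFFLINE_TABLE, "tableType": "OFFLINE"},
            files={"file": (f"{SEGMENT}.tar.gz", f, "application/gzip")},
        )
    upload.raise_for_status()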

Again, this is based on the assumption that yours is a one-time operation, and therefore having some manual intervention is not an issue.

@Eywek

Eywek commented Aug 3, 2021

I would rather use the API instead of minions, since I would do it as soon as my job finishes processing data; I want to end the job only when the data is ready (which in this solution means when the segment is in the OFFLINE table).

Not an issue if the API supports it since we can automate it in our job.

This solution seems to work for our use case. But downloading and uploading segments could be a long task, couldn't it? Is there no other way?

@npawar
Contributor

npawar commented Aug 3, 2021 via email

@mcvsubbu
Contributor

mcvsubbu commented Aug 4, 2021

There is an API (check the Task section on Swagger) to trigger the movement from real-time to offline instead of waiting for the periodic task (you still need minions). However, the realtimeToOffline task is not designed to achieve exactly what you want. It will not pick up the segments for movement unless they're completed segments. If it sees consuming segments, it will wait for them to complete. https://youtu.be/V_KNUUrS6DA (minute 33 onwards) explains that behavior. We might need to change the job's behavior to support this.
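
For illustration only, scheduling that task on demand could look roughly like this (the endpoint path and parameter names are assumptions; check the Task section of your controller's Swagger UI):

    import requests

    # Ask the controller to schedule the RealtimeToOfflineSegmentsTask for one table.
    resp = requests.post(
        "http://localhost:9000/tasks/schedule",             # assumed controller address/path
        params={"taskType": "RealtimeToOfflineSegmentsTask",
                "tableName": "events_REALTIME"},             # illustrative table name
    )
    print(resp.status_code, resp.text)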


The segment will complete when the time constraint is reached. Since there is no more new data, the last completed segment will be moved, and the next consuming segment will be empty. Am I right?

@mcvsubbu
Contributor

mcvsubbu commented Aug 4, 2021

@Aka-shi please describe your use case for pause. Thanks

@npawar
Contributor

npawar commented Aug 4, 2021

The segment will complete when the time constraint is reached. Since there is no more new data, the last completed segment will be moved, and the next consuming segment will be empty. Am I right?

Not quite. The movement job waits for the complete window. So if you've set the window as 1h, it will wait until the completed segments have crossed the entire hour. If there's a consuming segment immediately after, it will wait. There's no way to know whether the next consuming segment is going to get events for the incomplete window or not.
This was designed with the assumption that in production you'll have a continuous stream of events and none of these manual steps and edge cases.

My preference would be to design a pause/resume, instead of relying on the realtimeToOfflineJob for this. Otherwise, we'll be adding too many if-elses to handle all these edge cases in the minion job. Plus, users would have to wait for the time threshold, or adjust it manually, just to make this work.

@mcvsubbu
Contributor

mcvsubbu commented Aug 4, 2021

I agree realtime to offline does not fit the bill here. I think the use case described by @Eywek is unique, so I wanted to avoid a feature build (that may not be simple) for a one-time operation in one installation. Also, I wanted to check if there are existing mechanisms that @Eywek could use one-time to satisfy their needs. Segment name change seems perfectly fine here.

@Eywek Do you expect to get into this situation often (i.e. other tables in the pipeline for such a pause feature)?

@npawar
Contributor

npawar commented Aug 4, 2021

Just putting it out there, one option is to wait for all data to make its way into completed segments, then delete the CONSUMING segments. If this is a one-time thing anyway and re-consumption is not required, this will work.
Anyway, hoping that we get a concrete use-case description, so we can build this out properly.

@Eywek

Eywek commented Aug 5, 2021

Do you expect to get into this situation often (i.e. other tables in the pipeline for such a pause feature)?

Yes, I'll have many tables in this situation: each time one of our customers tries to import a new "datasource" (which happens maybe 10 times a day), we will create a new table and end up in this situation.

one option is to wait for all data to make its way into completed segments

That doesn't meet my use case. I want to be able to process and save data as fast as possible; waiting for Pinot to complete a segment after some amount of time increases my processing time, which isn't desirable.

@yupeng9
Contributor

yupeng9 commented Aug 5, 2021

+1 to the feature. We have a similar request but for a different reason.

When we bootstrap an upsert table using the real-time segment push, we need to pause the real-time ingestion. Otherwise the upsert metadata will hit a race condition. Though it's possible to implement this pause internally, making it explicit will simplify the implementation and make it safer for operations.

@mcvsubbu
Contributor

mcvsubbu commented Aug 5, 2021

...the real-time ingestion. Otherwise the upsert metadata will hit a race condition. Though it's possible to implement this pause internally, ...

Thanks, yupeng. I need to understand this better. Is this a race condition in Pinot? If so, we should fix it regardless.

Let us discuss this a little bit. I will reach out, and then we can summarize our discussion here.

@mcvsubbu
Contributor

mcvsubbu commented Aug 5, 2021

Do you expect to get into this situation often (i.e. other tables in the pipeline for such a pause feature)?

Yes, I'll have many tables in this situation: each time one of our customers tries to import a new "datasource" (which happens maybe 10 times a day), we will create a new table and end up in this situation.

thanks, this is good to know.

one option is to wait for all data to make its way into completed segments

That doesn't meet my use case. I want to be able to process and save data as fast as possible; waiting for Pinot to complete a segment after some amount of time increases my processing time, which isn't desirable.

@yupeng9
Contributor

yupeng9 commented Aug 5, 2021

...the real-time ingestion. Otherwise the upsert metadata will hit a race condition. Though it's possible to implement this pause internally, ...

Thanks, yupeng. I need to understand this better. Is this a race condition in Pinot? If so, we should fix it regardless.

Let us discuss this a little bit. I will reach out, and then we can summarize our discussion here.

You can think of it this way:
When a segment in the history is modified, we need to scan the history to derive the latest upsert state per primary key, especially for partial upsert. This scan cannot proceed while real-time consumption is on, as we don't have the latest metadata info to handle the real-time updates. The way to solve it is to put in a lock and block the real-time ingestion until the load/scan is completed.
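
A minimal sketch of that locking idea (not Pinot code; it only illustrates the mutual exclusion between ingestion and the metadata rebuild, with invented names):

    import threading

    metadata_lock = threading.Lock()
    upsert_metadata = {}          # primary key -> latest record location (illustrative)

    def ingest(record):
        # Real-time ingestion blocks while a rebuild holds the lock.
        with metadata_lock:
            upsert_metadata[record["pk"]] = record["location"]

    def rebuild_metadata(segments):
        # Holds the lock for the whole scan, so no real-time update can interleave.
        with metadata_lock:
            upsert_metadata.clear()
            for segment in segments:
                for record in segment:    # scan history to find the latest state per key
                    upsert_metadata[record["pk"]] = record["location"]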

@mcvsubbu
Contributor

@yupeng9 and I discussed offline, and here is the summary for Upsert support:

For upsert tables, we need an occasional refill of older data. During refill, the segments change data, and if ingestion is also going on, then it messes up the indexing since other (uploaded realtime) segments are being modified. A few minutes of pause where queries are still being served (but potentially served with stale data) is acceptable. The requirement is to pause all partitions, not one at a time.

Having things pause while retaining a CONSUMING segment with data in it is hard. Pausing is easy, but in case a server gets restarted, it is hard to remember the exact offset when things were paused, and consume up to that.

Instead, I suggest we support two controller API primitives (a rough sketch of what these calls might look like follows the list below):

  • Complete the current consuming segments NOW, and optionally create a new segment. This may address issue #7280 (Add noDowntime option to reset); basically, force a commit now.
  • Restart consumption by creating new consuming segments.
  • Support the above two for a single partition or all partitions (we can start with all, I think).
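
Purely as a sketch of the proposal (these endpoints did not exist at the time of this comment; the paths, controller address, and table name are invented for illustration):

    import requests

    CONTROLLER = "http://localhost:9000"     # assumed controller address
    TABLE = "events_REALTIME"                # illustrative table name

    # Primitive 1: complete (force-commit) the current CONSUMING segments now.
    requests.post(f"{CONTROLLER}/tables/{TABLE}/forceCommit")        # hypothetical endpoint

    # Primitive 2: restart consumption by creating fresh CONSUMING segments.
    requests.post(f"{CONTROLLER}/tables/{TABLE}/resumeConsumption")  # hypothetical endpoint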

Things to think about and handle correctly (just listing a few)

  • Make sure the server initiates ONE call to complete segments. The pause operation needs to be invoked on the PartitionConsumer object. Not sure what happens if an ONLINE transition already comes in. A bad race condition can happen in such a case.
  • Make sure the periodic task does not restart a paused consumption.
  • Decide if the paused state needs to be marked in ZK (in the IdealState?). Handle server restart during the paused state.

I am sure people can add more.

@yupeng9
Contributor

yupeng9 commented Aug 11, 2021

Thanks for writing down the summary!

A question on whether we need to complete the consuming segment upon pause. Is it possible to simply pause the consumption? If the server restarts, then the server will lose the offset info and restart from the previous checkpoint, which is the same as the current consuming-segment behavior. This could avoid sealing the segment (and creating a small segment file) if the pause is not too long.

@mcvsubbu
Contributor

I suggest providing an option to either discard all the consuming segments or complete all of them. I feel that handling server restarts when we have consumed half way and then paused will be hard.

Take the case when the server starts to consume a segment at offset 100 and there are two replicas A and B. After some time, a "pause" command is entered. Replica A is at 150, and Replica B is at 160, and they stop consuming, with the rows still in memory.

Now, A gets restarted. Ingestion has continued, so the current offset available in stream is 200.

Should A just serve data until 100 (i.e. data that has been committed) whereas B will serve data until 160?
Should A consume up to 150 and stop?
Should A consume up to 160 and stop? What if there are three replicas?

It gets harder to maintain state.

We can provide the operator an option to either complete the current consuming segment or discard it. Either way, we "pause" at a point where everything is committed.

@yupeng9
Contributor

yupeng9 commented Aug 11, 2021

Perhaps I didn't fully see the complexity.
I feel when the consumption is paused, then it's paused regardless of the offset. If a server restarts, the offset will rewind, but it will not consume.

I think the same issue applies to normal consumption? You still have this offset mismatch across servers, when a server restarts?

Or in other words, I hope the pause can be a simple instruction to the consumers, but not related to the state transition.

@Aka-shi
Contributor Author

Aka-shi commented Aug 12, 2021

@mcvsubbu Sorry for the delayed response. The initial use case for which I raised this issue is:
We had frequent downtimes on Kafka at that time, and during every downtime the servers failed to ingest and we had to restart them to recover. Even though we were informed of the downtime in advance, the only other option to avoid server restarts was to disable the table. But disabling the table also means querying is disabled for that table, which is not intended. So we wanted functionality where, if we are aware of or anticipate ingestion failures, we can just pause the ingestion and resume it later.

Recently, we have also been using the upsert feature in Pinot, and we want to manually push segments to the upsert table too. So @yupeng9's use case is valid for us as well.

@mcvsubbu
Contributor

@Aka-shi Pinot automatically recovers from Kafka failures. How much time did you allow to recover? There is a periodic task (you can adjust the frequency to run every X minutes) that recreates failed segments and restarts consumption. We use that in production, and things just auto-recover.

https://docs.pinot.apache.org/configuration-reference/controller#periodic-tasks-configuration
The one you want is controller.realtime.segment.validation.frequencyPeriod
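
For illustration, the controller config entry might look like the line below (the 5-minute value is just an example; check the linked reference for the accepted formats):

    controller.realtime.segment.validation.frequencyPeriod=5m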

If that does not work right, then we have a bug that needs to be fixed nevertheless.

@mcvsubbu
Contributor

@Aka-shi did the periodic task help you recover from kafka downtimes automatically?

@mbecker

mbecker commented Nov 17, 2021

Hi,

maybe adding another question to this issue: How to restart the ingestion process (with new configuration)?

Let's assume the following two use cases:

  • The Kafka broker died completely; all data was lost; the Kafka broker is rebuilt completely from scratch
  • Using a new / different Kafka broker

How could I restart the streaming ingestion process (maybe with new configs where the "old" offsets no longer match)?

I've tried to disable and then enable the table; I tried to "overwrite" the table (by just adding it again with new streamconfigs); neither worked.

Thanks!

@mcvsubbu
Contributor

@mbecker I think the scenario you state is closer to issue #6555 and/or issue #7280 but I agree that these are closely related.

@Aka-shi
Contributor Author

Aka-shi commented Dec 16, 2021

did the periodic task help you recover from kafka downtimes automatically?

@mcvsubbu Yes. That helped us recover. Apologies for the delayed response.

@sajjad-moradi
Contributor

Pause/resume feature #8986 is now merged into master.
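
For anyone finding this later, a hedged sketch of invoking the new endpoints (paths as I recall them from that PR; confirm against the Swagger UI of your Pinot version):

    import requests

    CONTROLLER = "http://localhost:9000"   # assumed controller address
    TABLE = "events"                       # illustrative realtime table name

    # Pause ingestion for all partitions; already-committed data stays queryable.
    requests.post(f"{CONTROLLER}/tables/{TABLE}/pauseConsumption")

    # Resume later. Newer versions may also accept an optional consumeFrom
    # parameter (e.g. "smallest"/"largest") to override the restart offset.
    requests.post(f"{CONTROLLER}/tables/{TABLE}/resumeConsumption")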
