
Admin command to recover from deleted CONSUMING segment case #6679

Closed
npawar opened this issue Mar 12, 2021 · 23 comments

@npawar
Contributor

npawar commented Mar 12, 2021

If a user deletes the CONSUMING segment (by mistake, or intentionally because they don't know the impact), consumption stops and Pinot is unable to recover such a table automatically.
The only way out is to manually add the segment ZK metadata for the next sequence ID (and optionally add entries in the ideal state for CONSUMING to kick off consumption immediately).

We can add admin commands to do these ops.
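For reference, here is a minimal sketch of deriving the "next sequence ID" segment name from the latest completed segment, assuming the usual LLC segment naming convention (rawTableName__partitionGroupId__sequenceNumber__creationTime); the helper below is hypothetical and not part of Pinot:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public final class NextLlcSegmentName {
  // Assumed LLC segment name layout: <rawTableName>__<partitionGroupId>__<sequenceNumber>__<creationTime>
  private static final DateTimeFormatter CREATION_TIME =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm'Z'").withZone(ZoneOffset.UTC);

  /** Build the name of the next CONSUMING segment from the latest completed segment name. */
  static String next(String latestCompletedSegmentName) {
    String[] parts = latestCompletedSegmentName.split("__");
    String rawTableName = parts[0];
    String partitionGroupId = parts[1];
    int nextSequenceNumber = Integer.parseInt(parts[2]) + 1;
    return String.join("__", rawTableName, partitionGroupId,
        String.valueOf(nextSequenceNumber), CREATION_TIME.format(Instant.now()));
  }

  public static void main(String[] args) {
    // Prints something like myTable__3__42__<current timestamp>
    System.out.println(next("myTable__3__41__20210310T1830Z"));
  }
}
```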

@kriti-sc
Contributor

kriti-sc commented Jul 5, 2021

Picking this up

@Jackie-Jiang
Contributor

@npawar @kriti-sc This scenario should be handled within PinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming() when a partition does not have any consuming segment in the ideal state. We should pick the end offset of the latest completed segment and use it as the start offset of the newly created consuming segment.
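A rough sketch of the selection logic described above, using hypothetical stand-in types rather than the actual PinotLLCRealtimeSegmentManager code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public final class StartOffsetResolver {
  // Minimal stand-in for the completed-segment ZK metadata of one partition.
  record CompletedSegment(int sequenceNumber, long endOffset) {}

  /**
   * Pick the start offset for the recreated CONSUMING segment: the end offset of the
   * completed segment with the highest sequence number, if any completed segment exists.
   */
  static Optional<Long> startOffsetForNewConsumingSegment(List<CompletedSegment> completedSegments) {
    return completedSegments.stream()
        .max(Comparator.comparingInt(CompletedSegment::sequenceNumber))
        .map(CompletedSegment::endOffset);
  }

  public static void main(String[] args) {
    List<CompletedSegment> completed =
        List.of(new CompletedSegment(40, 1_000L), new CompletedSegment(41, 1_500L));
    // Prints Optional[1500]: resume consumption from offset 1500.
    System.out.println(startOffsetForNewConsumingSegment(completed));
  }
}
```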

@mcvsubbu
Contributor

mcvsubbu commented Jul 7, 2021

Currently (barring human intervention), it is never possible for the IDEALSTATE to be missing a CONSUMING segment for a partition (group). I therefore agree with Neha that if the consuming segment is deleted (for whatever reason) by the administrator, the admin should re-create it via human interaction.

So, we can create an Admin API that allows the operator to recreate the CONSUMING segment. Also, have an option in there to pick a very high sequence number. This will also help address issues like #7100

In general, this can be a way to recover realtime tables from extreme scenarios (like changing kafka topics underneath, etc.)


@mcvsubbu
Contributor

mcvsubbu commented Jul 7, 2021

@npawar @kriti-sc This scenario should be handled within PinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming() when a partition does not have any consuming segment in ideal state. We should pick the end offset of the latest completed segment and put it as the start offset of the new created consuming segment.

I disagree with this. I think Neha's approach of creating a command to do this is better, and can be a general solution for multiple issues

@Jackie-Jiang
Contributor

@mcvsubbu I see. Yes, this API can be useful for skipping bad records that cause consumption to stop.
I would recommend modeling the API to take the following parameters:

  • table name
  • partition number
  • start offset (optional, use the end offset of last completed segment if not specified)

We should not ask the user to create the whole ZK metadata because that will be very hard to use.
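A hypothetical JAX-RS sketch of such an endpoint (the path, names, and parameters are illustrative only, assuming the standard javax.ws.rs annotations are available on the classpath):

```java
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

@Path("/tables")
public class RecreateConsumingSegmentResource {

  /**
   * Recreate the CONSUMING segment for one partition of a realtime table.
   * If startOffset is not provided, fall back to the end offset of the last completed segment.
   */
  @POST
  @Path("/{tableName}/consumingSegment/recreate")
  public String recreateConsumingSegment(
      @PathParam("tableName") String tableName,
      @QueryParam("partition") int partitionGroupId,
      @QueryParam("startOffset") Long startOffset) {  // optional: null means "auto"
    // Sketch only: the real implementation would create the segment ZK metadata and
    // update the ideal state for the given partition group.
    return String.format("Recreating consuming segment for %s, partition %d, startOffset=%s",
        tableName, partitionGroupId, startOffset == null ? "auto" : startOffset);
  }
}
```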

@mcvsubbu
Contributor

mcvsubbu commented Jul 8, 2021

One of the problems faced is that a Kafka topic change (whether a new topic on the same table, or some underlying Kafka change) causes offsets to go lower.

So, the option to re-add the consuming segment should be to pick the latest or earliest available offset (as per the table config) even if it is lower than the previous segment's. So, maybe an option like ignoreOffsetConsistency?

The operator can provide an offset, but it is hard to do that on a per-partition basis; they would need to enter a command for each partition. I would say provide a way to create consuming segments for all partitions with one command. Optionally, the operator can specify partition numbers, and we re-create consuming segments only for those partitions. Depending on the boolean to ignore the consistency check, we can either return an error or force-create a consuming segment. If we are force-creating, just bump the sequence number far enough out (say, +100) so that we know by looking at things that something changed (when they call for support -- this is experimental, and there is no saying what else went wrong).

Let us be really careful here, and offer minimal features on this. Such scenarios have not been tested, so this is not a beginner task to go and validate possible race conditions via manual testing. The good thing is we know that the automatic recovery ignores this condition; I would like it to stay that way.

In order to add a consuming segment, you need to do the following: (1) add the segment ZK metadata, and (2) update the ideal state.

Not sure what happens when automatic correction comes around between (1) and (2). We will need to play that out manually and see.

A safe method of doing this is perhaps to send out a Helix broadcast to all controllers when the API is received, and let the controller that is the master for that table handle it, perhaps while locking out the realtime fixer job.

It will also be useful to NOT allow deletion of consuming segments unless the admin explicitly says so in the segment deletion API. I suggest adding an includeConsumingSegments option there.

All this makes it clear that admins have the knobs to turn for automatic fixes, but when they start doing things manually, they are on a thin line.
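To make the ordering of steps (1) and (2) above concrete, here is a minimal sketch with hypothetical stand-in interfaces: the segment metadata is written first, then the ideal state, and the window between the two is exactly where the periodic repair job would need to be locked out or made tolerant of the half-done state:

```java
public final class ConsumingSegmentRecreator {
  // Hypothetical stand-ins for the ZK property store and Helix ideal-state access.
  interface SegmentMetadataStore {
    void createConsumingSegmentMetadata(String segmentName, long startOffset);
  }
  interface IdealStateUpdater {
    void addConsumingSegment(String tableName, String segmentName);
  }

  private final SegmentMetadataStore _metadataStore;
  private final IdealStateUpdater _idealStateUpdater;

  ConsumingSegmentRecreator(SegmentMetadataStore metadataStore, IdealStateUpdater idealStateUpdater) {
    _metadataStore = metadataStore;
    _idealStateUpdater = idealStateUpdater;
  }

  void recreate(String tableName, String segmentName, long startOffset) {
    // (1) Create the segment ZK metadata first, so the segment is fully described...
    _metadataStore.createConsumingSegmentMetadata(segmentName, startOffset);
    // ...(2) then add the CONSUMING entry to the ideal state. If the periodic repair job
    // runs between (1) and (2), it must either be locked out or tolerate the intermediate state.
    _idealStateUpdater.addConsumingSegment(tableName, segmentName);
  }
}
```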

@Jackie-Jiang
Contributor

I feel this is way too complicated, and we should break it down into several primitive operations.

Since the consuming segment can only be removed manually, and the controller won't automatically recreate it, the race condition will be easy to address: a version check when updating the ideal state is good enough.

The first API is the same as I suggested above.
We can provide another API to read the latest offset of each partition (all partitions if the partition is not specified) so that the admin knows which offset to use if they want to skip some offsets.
I don't think the earliest available offset is very useful here because that can change. In order to use the earliest offset, the user should delete all the completed segments for the partition and run the first API, which should pick the earliest available offset when there is no completed segment.

To summarize: the automatic fix always picks the latest offset if there are completed segments, or the earliest available offset if not. The manual fix requires the user to explicitly provide an offset, and can only work on a single partition.
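For the second API (reading the latest offset of each partition), here is a minimal sketch against a Kafka stream using the plain Kafka consumer client; the broker address and topic are placeholders, and a real implementation would go through Pinot's stream abstraction instead:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class LatestOffsets {
  /** Return the latest (end) offset of every partition of the given topic. */
  static Map<TopicPartition, Long> latestOffsets(String bootstrapServers, String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
          .map(p -> new TopicPartition(topic, p.partition()))
          .collect(Collectors.toList());
      return consumer.endOffsets(partitions);
    }
  }

  public static void main(String[] args) {
    // Placeholder broker/topic; the proposed admin API could surface this map to the operator.
    latestOffsets("localhost:9092", "myTopic")
        .forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
  }
}
```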

@npawar
Contributor Author

npawar commented Jul 8, 2021

+1 to "the automatic fix always picks the latest offset if there are completed segments, or the earliest available offset if not". One thing to watch out for here: in Kinesis, it is valid to have no CONSUMING segment. We shouldn't end up creating a CONSUMING segment for those; we might have to add endOfPartition into the segment metadata. This issue and the suggestion were created before we got the Kinesis refactoring.
I'm not sure about the second part, providing the ability to put in an offset when they want to skip some offsets. This means we are declaring that if someone wants to skip some offsets, they have to delete the consuming segments (and maybe some completed segments too, if the records they want to skip are already in a completed segment). Do we really want users to be deleting consuming segments whenever they want to skip offsets?

@Jackie-Jiang
Contributor

@npawar Skipping offsets can be useful when certain bad records hang the consumption. Ideally we should automatically skip such records, but this is not always handled properly. Skipping offsets can help resume consumption in such a scenario at the cost of losing some records, but without dropping the whole table or deploying new code.

@mcvsubbu
Contributor

mcvsubbu commented Jul 8, 2021

@npawar, @Jackie-Jiang so far what I have heard is that people do things to Kafka, and then want to recover on the Pinot side -- they don't care about missed or duplicate data.

So, we can offer this as a solution: delete all your consuming segments, and then enter the command to recreate them. Two APIs will get them back to normal.

While recreating the consuming segments, they have a choice:

  • Auto-pick the offset as per prev segment
  • Pick earliest offset, even if lower than prev segment
  • Pick latest offset even if lower than prev segment

I suggest we do not include a way for the operator to specify the offset (as yet; we can add it later if we want).

If some partitions' offsets are lower and others are higher, we may have to decide on a way to handle it.

@Jackie-Jiang
Contributor

@mcvsubbu So the suggestion here is to add an offsetMode:

  • AUTO: pick the end offset of the last completed segment, or offset per the table config if there is no completed segment
  • EARLIEST/LATEST: As if there is no completed segment

I'd still suggest allowing users to manually put in the offset if they want. This might not be as easy to use, but it makes special requirements possible, e.g. skipping a known bad offset without losing all the recent records.

@mcvsubbu
Contributor

@Jackie-Jiang I don't understand the proposal clearly. Table config already has ways to specify earliest/latest/1d/3d etc.

Let us see what are the cases we need to handle:

  • User has deleted the consuming segment (or segments) by mistake and wants to restore whatever was there
  • User has deleted the consuming segment (or segments) because some partition groups had a bad offset. The user has the right offset to re-start consumption from.
  • User has deleted all consuming segments because they changed something in the underlying stream (e.g. changed the kafka topic, or something like that)

In all of these cases, we need the following arguments:

  • A partition group ID (or, some way to say "all partition groups")
  • A directive to pick the best offset (i.e. pick the next offset from prev segment if that offset is still in the stream. Otherwise, pick the earliest offset in the stream, while indicating loss of data)
  • A directive to say that the offsets in the previous segments need to be forgotten, and we just need to pick whatever offset is available now in the partition-group(s), even if it compares lower than the previous segment's offset. In this case, the table config will indicate whether the offset to be chosen is "earliest" or "latest" or some other offset directive. This is the same as at table creation.

Are you in agreement with this, or am I missing some things? Once we have identified the requirements, we can design an API that meets all of that.

@Jackie-Jiang
Contributor

@mcvsubbu
In AUTO mode, we create the new consuming segment in the same way it is created during regular consumption: pick the last offset of the previous completed segment if it exists, or follow the offset rule in the table config if not.

We need the other modes because in certain cases the default behavior won't skip the bad offset (the second scenario). Let's say there is a bad offset from two days ago, and consumption stopped because of it.
The user might want to set the offset to:
EARLIEST: the bad offset has already expired within the stream, and the user wants to minimize data loss
LATEST: start from the latest offset so that the bad offset is guaranteed to be skipped
CUSTOMIZED: the bad offset might not have expired yet; manually put in an offset to skip it without losing all the recent data

This design should be able to handle all the requirements
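As a sketch, these modes could be modeled as a small enum plus a resolver; the names below are hypothetical, not anything that exists in Pinot today:

```java
import java.util.Optional;

public final class OffsetModeResolver {
  enum OffsetMode { AUTO, EARLIEST, LATEST, CUSTOMIZED }

  /**
   * Resolve the start offset for the recreated CONSUMING segment.
   *
   * @param lastCompletedEndOffset end offset of the last completed segment, if any
   * @param earliest earliest offset currently available in the stream partition
   * @param latest latest offset currently available in the stream partition
   * @param customized user-supplied offset, required only for CUSTOMIZED mode
   */
  static long resolve(OffsetMode mode, Optional<Long> lastCompletedEndOffset,
      long earliest, long latest, Optional<Long> customized) {
    return switch (mode) {
      // AUTO: behave like regular consumption -- continue from the last completed segment,
      // or fall back to the table-config offset rule (earliest is used here for illustration).
      case AUTO -> lastCompletedEndOffset.orElse(earliest);
      case EARLIEST -> earliest;
      case LATEST -> latest;
      case CUSTOMIZED -> customized.orElseThrow(
          () -> new IllegalArgumentException("CUSTOMIZED mode requires an explicit offset"));
    };
  }
}
```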

@mcvsubbu
Contributor

The user can always modify the table config to set any starting offset -- earliest/latest included. This setting makes sense ONLY when a table is created, or in such repair situations. So, I suggest we take this from the table config instead of asking the user to provide an argument.

@Jackie-Jiang
Contributor

@mcvsubbu Forcing the user to change their table config in order to fix a segment does not seem correct to me. For example, the user is using EARLIEST for all the new partitions. In order to fix one partition, the user has to first change the table config to LATEST, invoke the repair API, then change the table config back. This can also cause problems when a new partition is added (which could be common for Kinesis) at the same time.
Also, there is no way to configure a CUSTOMIZED offset as mentioned above.

@mcvsubbu
Contributor

@Jackie-Jiang the user need not change it back; they can keep it at LATEST. New partitions will always start consuming as they are detected (and therefore from the earliest offset). Like I mentioned before, this setting is used exactly once, when the table is created. After that, the setting is pretty much "minimize data loss": all the correction algorithms look for the earliest data available to minimize data loss and use that.

@Jackie-Jiang
Contributor

@mcvsubbu I don't think that's the case, at least based on my reading. Once the controller detects a new partition, it fetches the partition offset based on the offset criteria from the stream config (see StreamMetadataProvider). Note that the controller checks for new partitions periodically, so by the time a new partition is detected, LATEST might not include the earliest events.
Also, I don't see any drawback to allowing the user to set the offset criteria for the repair. If not specified, it falls back to the table config setting -- basically just as easy to use, and more flexible.

@mcvsubbu
Contributor

@Jackie-Jiang if that is the case (i.e. it fetches the latest offset on new partitions), then we have a bug. I know for certain it was not the case when we first coded it up (at the time, only Kafka worked). Maybe somewhere in all the refactoring things got messed up. Although, we have not had complaints of data loss.

@minwoo-jung

Hi @Jackie-Jiang @mcvsubbu
Is there any further progress or development since the discussion above?
I think this will be very helpful for retaining data for a long time and operating stably when ingesting data from Kafka in real time.
I know you are busy, but I look forward to your reply.
I have previously inquired about this issue; ref: #7100.

@suddendust
Contributor

Hi, is there any update on this? We have a use case wherein we had to update about 22k segments as we migrated our Kafka clusters. Doing this manually is not feasible. (Actually we only have to update the consuming segments, which number fewer than 64, but once you expand the segment list in the controller UI, for each segment you have to scroll back to the top of the list to edit the metadata. For 22k segments this is tedious; not to mention, we run the risk of missing some segments as it is very tiring.)

@Jackie-Jiang
Contributor

I think here is what we have agreed on:

  • Create a new rest API on controller to fix the partition without consuming segment
  • The API takes the table name, optional partition group id (if not provided, apply to all partitions)

I feel the offset strategy might not be needed because it only applies to a new partition where no segment exists, which is handled by the periodic check. In order to skip a bad offset, the API can take an optional parameter to use the LATEST offset for the new consuming segment. I do suggest allowing users to put in an offset if they know where to consume from for the new consuming segment, but this can be done as a second step.
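Put together, a hedged sketch of this agreed shape (hypothetical names; repair one partition group if specified, otherwise all of them):

```java
import java.util.List;
import java.util.OptionalInt;

public final class RepairRequestHandler {
  // Hypothetical hook that recreates the CONSUMING segment of one partition group.
  interface PartitionRepairer {
    void recreateConsumingSegment(String tableName, int partitionGroupId, boolean useLatestOffset);
  }

  private final PartitionRepairer _repairer;

  RepairRequestHandler(PartitionRepairer repairer) {
    _repairer = repairer;
  }

  /**
   * Repair partitions without a CONSUMING segment: the given partition group if specified,
   * otherwise every partition group of the table.
   */
  void repair(String tableName, OptionalInt partitionGroupId, boolean useLatestOffset,
      List<Integer> allPartitionGroupIds) {
    if (partitionGroupId.isPresent()) {
      _repairer.recreateConsumingSegment(tableName, partitionGroupId.getAsInt(), useLatestOffset);
    } else {
      for (int id : allPartitionGroupIds) {
        _repairer.recreateConsumingSegment(tableName, id, useLatestOffset);
      }
    }
  }
}
```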

@mcvsubbu What do you think?

@mcvsubbu
Contributor

@sajjad-moradi has been working on a proposal for a solution for a bunch of open issues related to this one (pausing consumption, etc). I think we should find a solution that addresses all of these instead of one problem at a time.

@sajjad-moradi
Contributor

The resume endpoint, introduced as part of the pause/resume feature (#8986), can be used to address the requirement of this ticket.
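For reference, a minimal client-side sketch of invoking the resume endpoint with java.net.http; the exact controller path and any query parameters are assumptions here and should be verified against the Pinot controller API docs for your version:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class ResumeConsumptionClient {
  public static void main(String[] args) throws Exception {
    String controller = "http://localhost:9000";  // placeholder controller address
    String table = "myTable_REALTIME";            // placeholder table name
    // Assumed endpoint shape from the pause/resume feature (#8986); verify for your Pinot version.
    URI uri = URI.create(controller + "/tables/" + table + "/resumeConsumption");
    HttpRequest request = HttpRequest.newBuilder(uri)
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```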
