
Ingestion lag because of multiple checkpoints. #16293

Open

Pankaj260100 opened this issue Apr 16, 2024 · 1 comment

Comments

@Pankaj260100 (Contributor) commented Apr 16, 2024

Description

We see multiple checkpoints triggered back to back by an ingestion task when maxTotalRows is hit in one of the task replicas. This happens when the task consumes from at least 2 partitions and the number of ingestion replicas is greater than or equal to 2.

These multiple checkpoints can lead to 2 scenarios:

  • Scenario 1: Ingestion gets throttled because of multiple persists.
  • Scenario 2: The ingestion task gets killed by the overlord because it fails to pause in time while the overlord is handling checkpoint requests.
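
For context, the checkpoint in question is triggered when a task's running row count crosses maxTotalRows. Below is a minimal sketch of that trigger, using hypothetical names (TaskSketch, onRowsAppended, requestCheckpoint) rather than Druid's actual classes:

```java
// Minimal sketch of the checkpoint trigger described above.
// All names here are hypothetical; this is not Druid's real code.
public class TaskSketch
{
  private final long maxTotalRows;
  private long currentTotalRows; // rows appended but not yet published

  public TaskSketch(long maxTotalRows)
  {
    this.maxTotalRows = maxTotalRows;
  }

  // Called after appending a batch of rows read from Kafka.
  public void onRowsAppended(long numRows)
  {
    currentTotalRows += numRows;
    if (currentTotalRows >= maxTotalRows) {
      // Ask the supervisor to checkpoint: pause the replicas, agree on
      // end offsets for the current sequence, then resume ingestion.
      requestCheckpoint();
    }
  }

  private void requestCheckpoint()
  {
    System.out.println("checkpoint requested at currentTotalRows=" + currentTotalRows);
  }

  public static void main(String[] args)
  {
    TaskSketch task = new TaskSketch(150);
    task.onRowsAppended(100); // partition x
    task.onRowsAppended(50);  // partition y -> hits the limit, checkpoint requested
  }
}
```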

Why do we not see this when the number of ingestion replicas is less than 2?

  • Let's take an example: Task A is ingesting from 2 partitions (x, y), the current offsets are x = 100 and y = 50, and Task A hits maxTotalRows (let's suppose it is set to 150).

T1 (x = 100, y = 50) hits the row limit, issues a checkpoint request, and the end offsets are set to (100, 50). The task then goes through maybePersistAndPublishSequences, publishes the segment as soon as it is created, reduces the running row count by the number of rows published, and starts ingesting again into the new sequence.
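
Put differently, with a single replica the agreed end offsets are exactly the offsets the task has already consumed, so the sequence publishes right away and the running row count drops back below the limit. A rough sketch of that accounting with the numbers above (hypothetical, not the real maybePersistAndPublishSequences logic):

```java
// Sketch of the single-replica accounting described above (hypothetical).
public class SingleReplicaAccounting
{
  public static void main(String[] args)
  {
    long maxTotalRows = 150;
    long currentTotalRows = 150;   // x = 100 rows, y = 50 rows

    // The checkpoint sets the end offsets to (100, 50), which the task has
    // already reached, so the whole sequence is publishable immediately.
    long publishedRows = 150;
    currentTotalRows -= publishedRows;

    // The new sequence starts from 0, far below maxTotalRows, so no
    // back-to-back checkpoint requests are issued.
    System.out.println("currentTotalRows after publish = " + currentTotalRows);
  }
}
```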

Why do we see this when the number of ingestion replicas is greater than or equal to 2?

  • Example: Task A and its replica Task B, both ingesting from 2 partitions. maxTotalRows is set to 150. (Task B may have already handed off the earlier segment; that is why it is ahead and has not yet hit maxTotalRows.) The timeline below walks through what happens.
Starting state: currentTotalRows = 150 for Task A. Task A is at (x1 = 100, y1 = 50); Task B is at (x1 = 200, y1 = 75).

1. Task A hits maxTotalRows and sends a checkpoint request.
2. The overlord pauses both tasks.
3. The end offsets are set to (x1 = 200, y1 = 75).
4. Task A resumes ingesting into the older sequence. Task B can go ahead and publish the segment, as it has already reached the end offsets for both partitions.
5. Task A reaches (x1 = 150, y1 = 75). For partition y1 it starts ingesting into a new sequence, as it has reached the end offset of 75 for y1. Its currentTotalRows is now 225 (> 150), and the new sequence has not been checkpointed, so Task A sends another checkpoint request.
6. Scenario 1: the task gets paused, the end offsets are set again, and it starts publishing/ingesting into the new sequence. But in this case we see more than one persist, which throttles ingestion.
7. Scenario 2: sometimes when Task A issues a checkpoint, the overlord asks Task A and Task B to pause, but Task B is busy publishing the segment to S3. It does not respond to the overlord's pause request, and after exhausting all retries the overlord asks the worker to kill the task. This can leave a very bad state: if that task has already published the segment and then gets killed, we have seen all subsequent tasks fail because of a metadata mismatch.
8. After checkpointing, if Task A again reaches the end offset for partition y1 before partition x1 reaches 200, it will again issue a new checkpoint request, since currentTotalRows remains high: the older segment has not been published because partition x1 is still behind.
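
To make the timeline concrete, here is a small self-contained simulation of the lagging replica under the assumptions above (Task A at x1 = 100 / y1 = 50, end offsets fixed at 200 / 75, maxTotalRows = 150). All names are hypothetical; this only sketches the behaviour, not Druid's code.

```java
// Simulation of the lagging replica (Task A) from the timeline above.
// End offsets were fixed at (x1 = 200, y1 = 75) by the first checkpoint,
// but Task A is still at (100, 50). Because the old sequence cannot be
// published until *both* partitions reach their end offsets, currentTotalRows
// stays above maxTotalRows and further checkpoints keep being requested.
public class LaggingReplicaSimulation
{
  public static void main(String[] args)
  {
    final long maxTotalRows = 150;
    long x1 = 100, y1 = 50;             // Task A's current offsets
    final long endX = 200, endY = 75;   // end offsets from the first checkpoint
    long currentTotalRows = 150;        // rows held in the unpublished old sequence
    int checkpointRequests = 0;

    // Keep reading batches until the old sequence is finally publishable.
    while (x1 < endX) {
      x1 = Math.min(x1 + 25, endX);     // partition x1 advances slowly
      y1 = endY;                        // partition y1 reaches its end offset quickly
      currentTotalRows += 75;           // rows read in this pass still count toward the total

      boolean oldSequencePublishable = (x1 >= endX && y1 >= endY);
      if (!oldSequencePublishable && currentTotalRows >= maxTotalRows) {
        checkpointRequests++;           // another back-to-back checkpoint request
      }
    }
    System.out.println("extra checkpoint requests before publish: " + checkpointRequests);
  }
}
```

With these (arbitrary) step sizes the loop reports 3 extra checkpoint requests before the old sequence finally becomes publishable, which mirrors the back-to-back checkpoints described in this issue.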
@Pankaj260100 (Contributor, Author) commented

In short, if the task is consuming from multiple partitions and, for some reason, consumption across the Kafka partitions diverges, the task will keep holding the older segment. This causes multiple checkpoint requests and the issue described above.
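
One way to see why the older segment is held: a sequence can only be published once every partition it covers has reached the end offset agreed at checkpoint time, so a single slow partition keeps the whole older sequence (and its rows) around. A hedged sketch of that gate, with hypothetical names:

```java
import java.util.Map;

// Sketch of the publish gate described above (hypothetical names).
// A sequence is only publishable once *every* partition it covers has
// reached the end offset agreed at checkpoint time; one slow partition
// therefore keeps the whole older sequence, and its rows, around.
public class PublishGateSketch
{
  static boolean isPublishable(Map<String, Long> currentOffsets, Map<String, Long> endOffsets)
  {
    return endOffsets.entrySet().stream()
        .allMatch(e -> currentOffsets.getOrDefault(e.getKey(), 0L) >= e.getValue());
  }

  public static void main(String[] args)
  {
    Map<String, Long> end = Map.of("x1", 200L, "y1", 75L);
    System.out.println(isPublishable(Map.of("x1", 150L, "y1", 75L), end)); // false: x1 is behind
    System.out.println(isPublishable(Map.of("x1", 200L, "y1", 75L), end)); // true: publishable
  }
}
```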
