Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queues: communicate flushes back to source in order to keep slot size down #1626

Merged
merged 3 commits into from
Apr 19, 2024

Conversation

serprex
Copy link
Member

@serprex serprex commented Apr 18, 2024

This is particularly important for queues, as to reduce reconnections long sync batches are desirable

… down

This is particularly important, as to reduce reconnections queues should be given day long sync batches
@@ -226,11 +225,11 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
if err := c.client.Flush(ctx); err != nil {
c.logger.Warn("[kafka] flush error", slog.Any("error", err))
continue
} else if lastSeen > lastUpdatedOffset {
} else if lastSeen > req.ConsumedOffset.Load() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update seems a bit repetitive, maybe it'd be better as a shared method, but fine to leave for now

@serprex
Copy link
Member Author

serprex commented Apr 18, 2024

localhost_3000_peers_source

Screenshot of lag decreasing while sync batch ongoing with a small amount of ingestion

@serprex serprex enabled auto-merge (squash) April 18, 2024 17:03
@serprex serprex merged commit 7fb283a into main Apr 19, 2024
6 checks passed
@serprex serprex deleted the queue-lsn branch April 19, 2024 21:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants