ingester: add experimental support for consuming records from kafka #6929

Merged: 21 commits merged into main from dimitar/ingest/upstream-basic-reader on Dec 15, 2023

Conversation

dimitarvdimitrov (Contributor):

This PR adds experimental support to the ingester to consume write requests from a Kafka topic. The configuration is the same as the one used in #6888; it is hidden because it is very experimental. For the same reason this PR doesn't include a changelog entry.

The consumption logic follows fairly basic manual Kafka consumption (outside a consumer group), with a few twists:

  1. Unmarshalling and Push() invocation happen concurrently (see the sketch after this list). Roughly 14% of the time spent serving a write request goes into unmarshalling it, so overlapping the two speeds up consumption.
  2. The ingester uses a consumer group to persist how far it has consumed, so it picks up from there after a restart. But it doesn't consume as part of a consumer group: partition allocation is manual, based on the ingester ID in the hash ring.
  3. Committing to the consumer group happens every second to slightly speed up consumption.
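A minimal sketch of the concurrency in point 1, assuming a slice of fetched Kafka records and a hypothetical `push` function standing in for the ingester's Push() path (illustrative only, not the PR's actual code):

```go
// Decode records in one goroutine while the main goroutine pushes the decoded
// requests, so unmarshalling overlaps with Push() instead of serializing with it.
reqC := make(chan *mimirpb.WriteRequest)

go func() {
	defer close(reqC)
	for _, rec := range records {
		req := &mimirpb.WriteRequest{}
		if err := req.Unmarshal(rec.Value); err != nil {
			level.Error(logger).Log("msg", "failed to unmarshal record", "err", err)
			continue
		}
		reqC <- req
	}
}()

for req := range reqC {
	// push is a stand-in for the ingester's Push() invocation.
	if err := push(ctx, req); err != nil {
		level.Error(logger).Log("msg", "failed to push write request", "err", err)
	}
}
```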

Upcoming work:

  • consume records for different tenants in parallel
  • improve (add) error handling so that only client errors are swallowed, not server errors

Resolved review threads (details hidden): pkg/ingester/ingester.go, pkg/storage/ingest/reader.go
r.metrics.recordsPerFetch.Observe(float64(numRecords))
}

func (r *PartitionReader) newKafkaReader(at kgo.Offset) (*kgo.Client, error) {
Collaborator:

Not a blocker for this PR, but please add a TODO to your issue to review all consumer-related config options and check which should be fine-tuned. I got some good improvements from fine-tuning the producer's options in the Writer. This can be done in a follow-up PR.
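As an illustration of the kind of consumer fine-tuning this suggests, a hedged sketch using franz-go client options (the values are made up, not recommendations; `brokers`, `topic`, `partition`, and `at` are assumed to be in scope):

```go
client, err := kgo.NewClient(
	kgo.SeedBrokers(brokers...),
	kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{topic: {partition: at}}),
	kgo.FetchMinBytes(1),                   // don't make brokers wait for a minimum batch size
	kgo.FetchMaxBytes(64_000_000),          // cap total bytes returned per fetch
	kgo.FetchMaxPartitionBytes(16_000_000), // cap bytes returned per partition per fetch
	kgo.FetchMaxWait(time.Second),          // upper bound on how long brokers hold a fetch open
)
if err != nil {
	return nil, err
}
```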

}

func (r *PartitionReader) start(ctx context.Context) error {
offset, err := r.fetchLastCommittedOffsetWithRetries(ctx)
Collaborator:

Any specific reason why fetchLastCommittedOffset() creates its own client, instead of moving this call after the creation of the client below and then just using it?

Contributor (author):

It is not possible to reset the offset of an existing client without first consuming from it. So we would have to do createClient(); fetchOffset(); fetchRecords(); adjustOffset(); fetchRecords(), and the first fetch would be discarded.

From the docs:

// SetOffsets sets any matching offsets in setOffsets to the given
// epoch/offset. Partitions that are not specified are not set. It is invalid
// to set topics that were not yet returned from a PollFetches: this function
// sets only partitions that were previously consumed, any extra partitions are
// skipped.

I can add this as a comment
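For context, a rough sketch of the approach being discussed, assuming franz-go's kgo/kadm packages (the function name and error handling are illustrative, not the PR's code). A short-lived client looks up the group's committed offset, and the consumer client is then created directly at that offset, so SetOffsets is never needed:

```go
func newReaderAtCommittedOffset(ctx context.Context, brokers []string, group, topic string, partition int32) (*kgo.Client, error) {
	// Short-lived client used only to look up the consumer group's committed offset.
	admConn, err := kgo.NewClient(kgo.SeedBrokers(brokers...))
	if err != nil {
		return nil, err
	}
	defer admConn.Close()

	committed, err := kadm.NewClient(admConn).FetchOffsets(ctx, group)
	if err != nil {
		return nil, err
	}

	startAt := kgo.NewOffset().AtStart() // fall back to the start if nothing was committed yet
	if o, ok := committed.Lookup(topic, partition); ok && o.At >= 0 {
		startAt = kgo.NewOffset().At(o.At)
	}

	// The consumer client starts consuming directly at the desired offset.
	return kgo.NewClient(
		kgo.SeedBrokers(brokers...),
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{topic: {partition: startAt}}),
	)
}
```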

Comment:

I can potentially change SetOffsets to allow this; it's a pretty old function and I've removed some internal restrictions that made it more limited in the past. Not sure on the timeline of that work, though (this isn't the first need to improve SetOffsets that I've seen in the past few months).

Contributor (author):

Thanks for taking a look :) It will be useful if we can simplify the code, but it's more of a nice-to-have than a requirement at this point.

Further resolved review threads (details hidden): pkg/storage/ingest/reader.go, pkg/storage/ingest/pusher.go

go c.unmarshalRequests(ctx, records, recC)
err := c.pushRequests(ctx, recC)
if err != nil {
Collaborator:

We can't return an error here. I would not return any error from pushRequests() for now, and revisit it once we have the actual error handling logic, unless you already have that logic ready for a follow-up PR (in that case keep it as is).

Contributor (author):

Yes, I have the change ready. While I was working on it I realized that the current logic skips records.
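That follow-up error handling isn't part of this PR; purely as a hypothetical illustration of the "swallow client errors, surface server errors" idea from the description (the gRPC status mapping and the helper name are assumptions):

```go
import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// handlePushError is a hypothetical helper: client errors (bad data) are logged
// and swallowed so consumption keeps going; anything else is returned as a
// server error so the records can be retried.
func handlePushError(logger log.Logger, err error) error {
	if err == nil {
		return nil
	}
	if st, ok := status.FromError(err); ok {
		switch st.Code() {
		case codes.InvalidArgument, codes.FailedPrecondition, codes.AlreadyExists:
			level.Warn(logger).Log("msg", "dropping write request due to client error", "err", err)
			return nil
		}
	}
	return err
}
```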

@pracucci (Collaborator) left a review:

Very nice job! I left some minor comments, and a major one about the offset committing. Most comments are nits or non-blocking ones. Feel free to address them in a follow-up PR or skip them if you disagree. Thanks!

Resolved review threads (details hidden): pkg/storage/ingest/reader_test.go
@dimitarvdimitrov merged commit 2e49aab into main on Dec 15, 2023 (28 checks passed).
@dimitarvdimitrov deleted the dimitar/ingest/upstream-basic-reader branch on December 15, 2023 at 14:21.
}
lastOffset := int64(0)
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
lastOffset = partition.Records[len(partition.Records)-1].Offset
Collaborator:

I would add an extra check on the partition, to make sure it matches the expected one. It should never happen, but I want to make sure we don't have a bug where we've consumed another partition and then commit the wrong offset for it.
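A hedged sketch of the suggested check (`r.partitionID` and `r.logger` are assumed field names):

```go
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
	// Defensive check: this reader is assigned exactly one partition, so records
	// from any other partition indicate a bug and must not affect the committed offset.
	if partition.Partition != r.partitionID {
		level.Error(r.logger).Log("msg", "received records for unexpected partition",
			"expected", r.partitionID, "got", partition.Partition)
		return
	}
	if len(partition.Records) == 0 {
		return
	}
	lastOffset = partition.Records[len(partition.Records)-1].Offset
})
```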

}
err := collectFetchErrs(fetches)
level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err)
continue
Collaborator:

What if only some fetches returned an error? Aren't we going to lose data because we're not ingesting the successful fetches?

Contributor (author):

I opened #6951 to address these comments.
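For illustration, one way to avoid dropping data from healthy partitions when only some fetches fail is to report the fetch errors without discarding the whole poll; a rough sketch (not necessarily what #6951 does, and `consume` is a hypothetical stand-in for the record handling path):

```go
// Log fetch errors, but keep processing whatever was fetched successfully:
// records from partitions that returned without error are still in `fetches`.
for _, fetchErr := range fetches.Errors() {
	level.Error(r.logger).Log("msg", "encountered error while fetching",
		"topic", fetchErr.Topic, "partition", fetchErr.Partition, "err", fetchErr.Err)
}
fetches.EachRecord(func(rec *kgo.Record) {
	consume(rec) // hand the record to the Push() path as usual
})
```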
