Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use partitions ring in write path and ingesters consumption #7376

Merged
merged 10 commits into from
Feb 15, 2024

Conversation

pracucci
Copy link
Collaborator

@pracucci pracucci commented Feb 14, 2024

What this PR does

This PR is another piece towards the partitions ring implementation, as part of the experimental ingest storage based on Kafka. In particular, in this PR I'm changing distributors (write path only) and ingesters to use partitions from the partitions ring.

How it works:

  • Distributors shard incoming series between ACTIVE partitions, looked up from the partitions ring
  • Ingesters own the partition detected from their own instance ID (e.g. ingester-zone-a-0 owns the partition 0)
  • Partitions per-tenant shuffle sharding is configured through a new config option: ingestion_partitions_tenant_shard_size

What's excluded from this PR:

  • The read path (querieres) still query ingesters looked up from the ingesters ring, ignoring partitions ring. This means that read path quorum is still based on ingesters ring and not on the partitions ring (and partitions assignment to ingesters). Naturally this is not what we want to get, but I would like to do such change in a follow up PR due to the large quantity of changes required.

This PR is based on grafana/dskit#484

Which issue(s) this PR fixes or relates to

N/A

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

// It can be nil, in which case a simple `go f()` will be used.
// See Config.ReusableIngesterPushWorkers on how to configure this.
ingesterDoBatchPushWorkers func(func())
doBatchPushWorkers func(func())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: renamed because it's also used for partitions.

@pracucci pracucci marked this pull request as ready for review February 14, 2024 07:45
@pracucci pracucci requested review from grafanabot and a team as code owners February 14, 2024 07:45
Copy link
Member

@pstibrany pstibrany left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mimir changes lgtm.

Copy link
Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly nitpicks, LGTM

pkg/distributor/distributor_ingest_storage_test.go Outdated Show resolved Hide resolved
pkg/distributor/distributor_test.go Outdated Show resolved Hide resolved
pkg/distributor/distributor_test.go Outdated Show resolved Hide resolved
pkg/distributor/distributor_test.go Outdated Show resolved Hide resolved
pkg/distributor/distributor_test.go Show resolved Hide resolved
Comment on lines +62 to +64
func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, consumerGroup string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) {
consumer := newPusherConsumer(pusher, reg, logger)
return newPartitionReader(kafkaCfg, partitionID, consumer, logger, reg)
return newPartitionReader(kafkaCfg, partitionID, consumerGroup, consumer, logger, reg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to fold the ConsumerGroup into KafkaConfig in the interest of fewer parameters? (I'd even say partitionID should have been there in the first place, but maybe later on)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, KafkaConfig is what you user can configure while input args is internally config. I would keep this distinction for now. I find it easier to reason about.

pkg/util/validation/limits.go Outdated Show resolved Hide resolved
pracucci and others added 9 commits February 14, 2024 18:32
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Copy link
Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, happy to merge this

@pracucci pracucci enabled auto-merge (squash) February 15, 2024 09:24
@pracucci pracucci merged commit 4c290ad into main Feb 15, 2024
28 checks passed
@pracucci pracucci deleted the write-and-consume-partitions branch February 15, 2024 09:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants