feat(metrics): Split buckets into partitions (dry run) [INGEST-1472] #1425
Conversation
relay-metrics/src/aggregation.rs
Outdated
.entry(key.project_key)
    .or_default()
    .push(HashedBucket {
        hashed_key: key.as_integer_lossy(), // TODO: Do we need a more reliable hasher?
Not sure if we can rely on this hasher, this comment seems pretty clear:
relay/relay-metrics/src/aggregation.rs
Lines 894 to 895 in a9d9390
// XXX: The way this hasher is used may be platform-dependent. If we want to produce the
// same hash across platforms, the `deterministic_hash` crate may be useful.
I think I looked into this, didn't find any great, fast crates, and then @jan-auer pointed out that we already have FNV for this.
Considering we're doing sharding of Kafka topics by org id at some point, I think it's time well spent to look into a hashing function that is deterministic and portable (can be replicated in Python), and I am not sure if this function here fulfills either of those criteria.
we already have FNV for this.
Good point, will replace with FnvHasher.
FnvHasher
If you have the time, ensure we're picking a hashing function that is easily usable in Python in any case. While portability is not required for this story, we will need it in future tasks for traffic steering.
There's a Python impl for FNV, and it looks simple enough to write ourselves if necessary:
https://pypi.org/project/fnvhash/
https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
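The portability argument is easy to see in code: FNV-1a is only a handful of operations on two published constants, so the same few lines can be replicated in Python byte for byte. A minimal sketch (hand-rolled here rather than pulled from the fnv crate, so it stands alone):

```rust
// Minimal FNV-1a (64-bit) sketch using the published constants.
// The identical loop translates line-for-line to Python, which is the
// portability property discussed above.
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

fn fnv1a_64(data: &[u8]) -> u64 {
    let mut hash = FNV_OFFSET_BASIS;
    for &byte in data {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}

fn main() {
    // Well-known FNV-1a test vectors:
    assert_eq!(fnv1a_64(b""), 0xcbf2_9ce4_8422_2325);
    assert_eq!(fnv1a_64(b"a"), 0xaf63_dc4c_8601_ec8c);
}
```

Because the algorithm operates on raw bytes with wrapping 64-bit arithmetic, it is deterministic across platforms, unlike the default SipHash-based std hasher, which is seeded per process.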
relay-metrics/src/aggregation.rs
Outdated
// XXX: The way this hasher is used may be platform-dependent. If we want to produce the
// same hash across platforms, the `deterministic_hash` crate may be useful.

// TODO(jjbayer): Use FnvHasher here
As discussed, FNV is fine and easy to implement; let's do it.
relay-metrics/src/aggregation.rs
Outdated
relay_statsd::metric!(
    histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64,
    partition_key = partition_tag.as_str(),
    batch_index = format!("{i}").as_str(),
Please don't tag this: the cardinality is theoretically unbounded. Instead, add a histogram metric counting the number of batches within a partition, i.e. metric!(histogram(..) = capped_batches.len());
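The suggested shape can be sketched as follows: rather than emitting one tagged data point per batch, emit a single histogram value per flush recording how many batches the partition was split into. Here a plain chunking function stands in for CappedBucketIter, and the relay_statsd::metric! call is shown only as a comment since it is a project-internal macro:

```rust
// Stand-in for CappedBucketIter: split buckets into size-capped batches
// and report only the batch *count*, which has bounded cardinality.
fn count_batches(buckets: &[u32], max_batch_size: usize) -> usize {
    buckets.chunks(max_batch_size).count()
}

fn main() {
    let buckets: Vec<u32> = (0..10).collect();
    let batches = count_batches(&buckets, 3);
    // relay_statsd::metric!(histogram(BatchesPerPartition) = batches as u64);
    assert_eq!(batches, 4); // 10 buckets with a cap of 3 -> batches of 3, 3, 3, 1
}
```

The key difference from the original diff is that the batch index never becomes a tag value, so the metric's tag cardinality stays constant no matter how many batches a flush produces.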
relay-metrics/src/aggregation.rs
Outdated
let capped_batches =
    CappedBucketIter::new(buckets.into_iter(), self.config.max_flush_bytes);
let partition_tag = match partition_key {
    Some(partition_key) => format!("{partition_key}"),
I think we prefer to use .to_string() instead of format!("{..}").
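Both spellings produce the same string; .to_string() just states the intent directly and skips the formatting machinery at the call site. A quick illustration:

```rust
fn main() {
    let partition_key: u64 = 17;
    // Equivalent results; `.to_string()` is the more direct spelling
    // when no format string is actually needed.
    assert_eq!(partition_key.to_string(), format!("{partition_key}"));
    assert_eq!(partition_key.to_string(), "17");
}
```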
relay-statsd/src/lib.rs
Outdated
f();

*METRICS_CLIENT.write() = old_client;
I think you might have those changes locally, but this needs to be done per-thread
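The per-thread concern can be sketched with a thread_local override: swapping a single global client races with other test threads, whereas a thread-local swap only affects the current thread. This is an assumption-laden sketch, with a stand-in Client type and helper name rather than relay's actual statsd client:

```rust
use std::cell::RefCell;

// Stand-in client type; relay's real client differs.
#[derive(PartialEq, Debug)]
struct Client(&'static str);

thread_local! {
    // One client per thread, so test overrides cannot race each other.
    static METRICS_CLIENT: RefCell<Client> = RefCell::new(Client("global"));
}

fn with_client<F: FnOnce()>(temp: Client, f: F) {
    // Swap in the temporary client for this thread only.
    let old = METRICS_CLIENT.with(|c| c.replace(temp));
    f();
    // Restore the previous client afterwards.
    METRICS_CLIENT.with(|c| *c.borrow_mut() = old);
}

fn main() {
    with_client(Client("test"), || {
        METRICS_CLIENT.with(|c| assert_eq!(*c.borrow(), Client("test")));
    });
    // Outside the closure the thread sees its original client again.
    METRICS_CLIENT.with(|c| assert_eq!(*c.borrow(), Client("global")));
}
```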
@@ -20,7 +20,7 @@ relay-system = { path = "../relay-system" }
 serde = { version = "1.0.114", features = ["derive"] }
 serde_json = "1.0.55"
 failure = "0.1.8"
-crc32fast = "1.2.1"
+fnv = "1.0.7"
This dependency was already in Cargo.lock.
// Create a 64-bit hash of the bucket key using FnvHasher.
// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
    let mut hasher = FnvHasher::default();
With fnv::FnvHasher we can auto-derive Hash; with hash32, we cannot.
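The point about auto-deriving is that fnv::FnvHasher implements std::hash::Hasher, so a key struct can simply #[derive(Hash)] and be fed into it. A self-contained sketch with a hand-rolled FNV-1a hasher standing in for the fnv crate, and an illustrative BucketKey (not relay's actual field set):

```rust
use std::hash::{Hash, Hasher};

// Hand-rolled FNV-1a hasher; fnv::FnvHasher plays this role in relay.
struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        Fnv1a(0xcbf2_9ce4_8422_2325)
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 ^= u64::from(b);
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3);
        }
    }
    fn finish(&self) -> u64 {
        self.0
    }
}

// Because the hasher implements std::hash::Hasher, Hash can be derived.
// Illustrative fields only.
#[derive(Hash)]
struct BucketKey {
    project_key: u64,
    metric_name: String,
}

fn hash64(key: &BucketKey) -> u64 {
    let mut hasher = Fnv1a::default();
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = BucketKey { project_key: 1, metric_name: "d:duration".into() };
    let b = BucketKey { project_key: 1, metric_name: "d:duration".into() };
    // Equal keys hash equally within one build; note that derived Hash
    // feeds integers in native byte order, so full cross-platform
    // portability still depends on how the key is serialized.
    assert_eq!(hash64(&a), hash64(&b));
}
```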
Convert the dry run implemented in #1425 into an actual batching mechanism that splits metrics buckets into logical partitions. The partition_key has to be passed through ProjectCache, EnvelopeManager and UpstreamRelay to be set as a header on the outgoing envelope request.
Measure what distributions we would obtain if we split flush buckets not only by project, but also by partition, where a bucket's partition is determined by hashing its bucket key modulo a configurable number of partitions.
#skip-changelog