This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit ea8b996: merge

neilkakkar committed Jun 10, 2024
2 parents b787199 + f28466a
Showing 10 changed files with 163 additions and 73 deletions.
3 changes: 3 additions & 0 deletions capture/src/config.rs
@@ -13,6 +13,9 @@ pub struct Config {
    pub redis_url: String,
    pub otel_url: Option<String>,

+   #[envconfig(default = "false")]
+   pub overflow_enabled: bool,
+
    #[envconfig(default = "100")]
    pub overflow_per_second_limit: NonZeroU32,
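The new flag follows the same envconfig pattern as the existing overflow settings: unset or "false" leaves overflow routing disabled. A minimal sketch of how such a flag is parsed, assuming the envconfig crate's derive and its default naming rule (field name uppercased, so OVERFLOW_ENABLED):

use envconfig::Envconfig;
use std::num::NonZeroU32;

#[derive(Envconfig)]
pub struct Config {
    // Opt-in: defaults to false when OVERFLOW_ENABLED is unset.
    #[envconfig(default = "false")]
    pub overflow_enabled: bool,

    #[envconfig(default = "100")]
    pub overflow_per_second_limit: NonZeroU32,
}

fn main() {
    // Reads OVERFLOW_ENABLED and OVERFLOW_PER_SECOND_LIMIT from the environment.
    let config = Config::init_from_env().expect("invalid configuration");
    println!("overflow enabled: {}", config.overflow_enabled);
}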
42 changes: 24 additions & 18 deletions capture/src/server.rs
@@ -48,24 +48,30 @@ where
        .register("rdkafka".to_string(), Duration::seconds(30))
        .await;

-   let partition = OverflowLimiter::new(
-       config.overflow_per_second_limit,
-       config.overflow_burst_limit,
-       config.overflow_forced_keys,
-   );
-   if config.export_prometheus {
-       let partition = partition.clone();
-       tokio::spawn(async move {
-           partition.report_metrics().await;
-       });
-   }
-   {
-       // Ensure that the rate limiter state does not grow unbounded
-       let partition = partition.clone();
-       tokio::spawn(async move {
-           partition.clean_state().await;
-       });
-   }
+   let partition = match config.overflow_enabled {
+       false => None,
+       true => {
+           let partition = OverflowLimiter::new(
+               config.overflow_per_second_limit,
+               config.overflow_burst_limit,
+               config.overflow_forced_keys,
+           );
+           if config.export_prometheus {
+               let partition = partition.clone();
+               tokio::spawn(async move {
+                   partition.report_metrics().await;
+               });
+           }
+           {
+               // Ensure that the rate limiter state does not grow unbounded
+               let partition = partition.clone();
+               tokio::spawn(async move {
+                   partition.clean_state().await;
+               });
+           }
+           Some(partition)
+       }
+   };
    let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
        .expect("failed to start Kafka sink");
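The shape of this change is a common gate pattern: derive an Option from the config flag, and spawn the metrics and cleanup tasks only when the limiter actually exists, so a disabled deployment pays no background cost. A self-contained sketch of that shape under assumed names (Limiter standing in for OverflowLimiter), not the repository's actual types:

use std::time::Duration;

#[derive(Clone)]
struct Limiter; // stand-in for OverflowLimiter

impl Limiter {
    async fn clean_state(&self) {
        // The real task evicts idle keys on an interval so that
        // per-key rate-limiter state stays bounded.
        tokio::time::sleep(Duration::from_secs(30)).await;
    }
}

fn build_limiter(overflow_enabled: bool) -> Option<Limiter> {
    if !overflow_enabled {
        return None; // feature off: no limiter and no background tasks
    }
    let limiter = Limiter;
    let cleaner = limiter.clone();
    tokio::spawn(async move { cleaner.clean_state().await });
    Some(limiter)
}

#[tokio::main]
async fn main() {
    assert!(build_limiter(false).is_none());
    assert!(build_limiter(true).is_some());
}

An equivalent spelling of the match in the diff is config.overflow_enabled.then(|| { ... }), which returns None for false without the explicit arms; the match keeps the disabled branch more visible in review.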
55 changes: 43 additions & 12 deletions capture/src/sinks/kafka.rs
@@ -36,51 +36,78 @@ impl rdkafka::ClientContext for KafkaContext {
        for (topic, stats) in stats.topics {
            gauge!(
                "capture_kafka_produce_avg_batch_size_bytes",
                "topic" => topic.clone()
            )
            .set(stats.batchsize.avg as f64);
            gauge!(
                "capture_kafka_produce_avg_batch_size_events",
                "topic" => topic
            )
            .set(stats.batchcnt.avg as f64);
        }

        for (_, stats) in stats.brokers {
            let id_string = format!("{}", stats.nodeid);
+           if let Some(rtt) = stats.rtt {
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_us",
+                   "quantile" => "p50",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p50 as f64);
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_us",
+                   "quantile" => "p90",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p90 as f64);
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_us",
+                   "quantile" => "p95",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p95 as f64);
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_us",
+                   "quantile" => "p99",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p99 as f64);
+           }

            gauge!(
                "capture_kafka_broker_requests_pending",
                "broker" => id_string.clone()
            )
            .set(stats.outbuf_cnt as f64);
            gauge!(
                "capture_kafka_broker_responses_awaiting",
                "broker" => id_string.clone()
            )
            .set(stats.waitresp_cnt as f64);
            counter!(
                "capture_kafka_broker_tx_errors_total",
                "broker" => id_string.clone()
            )
            .absolute(stats.txerrs);
            counter!(
                "capture_kafka_broker_rx_errors_total",
-               "broker" => id_string
+               "broker" => id_string.clone()
            )
            .absolute(stats.rxerrs);
+           counter!(
+               "capture_kafka_broker_request_timeouts",
+               "broker" => id_string
+           )
+           .absolute(stats.req_timeouts);
        }
    }
}

#[derive(Clone)]
pub struct KafkaSink {
    producer: FutureProducer<KafkaContext>,
-   partition: OverflowLimiter,
+   partition: Option<OverflowLimiter>,
    main_topic: String,
    historical_topic: String,
}
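The statistics callback above pushes librdkafka's per-topic and per-broker numbers through the metrics crate's macros. A minimal standalone sketch of that recording style, assuming metrics 0.22+ (where gauge!/counter! accept labels and return a handle) and an exporter installed elsewhere:

use metrics::{counter, gauge};

fn record_broker_stats(broker_id: i32, outbuf_cnt: i64, txerrs: u64) {
    let id_string = format!("{}", broker_id);
    // A gauge overwrites the last value for this label set.
    gauge!(
        "capture_kafka_broker_requests_pending",
        "broker" => id_string.clone()
    )
    .set(outbuf_cnt as f64);
    // .absolute() republishes librdkafka's own monotonic total instead
    // of incrementing a counter owned by this process.
    counter!(
        "capture_kafka_broker_tx_errors_total",
        "broker" => id_string
    )
    .absolute(txerrs);
}

fn main() {
    // Without a recorder installed these calls are no-ops, which makes
    // the sketch safe to run standalone.
    record_broker_stats(1, 42, 0);
}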
@@ -89,7 +116,7 @@ impl KafkaSink {
    pub fn new(
        config: KafkaConfig,
        liveness: HealthHandle,
-       partition: OverflowLimiter,
+       partition: Option<OverflowLimiter>,
    ) -> anyhow::Result<KafkaSink> {
        info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

@@ -150,7 +177,11 @@ impl KafkaSink {
            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
            DataType::AnalyticsMain => {
                // TODO: deprecate capture-led overflow or move logic in handler
-               if self.partition.is_limited(&event_key) {
+               let is_limited = match &self.partition {
+                   None => false,
+                   Some(partition) => partition.is_limited(&event_key),
+               };
+               if is_limited {
                    (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                } else {
                    (&self.main_topic, Some(event_key.as_str()))
Expand Down Expand Up @@ -280,11 +311,11 @@ mod tests {
        let handle = registry
            .register("one".to_string(), Duration::seconds(30))
            .await;
-       let limiter = OverflowLimiter::new(
+       let limiter = Some(OverflowLimiter::new(
            NonZeroU32::new(10).unwrap(),
            NonZeroU32::new(10).unwrap(),
            None,
-       );
+       ));
        let cluster = MockCluster::new(1).expect("failed to create mock brokers");
        let config = config::KafkaConfig {
            kafka_producer_linger_ms: 0,
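One note on the is_limited change: the explicit match keeps the disabled case obvious, but the same logic can be written with Option combinators. A hypothetical equivalent under stand-in types, not the form the diff chose:

struct Limiter;

impl Limiter {
    fn is_limited(&self, _key: &str) -> bool {
        false // stub; the real limiter tracks per-key rates
    }
}

struct Sink {
    partition: Option<Limiter>,
}

impl Sink {
    fn is_limited(&self, event_key: &str) -> bool {
        // as_ref() borrows the limiter out of the Option; map_or
        // defaults to "not limited" when overflow is disabled.
        self.partition
            .as_ref()
            .map_or(false, |p| p.is_limited(event_key))
    }
}

fn main() {
    let sink = Sink { partition: None };
    assert!(!sink.is_limited("token:user1"));
}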
9 changes: 8 additions & 1 deletion capture/src/v0_endpoint.rs
@@ -150,7 +150,14 @@ pub async fn event(
    tracing::debug!(context=?context, events=?events, "decoded request");

    if let Err(err) = process_events(state.sink.clone(), &events, &context).await {
-       report_dropped_events("process_events_error", events.len() as u64);
+       let cause = match err {
+           // TODO: automate this with a macro
+           CaptureError::EmptyDistinctId => "empty_distinct_id",
+           CaptureError::MissingDistinctId => "missing_distinct_id",
+           CaptureError::MissingEventName => "missing_event_name",
+           _ => "process_events_error",
+       };
+       report_dropped_events(cause, events.len() as u64);
        tracing::log::warn!("rejected invalid payload: {}", err);
        return Err(err);
    }
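The TODO hints at generating these cause strings rather than hand-writing the match. One hedged way to do it is to keep the mapping next to the error type as a method; the sketch below uses an abbreviated CaptureError, not the crate's actual definition. For full automation, a derive such as strum's IntoStaticStr with serialize_all = "snake_case" could produce the labels from the variant names.

#[allow(dead_code)]
enum CaptureError {
    EmptyDistinctId,
    MissingDistinctId,
    MissingEventName,
    ProcessingError(String),
}

impl CaptureError {
    // One static label per variant, usable as a metrics tag value.
    fn cause(&self) -> &'static str {
        match self {
            CaptureError::EmptyDistinctId => "empty_distinct_id",
            CaptureError::MissingDistinctId => "missing_distinct_id",
            CaptureError::MissingEventName => "missing_event_name",
            _ => "process_events_error",
        }
    }
}

fn main() {
    // The handler would then shrink to:
    //     report_dropped_events(err.cause(), events.len() as u64);
    assert_eq!(CaptureError::MissingEventName.cause(), "missing_event_name");
}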
1 change: 1 addition & 0 deletions capture/tests/common.rs
@@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
    print_sink: false,
    address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
    redis_url: "redis://localhost:6379/".to_string(),
+   overflow_enabled: false,
    overflow_burst_limit: NonZeroU32::new(5).unwrap(),
    overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
    overflow_forced_keys: None,
54 changes: 54 additions & 0 deletions capture/tests/events.rs
@@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {

    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_topic = topic.topic_name().to_string();
+   config.overflow_enabled = true;
    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

@@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {

    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_topic = topic.topic_name().to_string();
+   config.overflow_enabled = true;
    config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

@@ -254,6 +256,58 @@
    Ok(())
}

+#[tokio::test]
+async fn it_skips_overflows_when_disabled() -> Result<()> {
+    setup_tracing();
+
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = false;
+    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
+
+    let server = ServerHandle::for_config(config).await;
+
+    let event = json!([{
+        "token": token,
+        "event": "event1",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event2",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event3",
+        "distinct_id": distinct_id
+    }]);
+
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    // Would have triggered overflow, but the feature is disabled,
+    // so the third event keeps its locality key too.
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+    Ok(())
+}
#[tokio::test]
async fn it_trims_distinct_id() -> Result<()> {
setup_tracing();
Expand Down