From ebaf596ec571f0c2c2cc8bafbdd4ee932ac28b2a Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 27 May 2024 14:23:07 +0200 Subject: [PATCH 1/7] capture: add overflow_enabled option (#43) --- capture/src/config.rs | 3 +++ capture/src/server.rs | 42 ++++++++++++++++------------- capture/src/sinks/kafka.rs | 14 ++++++---- capture/tests/common.rs | 1 + capture/tests/events.rs | 54 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 23 deletions(-) diff --git a/capture/src/config.rs b/capture/src/config.rs index 07b7f89..d91e7b7 100644 --- a/capture/src/config.rs +++ b/capture/src/config.rs @@ -13,6 +13,9 @@ pub struct Config { pub redis_url: String, pub otel_url: Option, + #[envconfig(default = "false")] + pub overflow_enabled: bool, + #[envconfig(default = "100")] pub overflow_per_second_limit: NonZeroU32, diff --git a/capture/src/server.rs b/capture/src/server.rs index 0704987..8585036 100644 --- a/capture/src/server.rs +++ b/capture/src/server.rs @@ -48,24 +48,30 @@ where .register("rdkafka".to_string(), Duration::seconds(30)) .await; - let partition = OverflowLimiter::new( - config.overflow_per_second_limit, - config.overflow_burst_limit, - config.overflow_forced_keys, - ); - if config.export_prometheus { - let partition = partition.clone(); - tokio::spawn(async move { - partition.report_metrics().await; - }); - } - { - // Ensure that the rate limiter state does not grow unbounded - let partition = partition.clone(); - tokio::spawn(async move { - partition.clean_state().await; - }); - } + let partition = match config.overflow_enabled { + false => None, + true => { + let partition = OverflowLimiter::new( + config.overflow_per_second_limit, + config.overflow_burst_limit, + config.overflow_forced_keys, + ); + if config.export_prometheus { + let partition = partition.clone(); + tokio::spawn(async move { + partition.report_metrics().await; + }); + } + { + // Ensure that the rate limiter state does not grow unbounded + let partition = partition.clone(); + tokio::spawn(async move { + partition.clean_state().await; + }); + } + Some(partition) + } + }; let sink = KafkaSink::new(config.kafka, sink_liveness, partition) .expect("failed to start Kafka sink"); diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs index 945e581..bc45fa1 100644 --- a/capture/src/sinks/kafka.rs +++ b/capture/src/sinks/kafka.rs @@ -80,7 +80,7 @@ impl rdkafka::ClientContext for KafkaContext { #[derive(Clone)] pub struct KafkaSink { producer: FutureProducer, - partition: OverflowLimiter, + partition: Option, main_topic: String, historical_topic: String, } @@ -89,7 +89,7 @@ impl KafkaSink { pub fn new( config: KafkaConfig, liveness: HealthHandle, - partition: OverflowLimiter, + partition: Option, ) -> anyhow::Result { info!("connecting to Kafka brokers at {}...", config.kafka_hosts); @@ -150,7 +150,11 @@ impl KafkaSink { DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events DataType::AnalyticsMain => { // TODO: deprecate capture-led overflow or move logic in handler - if self.partition.is_limited(&event_key) { + let is_limited = match &self.partition { + None => false, + Some(partition) => partition.is_limited(&event_key), + }; + if is_limited { (&self.main_topic, None) // Analytics overflow goes to the main topic without locality } else { (&self.main_topic, Some(event_key.as_str())) @@ -280,11 +284,11 @@ mod tests { let handle = registry .register("one".to_string(), Duration::seconds(30)) .await; - let 
limiter = OverflowLimiter::new( + let limiter = Some(OverflowLimiter::new( NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap(), None, - ); + )); let cluster = MockCluster::new(1).expect("failed to create mock brokers"); let config = config::KafkaConfig { kafka_producer_linger_ms: 0, diff --git a/capture/tests/common.rs b/capture/tests/common.rs index 788e6e2..868b27c 100644 --- a/capture/tests/common.rs +++ b/capture/tests/common.rs @@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { print_sink: false, address: SocketAddr::from_str("127.0.0.1:0").unwrap(), redis_url: "redis://localhost:6379/".to_string(), + overflow_enabled: false, overflow_burst_limit: NonZeroU32::new(5).unwrap(), overflow_per_second_limit: NonZeroU32::new(10).unwrap(), overflow_forced_keys: None, diff --git a/capture/tests/events.rs b/capture/tests/events.rs index 111b02c..7d2defc 100644 --- a/capture/tests/events.rs +++ b/capture/tests/events.rs @@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> { let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); + config.overflow_enabled = true; config.overflow_burst_limit = NonZeroU32::new(2).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); @@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> { let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); + config.overflow_enabled = true; config.overflow_burst_limit = NonZeroU32::new(1).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); @@ -254,6 +256,58 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> { Ok(()) } +#[tokio::test] +async fn it_skips_overflows_when_disabled() -> Result<()> { + setup_tracing(); + + let token = random_string("token", 16); + let distinct_id = random_string("id", 16); + + let topic = EphemeralTopic::new().await; + + let mut config = DEFAULT_CONFIG.clone(); + config.kafka.kafka_topic = topic.topic_name().to_string(); + config.overflow_enabled = false; + config.overflow_burst_limit = NonZeroU32::new(2).unwrap(); + config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); + + let server = ServerHandle::for_config(config).await; + + let event = json!([{ + "token": token, + "event": "event1", + "distinct_id": distinct_id + },{ + "token": token, + "event": "event2", + "distinct_id": distinct_id + },{ + "token": token, + "event": "event3", + "distinct_id": distinct_id + }]); + + let res = server.capture_events(event.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + + // Should have triggered overflow, but has not + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + Ok(()) +} + #[tokio::test] async fn it_trims_distinct_id() -> Result<()> { setup_tracing(); From f71dc0867bc0ce80c91953de69f5ca9f65521d84 Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Wed, 29 May 2024 14:32:41 +0100 Subject: [PATCH 2/7] feat(flags): Do token validation and extract distinct id (#41) --- Cargo.lock | 39 ++++- feature-flags/Cargo.toml | 1 + feature-flags/src/api.rs | 9 ++ feature-flags/src/config.rs | 2 +- feature-flags/src/lib.rs | 9 ++ feature-flags/src/redis.rs | 73 +++++---- feature-flags/src/team.rs | 139 ++++++++++++++++++ 
feature-flags/src/test_utils.rs | 50 +++++++ feature-flags/src/v0_endpoint.rs | 23 +-- feature-flags/src/v0_request.rs | 90 ++++++++++-- .../tests/{common.rs => common/mod.rs} | 27 ++-- feature-flags/tests/test_flags.rs | 46 +++++- 12 files changed, 442 insertions(+), 66 deletions(-) create mode 100644 feature-flags/src/team.rs create mode 100644 feature-flags/src/test_utils.rs rename feature-flags/tests/{common.rs => common/mod.rs} (76%) diff --git a/Cargo.lock b/Cargo.lock index 0f475fa..8642ade 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -707,6 +707,7 @@ dependencies = [ "redis", "reqwest 0.12.3", "serde", + "serde-pickle", "serde_json", "thiserror", "tokio", @@ -1395,6 +1396,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "iter-read" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c397ca3ea05ad509c4ec451fea28b4771236a376ca1c69fd5143aae0cf8f93c4" + [[package]] name = "itertools" version = "0.12.1" @@ -1680,6 +1687,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1705,11 +1722,10 @@ checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" [[package]] name = "num-integer" -version = "0.1.45" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" dependencies = [ - "autocfg", "num-traits", ] @@ -1726,9 +1742,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", "libm", @@ -2533,6 +2549,19 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-pickle" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c762ad136a26407c6a80825813600ceeab5e613660d93d79a41f0ec877171e71" +dependencies = [ + "byteorder", + "iter-read", + "num-bigint", + "num-traits", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.196" diff --git a/feature-flags/Cargo.toml b/feature-flags/Cargo.toml index ddfe070..1e0c111 100644 --- a/feature-flags/Cargo.toml +++ b/feature-flags/Cargo.toml @@ -24,6 +24,7 @@ redis = { version = "0.23.3", features = [ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +serde-pickle = { version = "1.1.1"} [lints] workspace = true diff --git a/feature-flags/src/api.rs b/feature-flags/src/api.rs index c94eed6..ccf4735 100644 --- a/feature-flags/src/api.rs +++ b/feature-flags/src/api.rs @@ -37,6 +37,11 @@ pub enum FlagError { #[error("rate limited")] RateLimited, + + #[error("failed to parse redis cache data")] + DataParsingError, + #[error("redis unavailable")] + RedisUnavailable, } impl IntoResponse for FlagError { @@ -52,6 +57,10 @@ impl IntoResponse for FlagError { } FlagError::RateLimited => 
(StatusCode::TOO_MANY_REQUESTS, self.to_string()), + + FlagError::DataParsingError | FlagError::RedisUnavailable => { + (StatusCode::SERVICE_UNAVAILABLE, self.to_string()) + } } .into_response() } diff --git a/feature-flags/src/config.rs b/feature-flags/src/config.rs index 3fa6f50..cc7ad37 100644 --- a/feature-flags/src/config.rs +++ b/feature-flags/src/config.rs @@ -4,7 +4,7 @@ use envconfig::Envconfig; #[derive(Envconfig, Clone)] pub struct Config { - #[envconfig(default = "127.0.0.1:0")] + #[envconfig(default = "127.0.0.1:3001")] pub address: SocketAddr, #[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] diff --git a/feature-flags/src/lib.rs b/feature-flags/src/lib.rs index 9175b5c..195a55c 100644 --- a/feature-flags/src/lib.rs +++ b/feature-flags/src/lib.rs @@ -3,5 +3,14 @@ pub mod config; pub mod redis; pub mod router; pub mod server; +pub mod team; pub mod v0_endpoint; pub mod v0_request; + +// Test modules don't need to be compiled with main binary +// #[cfg(test)] +// TODO: To use in integration tests, we need to compile with binary +// or make it a separate feature using cfg(feature = "integration-tests") +// and then use this feature only in tests. +// For now, ok to just include in binary +pub mod test_utils; diff --git a/feature-flags/src/redis.rs b/feature-flags/src/redis.rs index 8c03820..89dde42 100644 --- a/feature-flags/src/redis.rs +++ b/feature-flags/src/redis.rs @@ -2,20 +2,38 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use redis::AsyncCommands; +use redis::{AsyncCommands, RedisError}; +use thiserror::Error; use tokio::time::timeout; // average for all commands is <10ms, check grafana const REDIS_TIMEOUT_MILLISECS: u64 = 10; +#[derive(Error, Debug)] +pub enum CustomRedisError { + #[error("Not found in redis")] + NotFound, + + #[error("Pickle error: {0}")] + PickleError(#[from] serde_pickle::Error), + + #[error("Redis error: {0}")] + Other(#[from] RedisError), + + #[error("Timeout error")] + Timeout(#[from] tokio::time::error::Elapsed), +} /// A simple redis wrapper /// Copied from capture/src/redis.rs. -/// TODO: Modify this to support hincrby, get, and set commands. +/// TODO: Modify this to support hincrby #[async_trait] pub trait Client { // A very simplified wrapper, but works for our usage async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result>; + + async fn get(&self, k: String) -> Result; + async fn set(&self, k: String, v: String) -> Result<()>; } pub struct RedisClient { @@ -40,38 +58,39 @@ impl Client for RedisClient { Ok(fut?) } -} -// TODO: Find if there's a better way around this. -#[derive(Clone)] -pub struct MockRedisClient { - zrangebyscore_ret: Vec, -} + async fn get(&self, k: String) -> Result { + let mut conn = self.client.get_async_connection().await?; -impl MockRedisClient { - pub fn new() -> MockRedisClient { - MockRedisClient { - zrangebyscore_ret: Vec::new(), + let results = conn.get(k); + let fut: Result, RedisError> = + timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + + // return NotFound error when empty or not found + if match &fut { + Ok(v) => v.is_empty(), + Err(_) => false, + } { + return Err(CustomRedisError::NotFound); } - } - pub fn zrangebyscore_ret(&mut self, ret: Vec) -> Self { - self.zrangebyscore_ret = ret; + // TRICKY: We serialise data to json, then django pickles it. + // Here we deserialize the bytes using serde_pickle, to get the json string. 
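+        // Illustrative example (made-up value, not a real cache entry): if Django
+        // cached the JSON string {"id": 1}, the bytes read above are the pickle of
+        // that string, and from_slice() recovers the JSON string for later parsing.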
+ let string_response: String = serde_pickle::from_slice(&fut?, Default::default())?; - self.clone() + Ok(string_response) } -} -impl Default for MockRedisClient { - fn default() -> Self { - Self::new() - } -} + async fn set(&self, k: String, v: String) -> Result<()> { + // TRICKY: We serialise data to json, then django pickles it. + // Here we serialize the json string to bytes using serde_pickle. + let bytes = serde_pickle::to_vec(&v, Default::default())?; -#[async_trait] -impl Client for MockRedisClient { - // A very simplified wrapper, but works for our usage - async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result> { - Ok(self.zrangebyscore_ret.clone()) + let mut conn = self.client.get_async_connection().await?; + + let results = conn.set(k, bytes); + let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + + Ok(fut?) } } diff --git a/feature-flags/src/team.rs b/feature-flags/src/team.rs new file mode 100644 index 0000000..ac62ea9 --- /dev/null +++ b/feature-flags/src/team.rs @@ -0,0 +1,139 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::instrument; + +use crate::{ + api::FlagError, + redis::{Client, CustomRedisError}, +}; + +// TRICKY: This cache data is coming from django-redis. If it ever goes out of sync, we'll bork. +// TODO: Add integration tests across repos to ensure this doesn't happen. +pub const TEAM_TOKEN_CACHE_PREFIX: &str = "posthog:1:team_token:"; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Team { + pub id: i64, + pub name: String, + pub api_token: String, +} + +impl Team { + /// Validates a token, and returns a team if it exists. + + #[instrument(skip_all)] + pub async fn from_redis( + client: Arc, + token: String, + ) -> Result { + // TODO: Instead of failing here, i.e. 
if not in redis, fallback to pg + let serialized_team = client + .get(format!("{TEAM_TOKEN_CACHE_PREFIX}{}", token)) + .await + .map_err(|e| match e { + CustomRedisError::NotFound => FlagError::TokenValidationError, + CustomRedisError::PickleError(_) => { + tracing::error!("failed to fetch data: {}", e); + FlagError::DataParsingError + } + _ => { + tracing::error!("Unknown redis error: {}", e); + FlagError::RedisUnavailable + } + })?; + + let team: Team = serde_json::from_str(&serialized_team).map_err(|e| { + tracing::error!("failed to parse data to team: {}", e); + FlagError::DataParsingError + })?; + + Ok(team) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng; + use redis::AsyncCommands; + + use super::*; + use crate::{ + team, + test_utils::{insert_new_team_in_redis, random_string, setup_redis_client}, + }; + + #[tokio::test] + async fn test_fetch_team_from_redis() { + let client = setup_redis_client(None); + + let team = insert_new_team_in_redis(client.clone()).await.unwrap(); + + let target_token = team.api_token; + + let team_from_redis = Team::from_redis(client.clone(), target_token.clone()) + .await + .unwrap(); + assert_eq!(team_from_redis.api_token, target_token); + assert_eq!(team_from_redis.id, team.id); + } + + #[tokio::test] + async fn test_fetch_invalid_team_from_redis() { + let client = setup_redis_client(None); + + match Team::from_redis(client.clone(), "banana".to_string()).await { + Err(FlagError::TokenValidationError) => (), + _ => panic!("Expected TokenValidationError"), + }; + } + + #[tokio::test] + async fn test_cant_connect_to_redis_error_is_not_token_validation_error() { + let client = setup_redis_client(Some("redis://localhost:1111/".to_string())); + + match Team::from_redis(client.clone(), "banana".to_string()).await { + Err(FlagError::RedisUnavailable) => (), + _ => panic!("Expected RedisUnavailable"), + }; + } + + #[tokio::test] + async fn test_corrupted_data_in_redis_is_handled() { + // TODO: Extend this test with fallback to pg + let id = rand::thread_rng().gen_range(0..10_000_000); + let token = random_string("phc_", 12); + let team = Team { + id, + name: "team".to_string(), + api_token: token, + }; + let serialized_team = serde_json::to_string(&team).expect("Failed to serialise team"); + + // manually insert non-pickled data in redis + let client = + redis::Client::open("redis://localhost:6379/").expect("Failed to create redis client"); + let mut conn = client + .get_async_connection() + .await + .expect("Failed to get redis connection"); + conn.set::( + format!( + "{}{}", + team::TEAM_TOKEN_CACHE_PREFIX, + team.api_token.clone() + ), + serialized_team, + ) + .await + .expect("Failed to write data to redis"); + + // now get client connection for data + let client = setup_redis_client(None); + + match Team::from_redis(client.clone(), team.api_token.clone()).await { + Err(FlagError::DataParsingError) => (), + Err(other) => panic!("Expected DataParsingError, got {:?}", other), + Ok(_) => panic!("Expected DataParsingError"), + }; + } +} diff --git a/feature-flags/src/test_utils.rs b/feature-flags/src/test_utils.rs new file mode 100644 index 0000000..75db86d --- /dev/null +++ b/feature-flags/src/test_utils.rs @@ -0,0 +1,50 @@ +use anyhow::Error; +use std::sync::Arc; + +use crate::{ + redis::{Client, RedisClient}, + team::{self, Team}, +}; +use rand::{distributions::Alphanumeric, Rng}; + +pub fn random_string(prefix: &str, length: usize) -> String { + let suffix: String = rand::thread_rng() + .sample_iter(Alphanumeric) + .take(length) + .map(char::from) + 
.collect(); + format!("{}{}", prefix, suffix) +} + +pub async fn insert_new_team_in_redis(client: Arc) -> Result { + let id = rand::thread_rng().gen_range(0..10_000_000); + let token = random_string("phc_", 12); + let team = Team { + id, + name: "team".to_string(), + api_token: token, + }; + + let serialized_team = serde_json::to_string(&team)?; + client + .set( + format!( + "{}{}", + team::TEAM_TOKEN_CACHE_PREFIX, + team.api_token.clone() + ), + serialized_team, + ) + .await?; + + Ok(team) +} + +pub fn setup_redis_client(url: Option) -> Arc { + let redis_url = match url { + Some(value) => value, + None => "redis://localhost:6379/".to_string(), + }; + let client = RedisClient::new(redis_url).expect("Failed to create redis client"); + Arc::new(client) +} diff --git a/feature-flags/src/v0_endpoint.rs b/feature-flags/src/v0_endpoint.rs index 8f77611..ba4bcef 100644 --- a/feature-flags/src/v0_endpoint.rs +++ b/feature-flags/src/v0_endpoint.rs @@ -33,7 +33,7 @@ use crate::{ )] #[debug_handler] pub async fn flags( - _state: State, + state: State, InsecureClientIp(ip): InsecureClientIp, meta: Query, headers: HeaderMap, @@ -59,21 +59,26 @@ pub async fn flags( .get("content-type") .map_or("", |v| v.to_str().unwrap_or("")) { - "application/x-www-form-urlencoded" => { - return Err(FlagError::RequestDecodingError(String::from( - "invalid form data", - ))); + "application/json" => { + tracing::Span::current().record("content_type", "application/json"); + FlagRequest::from_bytes(body) } ct => { - tracing::Span::current().record("content_type", ct); - - FlagRequest::from_bytes(body) + return Err(FlagError::RequestDecodingError(format!( + "unsupported content type: {}", + ct + ))); } }?; - let token = request.extract_and_verify_token()?; + let token = request + .extract_and_verify_token(state.redis.clone()) + .await?; + + let distinct_id = request.extract_distinct_id()?; tracing::Span::current().record("token", &token); + tracing::Span::current().record("distinct_id", &distinct_id); tracing::debug!("request: {:?}", request); diff --git a/feature-flags/src/v0_request.rs b/feature-flags/src/v0_request.rs index f2269df..63b26b4 100644 --- a/feature-flags/src/v0_request.rs +++ b/feature-flags/src/v0_request.rs @@ -1,11 +1,11 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::instrument; -use crate::api::FlagError; +use crate::{api::FlagError, redis::Client, team::Team}; #[derive(Deserialize, Default)] pub struct FlagsQueryParams { @@ -36,11 +36,8 @@ pub struct FlagRequest { } impl FlagRequest { - /// Takes a request payload and tries to decompress and unmarshall it. - /// While posthog-js sends a compression query param, a sizable portion of requests - /// fail due to it being missing when the body is compressed. - /// Instead of trusting the parameter, we peek at the payload's first three bytes to - /// detect gzip, fallback to uncompressed utf8 otherwise. + /// Takes a request payload and tries to read it. + /// Only supports base64 encoded payloads or uncompressed utf-8 as json. #[instrument(skip_all)] pub fn from_bytes(bytes: Bytes) -> Result { tracing::debug!(len = bytes.len(), "decoding new request"); @@ -54,15 +51,88 @@ impl FlagRequest { Ok(serde_json::from_str::(&payload)?) } - pub fn extract_and_verify_token(&self) -> Result { + pub async fn extract_and_verify_token( + &self, + redis_client: Arc, + ) -> Result { let token = match self { FlagRequest { token: Some(token), .. 
} => token.to_string(), _ => return Err(FlagError::NoTokenError), }; - // TODO: Get tokens from redis, confirm this one is valid - // validate_token(&token)?; + + // validate token + Team::from_redis(redis_client, token.clone()).await?; + + // TODO: fallback when token not found in redis + Ok(token) } + + pub fn extract_distinct_id(&self) -> Result { + let distinct_id = match &self.distinct_id { + None => return Err(FlagError::MissingDistinctId), + Some(id) => id, + }; + + match distinct_id.len() { + 0 => Err(FlagError::EmptyDistinctId), + 1..=200 => Ok(distinct_id.to_owned()), + _ => Ok(distinct_id.chars().take(200).collect()), + } + } +} + +#[cfg(test)] +mod tests { + use crate::api::FlagError; + use crate::v0_request::FlagRequest; + use bytes::Bytes; + use serde_json::json; + + #[test] + fn empty_distinct_id_not_accepted() { + let json = json!({ + "distinct_id": "", + "token": "my_token1", + }); + let bytes = Bytes::from(json.to_string()); + + let flag_payload = FlagRequest::from_bytes(bytes).expect("failed to parse request"); + + match flag_payload.extract_distinct_id() { + Err(FlagError::EmptyDistinctId) => (), + _ => panic!("expected empty distinct id error"), + }; + } + + #[test] + fn too_large_distinct_id_is_truncated() { + let json = json!({ + "distinct_id": std::iter::repeat("a").take(210).collect::(), + "token": "my_token1", + }); + let bytes = Bytes::from(json.to_string()); + + let flag_payload = FlagRequest::from_bytes(bytes).expect("failed to parse request"); + + assert_eq!(flag_payload.extract_distinct_id().unwrap().len(), 200); + } + + #[test] + fn distinct_id_is_returned_correctly() { + let json = json!({ + "$distinct_id": "alakazam", + "token": "my_token1", + }); + let bytes = Bytes::from(json.to_string()); + + let flag_payload = FlagRequest::from_bytes(bytes).expect("failed to parse request"); + + match flag_payload.extract_distinct_id() { + Ok(id) => assert_eq!(id, "alakazam"), + _ => panic!("expected distinct id"), + }; + } } diff --git a/feature-flags/tests/common.rs b/feature-flags/tests/common/mod.rs similarity index 76% rename from feature-flags/tests/common.rs rename to feature-flags/tests/common/mod.rs index f66a11f..c8644fe 100644 --- a/feature-flags/tests/common.rs +++ b/feature-flags/tests/common/mod.rs @@ -4,8 +4,7 @@ use std::string::ToString; use std::sync::Arc; use once_cell::sync::Lazy; -use rand::distributions::Alphanumeric; -use rand::Rng; +use reqwest::header::CONTENT_TYPE; use tokio::net::TcpListener; use tokio::sync::Notify; @@ -44,6 +43,21 @@ impl ServerHandle { client .post(format!("http://{:?}/flags", self.addr)) .body(body) + .header(CONTENT_TYPE, "application/json") + .send() + .await + .expect("failed to send request") + } + + pub async fn send_invalid_header_for_flags_request>( + &self, + body: T, + ) -> reqwest::Response { + let client = reqwest::Client::new(); + client + .post(format!("http://{:?}/flags", self.addr)) + .body(body) + .header(CONTENT_TYPE, "xyz") .send() .await .expect("failed to send request") @@ -55,12 +69,3 @@ impl Drop for ServerHandle { self.shutdown.notify_one() } } - -pub fn random_string(prefix: &str, length: usize) -> String { - let suffix: String = rand::thread_rng() - .sample_iter(Alphanumeric) - .take(length) - .map(char::from) - .collect(); - format!("{}_{}", prefix, suffix) -} diff --git a/feature-flags/tests/test_flags.rs b/feature-flags/tests/test_flags.rs index 82f41f0..2ceba24 100644 --- a/feature-flags/tests/test_flags.rs +++ b/feature-flags/tests/test_flags.rs @@ -5,14 +5,20 @@ use reqwest::StatusCode; use 
serde_json::{json, Value}; use crate::common::*; -mod common; + +use feature_flags::test_utils::{insert_new_team_in_redis, setup_redis_client}; + +pub mod common; #[tokio::test] async fn it_sends_flag_request() -> Result<()> { - let token = random_string("token", 16); + let config = DEFAULT_CONFIG.clone(); + let distinct_id = "user_distinct_id".to_string(); - let config = DEFAULT_CONFIG.clone(); + let client = setup_redis_client(Some(config.redis_url.clone())); + let team = insert_new_team_in_redis(client.clone()).await.unwrap(); + let token = team.api_token; let server = ServerHandle::for_config(config).await; @@ -41,3 +47,37 @@ async fn it_sends_flag_request() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn it_rejects_invalid_headers_flag_request() -> Result<()> { + let config = DEFAULT_CONFIG.clone(); + + let distinct_id = "user_distinct_id".to_string(); + + let client = setup_redis_client(Some(config.redis_url.clone())); + let team = insert_new_team_in_redis(client.clone()).await.unwrap(); + let token = team.api_token; + + let server = ServerHandle::for_config(config).await; + + let payload = json!({ + "token": token, + "distinct_id": distinct_id, + "groups": {"group1": "group1"} + }); + let res = server + .send_invalid_header_for_flags_request(payload.to_string()) + .await; + assert_eq!(StatusCode::BAD_REQUEST, res.status()); + + // We don't want to deserialize the data into a flagResponse struct here, + // because we want to assert the shape of the raw json data. + let response_text = res.text().await?; + + assert_eq!( + response_text, + "failed to decode request: unsupported content type: xyz" + ); + + Ok(()) +} From ff0780cf5badd14c5fdf9a84c27ee222120639cd Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 29 May 2024 16:26:11 +0200 Subject: [PATCH 3/7] capture: add broker rtt latency and timeout metrics (#44) --- capture/src/sinks/kafka.rs | 41 +++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs index bc45fa1..bff61b5 100644 --- a/capture/src/sinks/kafka.rs +++ b/capture/src/sinks/kafka.rs @@ -36,12 +36,11 @@ impl rdkafka::ClientContext for KafkaContext { for (topic, stats) in stats.topics { gauge!( "capture_kafka_produce_avg_batch_size_bytes", - "topic" => topic.clone() + "topic" => topic.clone() ) .set(stats.batchsize.avg as f64); gauge!( "capture_kafka_produce_avg_batch_size_events", - "topic" => topic ) .set(stats.batchcnt.avg as f64); @@ -49,30 +48,58 @@ impl rdkafka::ClientContext for KafkaContext { for (_, stats) in stats.brokers { let id_string = format!("{}", stats.nodeid); + if let Some(rtt) = stats.rtt { + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p50", + "broker" => id_string.clone() + ) + .set(rtt.p50 as f64); + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p90", + "broker" => id_string.clone() + ) + .set(rtt.p90 as f64); + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p95", + "broker" => id_string.clone() + ) + .set(rtt.p95 as f64); + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p99", + "broker" => id_string.clone() + ) + .set(rtt.p99 as f64); + } + gauge!( "capture_kafka_broker_requests_pending", - "broker" => id_string.clone() ) .set(stats.outbuf_cnt as f64); gauge!( "capture_kafka_broker_responses_awaiting", - "broker" => id_string.clone() ) .set(stats.waitresp_cnt as f64); counter!( "capture_kafka_broker_tx_errors_total", - "broker" => id_string.clone() ) 
.absolute(stats.txerrs); counter!( "capture_kafka_broker_rx_errors_total", - - "broker" => id_string + "broker" => id_string.clone() ) .absolute(stats.rxerrs); + counter!( + "capture_kafka_broker_request_timeouts", + "broker" => id_string + ) + .absolute(stats.req_timeouts); } } } From 8d69910b091cb2b9ff2ada1a2946b92804983c89 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Thu, 30 May 2024 15:59:20 +0200 Subject: [PATCH 4/7] capture: fix produce_rtt_latency metric unit (#46) --- capture/src/sinks/kafka.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs index bff61b5..b82d3c3 100644 --- a/capture/src/sinks/kafka.rs +++ b/capture/src/sinks/kafka.rs @@ -50,25 +50,25 @@ impl rdkafka::ClientContext for KafkaContext { let id_string = format!("{}", stats.nodeid); if let Some(rtt) = stats.rtt { gauge!( - "capture_kafka_produce_rtt_latency_ms", + "capture_kafka_produce_rtt_latency_us", "quantile" => "p50", "broker" => id_string.clone() ) .set(rtt.p50 as f64); gauge!( - "capture_kafka_produce_rtt_latency_ms", + "capture_kafka_produce_rtt_latency_us", "quantile" => "p90", "broker" => id_string.clone() ) .set(rtt.p90 as f64); gauge!( - "capture_kafka_produce_rtt_latency_ms", + "capture_kafka_produce_rtt_latency_us", "quantile" => "p95", "broker" => id_string.clone() ) .set(rtt.p95 as f64); gauge!( - "capture_kafka_produce_rtt_latency_ms", + "capture_kafka_produce_rtt_latency_us", "quantile" => "p99", "broker" => id_string.clone() ) From 63db2a6c20182445d5d3cc71aff55fb099e6c60a Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Thu, 30 May 2024 17:18:28 +0100 Subject: [PATCH 5/7] feat(flags): Extract flag definitions from redis (#42) --- feature-flags/src/flag_definitions.rs | 200 ++++++++++++++++++++++++++ feature-flags/src/lib.rs | 1 + feature-flags/src/test_utils.rs | 43 ++++++ 3 files changed, 244 insertions(+) create mode 100644 feature-flags/src/flag_definitions.rs diff --git a/feature-flags/src/flag_definitions.rs b/feature-flags/src/flag_definitions.rs new file mode 100644 index 0000000..29ec8d8 --- /dev/null +++ b/feature-flags/src/flag_definitions.rs @@ -0,0 +1,200 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::instrument; + +use crate::{ + api::FlagError, + redis::{Client, CustomRedisError}, +}; + +// TRICKY: This cache data is coming from django-redis. If it ever goes out of sync, we'll bork. +// TODO: Add integration tests across repos to ensure this doesn't happen. 
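+// Example with a hypothetical team id: flags for team 123 would live under the
+// key "posthog:1:team_feature_flags_123" (the prefix below plus the team_id,
+// as formatted in FeatureFlagList::from_redis).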
+pub const TEAM_FLAGS_CACHE_PREFIX: &str = "posthog:1:team_feature_flags_"; + +// TODO: Hmm, revisit when dealing with groups, but seems like +// ideal to just treat it as a u8 and do our own validation on top +#[derive(Debug, Deserialize, Serialize)] +pub enum GroupTypeIndex {} + +#[derive(Debug, Deserialize, Serialize)] +pub enum OperatorType { + #[serde(rename = "exact")] + Exact, + #[serde(rename = "is_not")] + IsNot, + #[serde(rename = "icontains")] + Icontains, + #[serde(rename = "not_icontains")] + NotIcontains, + #[serde(rename = "regex")] + Regex, + #[serde(rename = "not_regex")] + NotRegex, + #[serde(rename = "gt")] + Gt, + #[serde(rename = "lt")] + Lt, + #[serde(rename = "gte")] + Gte, + #[serde(rename = "lte")] + Lte, + #[serde(rename = "is_set")] + IsSet, + #[serde(rename = "is_not_set")] + IsNotSet, + #[serde(rename = "is_date_exact")] + IsDateExact, + #[serde(rename = "is_date_after")] + IsDateAfter, + #[serde(rename = "is_date_before")] + IsDateBefore, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct PropertyFilter { + pub key: String, + pub value: serde_json::Value, + pub operator: Option, + #[serde(rename = "type")] + pub prop_type: String, + pub group_type_index: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct FlagGroupType { + pub properties: Option>, + pub rollout_percentage: Option, + pub variant: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct MultivariateFlagVariant { + pub key: String, + pub name: Option, + pub rollout_percentage: f32, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct MultivariateFlagOptions { + pub variants: Vec, +} + +// TODO: test name with https://www.fileformat.info/info/charset/UTF-16/list.htm values, like '𝖕𝖗𝖔𝖕𝖊𝖗𝖙𝖞': `𝓿𝓪𝓵𝓾𝓮` + +#[derive(Debug, Deserialize, Serialize)] +pub struct FlagFilters { + pub groups: Vec, + pub multivariate: Option, + pub aggregation_group_type_index: Option, + pub payloads: Option, + pub super_groups: Option>, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct FeatureFlag { + pub id: i64, + pub team_id: i64, + pub name: Option, + pub key: String, + pub filters: FlagFilters, + #[serde(default)] + pub deleted: bool, + #[serde(default)] + pub active: bool, + #[serde(default)] + pub ensure_experience_continuity: bool, +} + +#[derive(Debug, Deserialize, Serialize)] + +pub struct FeatureFlagList { + pub flags: Vec, +} + +impl FeatureFlagList { + /// Returns feature flags given a team_id + + #[instrument(skip_all)] + pub async fn from_redis( + client: Arc, + team_id: i64, + ) -> Result { + // TODO: Instead of failing here, i.e. 
if not in redis, fallback to pg + let serialized_flags = client + .get(format!("{TEAM_FLAGS_CACHE_PREFIX}{}", team_id)) + .await + .map_err(|e| match e { + CustomRedisError::NotFound => FlagError::TokenValidationError, + CustomRedisError::PickleError(_) => { + tracing::error!("failed to fetch data: {}", e); + println!("failed to fetch data: {}", e); + FlagError::DataParsingError + } + _ => { + tracing::error!("Unknown redis error: {}", e); + FlagError::RedisUnavailable + } + })?; + + let flags_list: Vec = + serde_json::from_str(&serialized_flags).map_err(|e| { + tracing::error!("failed to parse data to flags list: {}", e); + println!("failed to parse data: {}", e); + + FlagError::DataParsingError + })?; + + Ok(FeatureFlagList { flags: flags_list }) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng; + + use super::*; + use crate::test_utils::{ + insert_flags_for_team_in_redis, insert_new_team_in_redis, setup_redis_client, + }; + + #[tokio::test] + async fn test_fetch_flags_from_redis() { + let client = setup_redis_client(None); + + let team = insert_new_team_in_redis(client.clone()).await.unwrap(); + + insert_flags_for_team_in_redis(client.clone(), team.id, None) + .await + .expect("Failed to insert flags"); + + let flags_from_redis = FeatureFlagList::from_redis(client.clone(), team.id) + .await + .unwrap(); + assert_eq!(flags_from_redis.flags.len(), 1); + let flag = flags_from_redis.flags.get(0).unwrap(); + assert_eq!(flag.key, "flag1"); + assert_eq!(flag.team_id, team.id); + assert_eq!(flag.filters.groups.len(), 1); + assert_eq!(flag.filters.groups[0].properties.as_ref().unwrap().len(), 1); + } + + #[tokio::test] + async fn test_fetch_invalid_team_from_redis() { + let client = setup_redis_client(None); + + match FeatureFlagList::from_redis(client.clone(), 1234).await { + Err(FlagError::TokenValidationError) => (), + _ => panic!("Expected TokenValidationError"), + }; + } + + #[tokio::test] + async fn test_cant_connect_to_redis_error_is_not_token_validation_error() { + let client = setup_redis_client(Some("redis://localhost:1111/".to_string())); + + match FeatureFlagList::from_redis(client.clone(), 1234).await { + Err(FlagError::RedisUnavailable) => (), + _ => panic!("Expected RedisUnavailable"), + }; + } +} diff --git a/feature-flags/src/lib.rs b/feature-flags/src/lib.rs index 195a55c..0352c21 100644 --- a/feature-flags/src/lib.rs +++ b/feature-flags/src/lib.rs @@ -1,5 +1,6 @@ pub mod api; pub mod config; +pub mod flag_definitions; pub mod redis; pub mod router; pub mod server; diff --git a/feature-flags/src/test_utils.rs b/feature-flags/src/test_utils.rs index 75db86d..0cefb7e 100644 --- a/feature-flags/src/test_utils.rs +++ b/feature-flags/src/test_utils.rs @@ -1,7 +1,9 @@ use anyhow::Error; +use serde_json::json; use std::sync::Arc; use crate::{ + flag_definitions, redis::{Client, RedisClient}, team::{self, Team}, }; @@ -40,6 +42,47 @@ pub async fn insert_new_team_in_redis(client: Arc) -> Result, + team_id: i64, + json_value: Option, +) -> Result<(), Error> { + let payload = match json_value { + Some(value) => value, + None => json!([{ + "id": 1, + "key": "flag1", + "name": "flag1 description", + "active": true, + "deleted": false, + "team_id": team_id, + "filters": { + "groups": [ + { + "properties": [ + { + "key": "email", + "value": "a@b.com", + "type": "person", + }, + ] + }, + ], + }, + }]) + .to_string(), + }; + + client + .set( + format!("{}{}", flag_definitions::TEAM_FLAGS_CACHE_PREFIX, team_id), + payload, + ) + .await?; + + Ok(()) +} + pub fn setup_redis_client(url: Option) 
-> Arc { let redis_url = match url { Some(value) => value, From cf302723ef9e0e2abc8e32307c8e9e7c3072a407 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Thu, 6 Jun 2024 16:31:06 +0200 Subject: [PATCH 6/7] capture: subdivide process_events_error cause (#48) --- capture/src/v0_endpoint.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/capture/src/v0_endpoint.rs b/capture/src/v0_endpoint.rs index 3849e29..ff4b90f 100644 --- a/capture/src/v0_endpoint.rs +++ b/capture/src/v0_endpoint.rs @@ -150,7 +150,14 @@ pub async fn event( tracing::debug!(context=?context, events=?events, "decoded request"); if let Err(err) = process_events(state.sink.clone(), &events, &context).await { - report_dropped_events("process_events_error", events.len() as u64); + let cause = match err { + // TODO: automate this with a macro + CaptureError::EmptyDistinctId => "empty_distinct_id", + CaptureError::MissingDistinctId => "missing_distinct_id", + CaptureError::MissingEventName => "missing_event_name", + _ => "process_events_error", + }; + report_dropped_events(cause, events.len() as u64); tracing::log::warn!("rejected invalid payload: {}", err); return Err(err); } From f28466a0f9699441109060ba1ba4e7baf3705a8f Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Mon, 10 Jun 2024 10:12:32 +0100 Subject: [PATCH 7/7] feat(flags): Match flags on rollout percentage (#45) --- Cargo.lock | 1 + feature-flags/Cargo.toml | 1 + feature-flags/src/flag_definitions.rs | 81 +- feature-flags/src/flag_matching.rs | 161 +++ feature-flags/src/lib.rs | 1 + feature-flags/src/team.rs | 1 + feature-flags/src/test_utils.rs | 35 +- .../tests/test_flag_matching_consistency.rs | 1209 +++++++++++++++++ 8 files changed, 1454 insertions(+), 36 deletions(-) create mode 100644 feature-flags/src/flag_matching.rs create mode 100644 feature-flags/tests/test_flag_matching_consistency.rs diff --git a/Cargo.lock b/Cargo.lock index 8642ade..b9f226b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -709,6 +709,7 @@ dependencies = [ "serde", "serde-pickle", "serde_json", + "sha1", "thiserror", "tokio", "tracing", diff --git a/feature-flags/Cargo.toml b/feature-flags/Cargo.toml index 1e0c111..4993930 100644 --- a/feature-flags/Cargo.toml +++ b/feature-flags/Cargo.toml @@ -25,6 +25,7 @@ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } serde-pickle = { version = "1.1.1"} +sha1 = "0.10.6" [lints] workspace = true diff --git a/feature-flags/src/flag_definitions.rs b/feature-flags/src/flag_definitions.rs index 29ec8d8..1f4582c 100644 --- a/feature-flags/src/flag_definitions.rs +++ b/feature-flags/src/flag_definitions.rs @@ -1,4 +1,4 @@ -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use std::sync::Arc; use tracing::instrument; @@ -13,44 +13,30 @@ pub const TEAM_FLAGS_CACHE_PREFIX: &str = "posthog:1:team_feature_flags_"; // TODO: Hmm, revisit when dealing with groups, but seems like // ideal to just treat it as a u8 and do our own validation on top -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize)] pub enum GroupTypeIndex {} -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum OperatorType { - #[serde(rename = "exact")] Exact, - #[serde(rename = "is_not")] IsNot, - #[serde(rename = "icontains")] Icontains, - #[serde(rename = "not_icontains")] NotIcontains, - #[serde(rename = "regex")] Regex, - #[serde(rename = "not_regex")] NotRegex, - #[serde(rename = "gt")] Gt, - #[serde(rename = "lt")] Lt, - 
#[serde(rename = "gte")] Gte, - #[serde(rename = "lte")] Lte, - #[serde(rename = "is_set")] IsSet, - #[serde(rename = "is_not_set")] IsNotSet, - #[serde(rename = "is_date_exact")] IsDateExact, - #[serde(rename = "is_date_after")] IsDateAfter, - #[serde(rename = "is_date_before")] IsDateBefore, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] pub struct PropertyFilter { pub key: String, pub value: serde_json::Value, @@ -60,28 +46,28 @@ pub struct PropertyFilter { pub group_type_index: Option, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] pub struct FlagGroupType { pub properties: Option>, - pub rollout_percentage: Option, + pub rollout_percentage: Option, pub variant: Option, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] pub struct MultivariateFlagVariant { pub key: String, pub name: Option, - pub rollout_percentage: f32, + pub rollout_percentage: f64, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] pub struct MultivariateFlagOptions { pub variants: Vec, } // TODO: test name with https://www.fileformat.info/info/charset/UTF-16/list.htm values, like '𝖕𝖗𝖔𝖕𝖊𝖗𝖙𝖞': `𝓿𝓪𝓵𝓾𝓮` -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] pub struct FlagFilters { pub groups: Vec, pub multivariate: Option, @@ -90,7 +76,7 @@ pub struct FlagFilters { pub super_groups: Option>, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] pub struct FeatureFlag { pub id: i64, pub team_id: i64, @@ -105,15 +91,31 @@ pub struct FeatureFlag { pub ensure_experience_continuity: bool, } -#[derive(Debug, Deserialize, Serialize)] +impl FeatureFlag { + pub fn get_group_type_index(&self) -> Option { + self.filters.aggregation_group_type_index + } + + pub fn get_conditions(&self) -> &Vec { + &self.filters.groups + } + + pub fn get_variants(&self) -> Vec { + self.filters + .multivariate + .clone() + .map_or(vec![], |m| m.variants) + } +} + +#[derive(Debug, Deserialize)] pub struct FeatureFlagList { pub flags: Vec, } impl FeatureFlagList { - /// Returns feature flags given a team_id - + /// Returns feature flags from redis given a team_id #[instrument(skip_all)] pub async fn from_redis( client: Arc, @@ -126,6 +128,8 @@ impl FeatureFlagList { .map_err(|e| match e { CustomRedisError::NotFound => FlagError::TokenValidationError, CustomRedisError::PickleError(_) => { + // TODO: Implement From trait for FlagError so we don't need to map + // CustomRedisError ourselves tracing::error!("failed to fetch data: {}", e); println!("failed to fetch data: {}", e); FlagError::DataParsingError @@ -150,8 +154,6 @@ impl FeatureFlagList { #[cfg(test)] mod tests { - use rand::Rng; - use super::*; use crate::test_utils::{ insert_flags_for_team_in_redis, insert_new_team_in_redis, setup_redis_client, @@ -161,7 +163,9 @@ mod tests { async fn test_fetch_flags_from_redis() { let client = setup_redis_client(None); - let team = insert_new_team_in_redis(client.clone()).await.unwrap(); + let team = insert_new_team_in_redis(client.clone()) + .await + .expect("Failed to insert team"); insert_flags_for_team_in_redis(client.clone(), team.id, None) .await @@ -169,13 +173,20 @@ mod tests { let flags_from_redis = FeatureFlagList::from_redis(client.clone(), team.id) .await - .unwrap(); + .expect("Failed to fetch flags from redis"); assert_eq!(flags_from_redis.flags.len(), 1); - let flag = flags_from_redis.flags.get(0).unwrap(); + let flag = 
flags_from_redis.flags.get(0).expect("Empty flags in redis"); assert_eq!(flag.key, "flag1"); assert_eq!(flag.team_id, team.id); assert_eq!(flag.filters.groups.len(), 1); - assert_eq!(flag.filters.groups[0].properties.as_ref().unwrap().len(), 1); + assert_eq!( + flag.filters.groups[0] + .properties + .as_ref() + .expect("Properties don't exist on flag") + .len(), + 1 + ); } #[tokio::test] diff --git a/feature-flags/src/flag_matching.rs b/feature-flags/src/flag_matching.rs new file mode 100644 index 0000000..c59b594 --- /dev/null +++ b/feature-flags/src/flag_matching.rs @@ -0,0 +1,161 @@ +use crate::flag_definitions::{FeatureFlag, FlagGroupType}; +use sha1::{Digest, Sha1}; +use std::fmt::Write; + +#[derive(Debug, PartialEq, Eq)] +pub struct FeatureFlagMatch { + pub matches: bool, + pub variant: Option, + //reason + //condition_index + //payload +} + +// TODO: Rework FeatureFlagMatcher - python has a pretty awkward interface, where we pass in all flags, and then again +// the flag to match. I don't think there's any reason anymore to store the flags in the matcher, since we can just +// pass the flag to match directly to the get_match method. This will also make the matcher more stateless. +// Potentially, we could also make the matcher a long-lived object, with caching for group keys and such. +// It just takes in the flag and distinct_id and returns the match... +// Or, make this fully stateless +// and have a separate cache struct for caching group keys, cohort definitions, etc. - and check size, if we can keep it in memory +// for all teams. If not, we can have a LRU cache, or a cache that stores only the most recent N keys. +// But, this can be a future refactor, for now just focusing on getting the basic matcher working, write lots and lots of tests +// and then we can easily refactor stuff around. +#[derive(Debug)] +pub struct FeatureFlagMatcher { + // pub flags: Vec, + pub distinct_id: String, +} + +const LONG_SCALE: u64 = 0xfffffffffffffff; + +impl FeatureFlagMatcher { + pub fn new(distinct_id: String) -> Self { + FeatureFlagMatcher { + // flags, + distinct_id, + } + } + + pub fn get_match(&self, feature_flag: &FeatureFlag) -> FeatureFlagMatch { + if self.hashed_identifier(feature_flag).is_none() { + return FeatureFlagMatch { + matches: false, + variant: None, + }; + } + + // TODO: super groups for early access + // TODO: Variant overrides condition sort + + for (index, condition) in feature_flag.get_conditions().iter().enumerate() { + let (is_match, _evaluation_reason) = + self.is_condition_match(feature_flag, condition, index); + + if is_match { + // TODO: This is a bit awkward, we should handle overrides only when variants exist. 
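+                // A condition may pin a specific variant as an override; it is
+                // honoured only if that variant is still defined on the flag,
+                // otherwise we fall back to the hash-based variant selection below.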
+                let variant = match condition.variant.clone() {
+                    Some(variant_override) => {
+                        if feature_flag
+                            .get_variants()
+                            .iter()
+                            .any(|v| v.key == variant_override)
+                        {
+                            Some(variant_override)
+                        } else {
+                            self.get_matching_variant(feature_flag)
+                        }
+                    }
+                    None => self.get_matching_variant(feature_flag),
+                };
+
+                // let payload = self.get_matching_payload(is_match, variant, feature_flag);
+                return FeatureFlagMatch {
+                    matches: true,
+                    variant,
+                };
+            }
+        }
+        FeatureFlagMatch {
+            matches: false,
+            variant: None,
+        }
+    }
+
+    pub fn is_condition_match(
+        &self,
+        feature_flag: &FeatureFlag,
+        condition: &FlagGroupType,
+        _index: usize,
+    ) -> (bool, String) {
+        let rollout_percentage = condition.rollout_percentage.unwrap_or(100.0);
+        let mut condition_match = true;
+        if condition.properties.is_some() {
+            // TODO: Handle matching conditions
+            if !condition.properties.as_ref().unwrap().is_empty() {
+                condition_match = false;
+            }
+        }
+
+        if !condition_match {
+            return (false, "NO_CONDITION_MATCH".to_string());
+        } else if rollout_percentage == 100.0 {
+            // TODO: Check for floating point shenanigans, if any
+            return (true, "CONDITION_MATCH".to_string());
+        }
+
+        if self.get_hash(feature_flag, "") > (rollout_percentage / 100.0) {
+            return (false, "OUT_OF_ROLLOUT_BOUND".to_string());
+        }
+
+        (true, "CONDITION_MATCH".to_string())
+    }
+
+    pub fn hashed_identifier(&self, feature_flag: &FeatureFlag) -> Option<String> {
+        if feature_flag.get_group_type_index().is_none() {
+            // TODO: Use hash key overrides for experience continuity
+            Some(self.distinct_id.clone())
+        } else {
+            // TODO: Handle getting group key
+            Some("".to_string())
+        }
+    }
+
+    /// This function takes an identifier and a feature flag key and returns a float between 0 and 1.
+    /// Given the same identifier and key, it'll always return the same float. These floats are
+    /// uniformly distributed between 0 and 1, so if we want to show this feature to 20% of traffic
+    /// we can do _hash(key, identifier) < 0.2
+    pub fn get_hash(&self, feature_flag: &FeatureFlag, salt: &str) -> f64 {
+        // check if hashed_identifier is None
+        let hashed_identifier = self
+            .hashed_identifier(feature_flag)
+            .expect("hashed_identifier is None when computing hash");
+        let hash_key = format!("{}.{}{}", feature_flag.key, hashed_identifier, salt);
+        let mut hasher = Sha1::new();
+        hasher.update(hash_key.as_bytes());
+        let result = hasher.finalize();
+        // :TRICKY: Convert the first 15 characters of the hex-encoded digest to a number.
+        // Not sure if this is correct, padding each byte as 2 characters.
+        let hex_str: String = result.iter().fold(String::new(), |mut acc, byte| {
+            let _ = write!(acc, "{:02x}", byte);
+            acc
+        })[..15]
+            .to_string();
+        let hash_val = u64::from_str_radix(&hex_str, 16).unwrap();
+
+        hash_val as f64 / LONG_SCALE as f64
+    }
+
+    pub fn get_matching_variant(&self, feature_flag: &FeatureFlag) -> Option<String> {
+        let hash = self.get_hash(feature_flag, "variant");
+        let mut total_percentage = 0.0;
+
+        for variant in feature_flag.get_variants() {
+            total_percentage += variant.rollout_percentage / 100.0;
+            if hash < total_percentage {
+                return Some(variant.key.clone());
+            }
+        }
+        None
+    }
+}
diff --git a/feature-flags/src/lib.rs b/feature-flags/src/lib.rs
index 0352c21..edc2a29 100644
--- a/feature-flags/src/lib.rs
+++ b/feature-flags/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod api;
 pub mod config;
 pub mod flag_definitions;
+pub mod flag_matching;
 pub mod redis;
 pub mod router;
 pub mod server;
diff --git a/feature-flags/src/team.rs b/feature-flags/src/team.rs
index ac62ea9..e872aa4 100644
--- a/feature-flags/src/team.rs
+++ b/feature-flags/src/team.rs
@@ -42,6 +42,7 @@ impl Team {
             }
         })?;
 
+        // TODO: Consider an LRU cache for teams as well, with small TTL to skip redis/pg lookups
         let team: Team = serde_json::from_str(&serialized_team).map_err(|e| {
             tracing::error!("failed to parse data to team: {}", e);
             FlagError::DataParsingError
diff --git a/feature-flags/src/test_utils.rs b/feature-flags/src/test_utils.rs
index 0cefb7e..92bc8a4 100644
--- a/feature-flags/src/test_utils.rs
+++ b/feature-flags/src/test_utils.rs
@@ -3,7 +3,7 @@ use serde_json::json;
 use std::sync::Arc;
 
 use crate::{
-    flag_definitions,
+    flag_definitions::{self, FeatureFlag},
     redis::{Client, RedisClient},
     team::{self, Team},
 };
@@ -91,3 +91,36 @@ pub fn setup_redis_client(url: Option<String>) -> Arc<RedisClient> {
     let client = RedisClient::new(redis_url).expect("Failed to create redis client");
     Arc::new(client)
 }
+
+pub fn create_flag_from_json(json_value: Option<String>) -> Vec<FeatureFlag> {
+    let payload = match json_value {
+        Some(value) => value,
+        None => json!([{
+            "id": 1,
+            "key": "flag1",
+            "name": "flag1 description",
+            "active": true,
+            "deleted": false,
+            "team_id": 1,
+            "filters": {
+                "groups": [
+                    {
+                        "properties": [
+                            {
+                                "key": "email",
+                                "value": "a@b.com",
+                                "type": "person",
+                            },
+                        ],
+                        "rollout_percentage": 50,
+                    },
+                ],
+            },
+        }])
+        .to_string(),
+    };
+
+    let flags: Vec<FeatureFlag> =
+        serde_json::from_str(&payload).expect("Failed to parse data to flags list");
+    flags
+}
diff --git a/feature-flags/tests/test_flag_matching_consistency.rs b/feature-flags/tests/test_flag_matching_consistency.rs
new file mode 100644
index 0000000..4a24b0e
--- /dev/null
+++ b/feature-flags/tests/test_flag_matching_consistency.rs
@@ -0,0 +1,1209 @@
+/// These tests are common between all libraries doing local evaluation of feature flags.
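+/// Each library hashes the same flag key and distinct_id through SHA1, so the
+/// hard-coded expected results below must be reproduced exactly by this crate.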
+/// This ensures there are no mismatches between implementations. +use feature_flags::flag_matching::{FeatureFlagMatch, FeatureFlagMatcher}; + +use feature_flags::test_utils::create_flag_from_json; +use serde_json::json; + +#[test] +fn it_is_consistent_with_rollout_calculation_for_simple_flags() { + let flags = create_flag_from_json(Some( + json!([{ + "id": 1, + "key": "simple-flag", + "name": "Simple flag", + "active": true, + "deleted": false, + "team_id": 1, + "filters": { + "groups": [ + { + "properties": [], + "rollout_percentage": 45, + }, + ], + }, + }]) + .to_string(), + )); + + let results = vec![ + false, true, true, false, true, false, false, true, false, true, false, true, true, false, + true, false, false, false, true, true, false, true, false, false, true, false, true, true, + false, false, false, true, true, true, true, false, false, false, false, false, false, + true, true, false, true, true, false, false, false, true, true, false, false, false, false, + true, false, true, false, true, false, true, true, false, true, false, true, false, true, + true, false, false, true, false, false, true, false, true, false, false, true, false, + false, false, true, true, false, true, true, false, true, true, true, true, true, false, + true, true, false, false, true, true, true, true, false, false, true, false, true, true, + true, false, false, false, false, false, true, false, false, true, true, true, false, + false, true, false, true, false, false, true, false, false, false, false, false, false, + false, false, true, true, false, false, true, false, false, true, true, false, false, true, + false, true, false, true, true, true, false, false, false, true, false, false, false, + false, true, true, false, true, true, false, true, false, true, true, false, true, false, + true, true, true, false, true, false, false, true, true, false, true, false, true, true, + false, false, true, true, true, true, false, true, true, false, false, true, false, true, + false, false, true, true, false, true, false, true, false, false, false, false, false, + false, false, true, false, true, true, false, false, true, false, true, false, false, + false, true, false, true, false, false, false, true, false, false, true, false, true, true, + false, false, false, false, true, false, false, false, false, false, false, false, false, + false, false, false, false, false, true, true, false, true, false, true, true, false, true, + false, true, false, false, false, true, true, true, true, false, false, false, false, + false, true, true, true, false, false, true, true, false, false, false, false, false, true, + false, true, true, true, true, false, true, true, true, false, false, true, false, true, + false, false, true, true, true, false, true, false, false, false, true, true, false, true, + false, true, false, true, true, true, true, true, false, false, true, false, true, false, + true, true, true, false, true, false, true, true, false, true, true, true, true, true, + false, false, false, false, false, true, false, true, false, false, true, true, false, + false, false, true, false, true, true, true, true, false, false, false, false, true, true, + false, false, true, true, false, true, true, true, true, false, true, true, true, false, + false, true, true, false, false, true, false, false, true, false, false, false, false, + false, false, false, false, false, false, true, true, false, false, true, false, false, + true, false, true, false, false, true, false, false, false, false, false, false, true, + false, false, false, 
false, false, false, false, false, false, true, true, true, false, + false, false, true, false, true, false, false, false, true, false, false, false, false, + false, false, false, true, false, false, false, false, false, false, false, false, true, + false, true, false, true, true, true, false, false, false, true, true, true, false, true, + false, true, true, false, false, false, true, false, false, false, false, true, false, + true, false, true, true, false, true, false, false, false, true, false, false, true, true, + false, true, false, false, false, false, false, false, true, true, false, false, true, + false, false, true, true, true, false, false, false, true, false, false, false, false, + true, false, true, false, false, false, true, false, true, true, false, true, false, true, + false, true, false, false, true, false, false, true, false, true, false, true, false, true, + false, false, true, true, true, true, false, true, false, false, false, false, false, true, + false, false, true, false, false, true, true, false, false, false, false, true, true, true, + false, false, true, false, false, true, true, true, true, false, false, false, true, false, + false, false, true, false, false, true, true, true, true, false, false, true, true, false, + true, false, true, false, false, true, true, false, true, true, true, true, false, false, + true, false, false, true, true, false, true, false, true, false, false, true, false, false, + false, false, true, true, true, false, true, false, false, true, false, false, true, false, + false, false, false, true, false, true, false, true, true, false, false, true, false, true, + true, true, false, false, false, false, true, true, false, true, false, false, false, true, + false, false, false, false, true, true, true, false, false, false, true, true, true, true, + false, true, true, false, true, true, true, false, true, false, false, true, false, true, + true, true, true, false, true, false, true, false, true, false, false, true, true, false, + false, true, false, true, false, false, false, false, true, false, true, false, false, + false, true, true, true, false, false, false, true, false, true, true, false, false, false, + false, false, true, false, true, false, false, true, true, false, true, true, true, true, + false, false, true, false, false, true, false, true, false, true, true, false, false, + false, true, false, true, true, false, false, false, true, false, true, false, true, true, + false, true, false, false, true, false, false, false, true, true, true, false, false, + false, false, false, true, false, false, true, true, true, true, true, false, false, false, + false, false, false, false, false, true, true, true, false, false, true, true, false, true, + true, false, true, false, true, false, false, false, true, false, false, true, false, + false, true, true, true, true, false, false, true, false, true, true, false, false, true, + false, false, true, true, false, true, false, false, true, true, true, false, false, false, + false, false, true, false, true, false, false, false, false, false, true, true, false, + true, true, true, false, false, false, false, true, true, true, true, false, true, true, + false, true, false, true, false, true, false, false, false, false, true, true, true, true, + false, false, true, false, true, true, false, false, false, false, false, false, true, + false, true, false, true, true, false, false, true, true, true, true, false, false, true, + false, true, true, false, false, true, true, true, false, true, false, false, true, 
true, + false, false, false, true, false, false, true, false, false, false, true, true, true, true, + false, true, false, true, false, true, false, true, false, false, true, false, false, true, + false, true, true, + ]; + + for i in 0..1000 { + let distinct_id = format!("distinct_id_{}", i); + + let feature_flag_match = FeatureFlagMatcher::new(distinct_id).get_match(&flags[0]); + + if results[i] { + assert_eq!( + feature_flag_match, + FeatureFlagMatch { + matches: true, + variant: None, + } + ); + } else { + assert_eq!( + feature_flag_match, + FeatureFlagMatch { + matches: false, + variant: None, + } + ); + } + } +} + +#[test] +fn it_is_consistent_with_rollout_calculation_for_multivariate_flags() { + let flags = create_flag_from_json(Some( + json!([{ + "id": 1, + "key": "multivariate-flag", + "name": "Multivariate flag", + "active": true, + "deleted": false, + "team_id": 1, + "filters": { + "groups": [ + { + "properties": [], + "rollout_percentage": 55, + }, + ], + "multivariate": { + "variants": [ + { + "key": "first-variant", + "name": "First Variant", + "rollout_percentage": 50, + }, + { + "key": "second-variant", + "name": "Second Variant", + "rollout_percentage": 20, + }, + { + "key": "third-variant", + "name": "Third Variant", + "rollout_percentage": 20, + }, + { + "key": "fourth-variant", + "name": "Fourth Variant", + "rollout_percentage": 5, + }, + { + "key": "fifth-variant", + "name": "Fifth Variant", + "rollout_percentage": 5, + }, + ], + }, + }, + }]) + .to_string(), + )); + + let results = vec![ + Some("second-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("fourth-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + 
Some("third-variant".to_string()), + None, + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + None, + None, + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + Some("fifth-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("fourth-variant".to_string()), + None, + None, + None, + None, + Some("third-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("second-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + 
Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("fifth-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + Some("third-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("third-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + Some("fourth-variant".to_string()), + Some("fourth-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("fifth-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + Some("fifth-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("fifth-variant".to_string()), + None, + 
Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("fourth-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("fifth-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("fourth-variant".to_string()), + Some("fifth-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + None, + Some("third-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + None, + None, + Some("third-variant".to_string()), + Some("fourth-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("third-variant".to_string()), + 
None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + None, + Some("fourth-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("fourth-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("fourth-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("fourth-variant".to_string()), + None, + None, + None, + Some("fourth-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + None, + None, + Some("fifth-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("third-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + 
Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + None, + None, + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + None, + None, + None, + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + Some("second-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + None, + None, + None, + None, + None, + Some("fourth-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("fifth-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("fifth-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + 
Some("first-variant".to_string()), + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("fifth-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("third-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + None, + Some("second-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("fifth-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + None, + None, + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("third-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + Some("second-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + None, + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("fourth-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("fifth-variant".to_string()), + None, + None, + None, + Some("second-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), 
+ Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("second-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + None, + None, + Some("first-variant".to_string()), + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("third-variant".to_string()), + Some("second-variant".to_string()), + Some("third-variant".to_string()), + None, + None, + Some("third-variant".to_string()), + Some("first-variant".to_string()), + None, + Some("first-variant".to_string()), + ]; + + for i in 0..1000 { + let distinct_id = format!("distinct_id_{}", i); + + let feature_flag_match = FeatureFlagMatcher::new(distinct_id).get_match(&flags[0]); + + if results[i].is_some() { + assert_eq!( + feature_flag_match, + FeatureFlagMatch { + matches: true, + variant: results[i].clone(), + } + ); + } else { + assert_eq!( + feature_flag_match, + FeatureFlagMatch { + matches: false, + variant: None, + } + ); + } + } +}