diff --git a/asap-common/.gitignore b/asap-common/.gitignore index 102b6eac..a3b80cf2 100644 --- a/asap-common/.gitignore +++ b/asap-common/.gitignore @@ -5,6 +5,7 @@ .vscode/ dependencies/py/promql_utilities/promql_utilities.egg-info/ +dependencies/py/promql_utilities/build/ dependencies/rs/**/target/ tests/**/*.json diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index 4e717a98..a6f593a1 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -496,6 +496,65 @@ mod tests { ); } + // ── ClickHouse parametric syntax + explicit BETWEEN timestamps ──────────── + // These verify that a fully ClickHouse-compatible query (no DATEADD, no NOW()) + // is parseable by ASAP: quantile(q)(col) + BETWEEN 'start' AND 'end'. + + #[test] + fn test_clickhouse_explicit_datetime_temporal_quantile() { + check_query( + "SELECT quantile(0.95)(value) FROM cpu_usage WHERE time BETWEEN '2025-10-01 00:00:00' AND '2025-10-01 00:00:10' GROUP BY L1, L2, L3, L4", + vec![QueryType::TemporalQuantile], + None, + ); + } + + #[test] + // ASAP-only: parse_datetime accepts the Z suffix (interprets as UTC), but ClickHouse + // rejects it with TYPE_MISMATCH when comparing against a DateTime column. + // Do not use Z-suffix strings in queries intended for both systems. + fn test_asap_only_iso_z_temporal_quantile() { + check_query( + "SELECT quantile(0.95)(value) FROM cpu_usage WHERE time BETWEEN '2025-10-01T00:00:00Z' AND '2025-10-01T00:00:10Z' GROUP BY L1, L2, L3, L4", + vec![QueryType::TemporalQuantile], + None, + ); + } + + #[test] + // Both ASAP (parse_datetime) and ClickHouse treat ISO-without-Z as local server time. + // They agree only when running in the same timezone; prefer 'YYYY-MM-DD HH:MM:SS' + // (space format) to avoid this implicit dependency. 
+ fn test_iso_no_z_treated_as_local_time_temporal_quantile() { + check_query( + "SELECT quantile(0.95)(value) FROM cpu_usage WHERE time BETWEEN '2025-10-01T00:00:00' AND '2025-10-01T00:00:10' GROUP BY L1, L2, L3, L4", + vec![QueryType::TemporalQuantile], + None, + ); + } + + #[test] + fn test_clickhouse_explicit_datetime_spatial_quantile() { + check_query( + "SELECT quantile(0.95)(value) FROM cpu_usage WHERE time BETWEEN '2025-10-01 00:00:00' AND '2025-10-01 00:00:01' GROUP BY L1", + vec![QueryType::Spatial], + None, + ); + } + + #[test] + fn test_clickhouse_explicit_matches_now_template() { + // A ClickHouse-style query (explicit timestamps, parametric quantile) must + // match a stored DATEADD(NOW()) template of the same shape. + let template = parse_sql_query( + "SELECT quantile(0.95)(value) FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1, L2, L3, L4" + ).unwrap(); + let incoming = parse_sql_query( + "SELECT quantile(0.95)(value) FROM cpu_usage WHERE time BETWEEN '2025-10-01 00:00:00' AND '2025-10-01 00:00:10' GROUP BY L1, L2, L3, L4" + ).unwrap(); + assert!(incoming.matches_sql_pattern(&template)); + } + // ── Error cases ────────────────────────────────────────────────────────── #[test] diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 5a952e96..4b653ec2 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -325,6 +325,11 @@ impl SQLPatternParser { } fn get_timestamp_from_datetime_str(datetime_str: &str) -> Option { + // parse_datetime treats timezone-naive strings (e.g. "2025-10-01 00:00:00", + // "2025-10-01T00:00:00") as local server time, matching ClickHouse's behavior — + // but only when both run in the same timezone. Z-suffix strings (e.g. 
+ // "2025-10-01T00:00:00Z") are interpreted as UTC here but rejected by ClickHouse. + // Use space-format datetime strings ("YYYY-MM-DD HH:MM:SS") for portability. let parsed_datetime = parse_datetime(datetime_str).ok()?; Some(parsed_datetime.timestamp().as_second() as f64) } diff --git a/asap-query-engine/.gitignore b/asap-query-engine/.gitignore index eb5a316c..5c63ba3f 100644 --- a/asap-query-engine/.gitignore +++ b/asap-query-engine/.gitignore @@ -1 +1,2 @@ target +output/ diff --git a/asap-summary-ingest/.gitignore b/asap-summary-ingest/.gitignore index f7ee054e..49407f65 100644 --- a/asap-summary-ingest/.gitignore +++ b/asap-summary-ingest/.gitignore @@ -1,3 +1,4 @@ __pycache__ **/*.pyc **/*.swp +outputs/ diff --git a/asap-tools/execution-utilities/.gitignore b/asap-tools/execution-utilities/.gitignore index c8760c7f..7704e7d1 100644 --- a/asap-tools/execution-utilities/.gitignore +++ b/asap-tools/execution-utilities/.gitignore @@ -7,6 +7,9 @@ clickhouse-benchmark-pipeline/benchmark_results/ **/data/ +benchmark/arroyo_outputs/ +benchmark/queries/ +benchmark/results/ **/*.csv **/*.png diff --git a/asap-tools/execution-utilities/benchmark/README.md b/asap-tools/execution-utilities/benchmark/README.md new file mode 100644 index 00000000..9ee62f77 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/README.md @@ -0,0 +1,417 @@ +# ASAP Generalized Benchmark Pipeline + +Measures ASAP query latency (KLL sketch) against ClickHouse baseline for +arbitrary datasets. Supports ClickBench and H2O groupby out of the box. + +## Architecture + +``` +data_file → prepare_data.py → arroyo_file.json + ↓ + export_to_arroyo.py (file source) + ↓ + sketch_topic (Kafka) + ↓ + QueryEngineRust :8088 + ↓ +data_file → export_to_database.py run_benchmark.py → results/ + ↓ + ClickHouse :8123 (baseline) +``` + +**Key difference from the old pipeline:** Arroyo reads directly from a local +file (`single_file_custom` connector) rather than from a Kafka input topic. 
+Kafka is still required for the **sketch output** topic (`sketch_topic`). + +--- + +## Prerequisites + +```bash +export INSTALL_DIR=/scratch/sketch_db_for_prometheus +pip3 install --user -r requirements.txt + +# Build binaries (one-time) +cd ~/ASAPQuery/asap-query-engine && cargo build --release +``` + +> **UTC requirement:** Both ASAP and ClickHouse must run in UTC so that bare +> datetime strings (`'YYYY-MM-DD HH:MM:SS'`) are interpreted identically by both +> systems. Set `TZ=UTC` in the environment for ASAP processes and ensure +> ClickHouse's `timezone` config is set to `UTC`. If the two systems run in +> different timezones, queries will target different time windows on each side. + +--- + +## ClickBench + ClickHouse End-to-End Example + +### Step 1 — Download dataset + +```bash +cd ~/ASAPQuery/asap-tools/execution-utilities/benchmark +python download_dataset.py --dataset clickbench --output-dir ./data +``` + +Optionally limit to 1M rows: + +```bash +cd ./data +mv hits.json.gz hits_full.json.gz +zcat hits_full.json.gz | head -n 1000000 | gzip > hits.json.gz +``` + +### Step 2 — Prepare data for Arroyo file source + +The Arroyo file source requires RFC3339 timestamps and string metadata columns. 
+This step converts the raw ClickBench JSON: + +```bash +python prepare_data.py \ + --dataset clickbench \ + --input ./data/hits.json.gz \ + --output ./data/hits_arroyo.json \ + --max-rows 1000000 +``` + +This produces `hits_arroyo.json` with: +- `EventTime` converted from `"2013-07-14 20:38:47"` → `"2013-07-14T20:38:47Z"` +- `RegionID`, `OS`, `UserAgent`, `TraficSourceID` as strings +- Records sorted by `EventTime` + +### Step 3 — Start infrastructure + +```bash +# Kafka +~/ASAPQuery/asap-tools/installation/kafka/run.sh $INSTALL_DIR/kafka + +# Create sketch output topic +KAFKA=$INSTALL_DIR/kafka/bin +$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --create \ + --topic sketch_topic --partitions 1 --replication-factor 1 \ + --config max.message.bytes=20971520 + +# ClickHouse +~/ASAPQuery/asap-tools/installation/clickhouse/run.sh $INSTALL_DIR +``` + +### Step 4 — Start Arroyo cluster + +```bash +~/ASAPQuery/asap-summary-ingest/target/release/arroyo \ + --config ~/ASAPQuery/asap-summary-ingest/config.yaml cluster \ + > /tmp/arroyo.log 2>&1 & +``` + +### Step 5 — Generate queries and configs + +```bash +python generate_queries.py \ + --table-name hits \ + --ts-column EventTime \ + --value-column ResolutionWidth \ + --group-by-columns RegionID,OS,UserAgent,TraficSourceID \ + --window-size 10 \ + --num-queries 50 \ + --window-form dateadd \ + --generate-configs \ + --auto-detect-timestamps \ + --data-file ./data/hits_arroyo.json \ + --data-file-format json \ + --output-prefix ./queries/clickbench +``` + +This writes: +- `queries/clickbench.sql` — shared query file for both ASAP and ClickHouse +- `queries/clickbench_streaming.yaml` — Arroyo streaming config +- `queries/clickbench_inference.yaml` — QueryEngineRust inference config + +### Step 6 — Launch Arroyo sketch pipeline (file source) + +```bash +python export_to_arroyo.py \ + --streaming-config ./queries/clickbench_streaming.yaml \ + --source-type file \ + --input-file ./data/hits_arroyo.json \ + 
--file-format json \ + --ts-format rfc3339 \ + --pipeline-name clickbench_pipeline \ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest \ + --output-dir ./arroyo_outputs +``` + +### Step 7 — Start QueryEngineRust + +```bash +cd ~/ASAPQuery/asap-query-engine +nohup ./target/release/query_engine_rust \ + --kafka-topic sketch_topic --input-format json \ + --config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/queries/clickbench_inference.yaml \ + --streaming-config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/queries/clickbench_streaming.yaml \ + --http-port 8088 --delete-existing-db --log-level DEBUG \ + --output-dir ./output --streaming-engine arroyo \ + --query-language SQL --lock-strategy per-key \ + --prometheus-scrape-interval 1 > /tmp/query_engine.log 2>&1 & +``` + +### Step 8 — Load data into ClickHouse (baseline) + +```bash +cd ~/ASAPQuery/asap-tools/execution-utilities/benchmark +python export_to_database.py \ + --dataset clickbench \ + --file-path ./data/hits.json.gz \ + --clickhouse-url "http://localhost:8123/" \ + --init-sql-file ./configs/clickbench_hits_init.sql +``` + +Verify: `$INSTALL_DIR/clickhouse client --query "SELECT count(*) FROM hits"` + +### Step 9 — Run benchmark + +```bash +python run_benchmark.py \ + --mode both \ + --asap-sql-file ./queries/clickbench.sql \ + --baseline-sql-file ./queries/clickbench.sql \ + --asap-url "http://localhost:8088/api/v1/query" \ + --output-dir ./results \ + --output-prefix clickbench +``` + +Results: `results/clickbench_asap.csv`, `results/clickbench_baseline.csv`, +`results/clickbench_comparison.png`. 
+ +--- + +## H2O GroupBy End-to-End Example + +### Step 1 — Download dataset + +```bash +python download_dataset.py --dataset h2o --output-dir ./data +``` + +### Step 2 — Prepare data for Arroyo file source + +```bash +python prepare_data.py \ + --dataset h2o \ + --input ./data/G1_1e7_1e2_0_0.csv \ + --output ./data/h2o_arroyo.json \ + --max-rows 1000000 +``` + +### Steps 3–4 — Start infrastructure and Arroyo (same as ClickBench) + +### Step 5 — Generate queries and configs + +```bash +python generate_queries.py \ + --table-name h2o_groupby \ + --ts-column timestamp \ + --value-column v1 \ + --group-by-columns id1,id2 \ + --window-size 10 \ + --num-queries 50 \ + --generate-configs \ + --auto-detect-timestamps \ + --data-file ./data/h2o_arroyo.json \ + --data-file-format json \ + --output-prefix ./queries/h2o +``` + +### Step 6 — Launch Arroyo sketch pipeline + +```bash +python export_to_arroyo.py \ + --streaming-config ./queries/h2o_streaming.yaml \ + --source-type file \ + --input-file ./data/h2o_arroyo.json \ + --file-format json \ + --ts-format rfc3339 \ + --pipeline-name h2o_pipeline \ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest \ + --output-dir ./arroyo_outputs +``` + +### Step 7 — Start QueryEngineRust + +```bash +cd ~/ASAPQuery/asap-query-engine +nohup ./target/release/query_engine_rust \ + --kafka-topic sketch_topic --input-format json \ + --config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/queries/h2o_inference.yaml \ + --streaming-config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/queries/h2o_streaming.yaml \ + --http-port 8088 --delete-existing-db --log-level DEBUG \ + --output-dir ./output --streaming-engine arroyo \ + --query-language SQL --lock-strategy per-key \ + --prometheus-scrape-interval 1 > /tmp/query_engine.log 2>&1 & +``` + +### Step 8 — Load data into ClickHouse (baseline) + +```bash +python export_to_database.py \ + --dataset h2o \ + --file-path ./data/G1_1e7_1e2_0_0.csv \ + --init-sql-file 
./configs/h2o_init.sql \
+    --max-rows 1000000
+```
+
+### Step 9 — Run benchmark
+
+```bash
+python run_benchmark.py \
+    --mode both \
+    --asap-sql-file ./queries/h2o.sql \
+    --baseline-sql-file ./queries/h2o.sql \
+    --asap-url "http://localhost:8088/api/v1/query" \
+    --output-dir ./results \
+    --output-prefix h2o
+```
+---
+## Elasticsearch End-to-End Example using H2O Dataset
+
+### Steps 1–5:
+Follow the same instructions from the H2O GroupBy example above.
+
+### Step 6 — Launch Arroyo sketch pipeline
+
+```bash
+python export_to_arroyo.py \
+    --streaming-config ./configs/h2o_streaming.yaml \
+    --source-type file \
+    --input-file ./data/h2o_arroyo.json \
+    --file-format json \
+    --ts-format unix_millis \
+    --pipeline-name h2o_pipeline \
+    --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest \
+    --output-dir ./arroyo_outputs
+```
+
+### Step 7 — Start QueryEngineRust
+
+```bash
+cd ~/ASAPQuery/asap-query-engine
+
+./target/release/query_engine_rust \
+    --kafka-topic sketch_topic \
+    --input-format json \
+    --config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml \
+    --streaming-config ~/ASAPQuery/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml \
+    --http-port 8088 --delete-existing-db --log-level DEBUG \
+    --output-dir ./output --streaming-engine arroyo \
+    --query-language SQL --lock-strategy per-key \
+    --prometheus-scrape-interval 1 > /tmp/query_engine.log 2>&1 &
+```
+
+### Step 8 — Load data into Elasticsearch (baseline)
+
+```bash
+python export_to_database.py \
+    --dataset h2o \
+    --file-path ./data/G1_1e7_1e2_0_0.csv \
+    --es-host localhost \
+    --es-port 9200 \
+    --es-index h2o_groupby \
+    --es-api-key your-api-key \
+    --es-bulk-size 5000
+```
+
+### Step 9 — Run benchmark
+
+```bash
+python run_benchmark.py \
+    --mode asap \
+    --asap-sql-file ./queries/h2o.sql \
+    --baseline-sql-file ./queries/h2o.sql \
+    --elastic-host localhost \
+    --elastic-port 9200 \
+    --elastic-api-key your-api-key \
+    --output-dir ./results --output-prefix h2o
+``` +--- + +## Custom Dataset + +```bash +# 1. Download (any HTTP URL) +python download_dataset.py --dataset custom \ + --custom-url https://example.com/mydata.json.gz \ + --output-dir ./data + +# 2. Prepare (edit prepare_data.py for your schema, or skip if already RFC3339) + +# 3. Generate queries and configs +python generate_queries.py \ + --table-name my_table \ + --ts-column event_time \ + --value-column metric_value \ + --group-by-columns region,host \ + --window-size 10 \ + --num-queries 50 \ + --generate-configs \ + --auto-detect-timestamps \ + --data-file ./data/mydata.json \ + --output-prefix ./queries/my_dataset + +# 4. Export to Arroyo +python export_to_arroyo.py \ + --streaming-config ./queries/my_dataset_streaming.yaml \ + --source-type file \ + --input-file ./data/mydata.json \ + --file-format json \ + --ts-format rfc3339 \ + --pipeline-name my_pipeline \ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest + +# 5. Export to ClickHouse +python export_to_database.py \ + --dataset custom \ + --file-path ./data/mydata.json \ + --init-sql-file ./configs/my_init.sql \ + --table-name my_table + +# 6. 
Run benchmark +python run_benchmark.py \ + --mode both \ + --asap-sql-file ./queries/my_dataset.sql \ + --baseline-sql-file ./queries/my_dataset.sql \ + --asap-url "http://localhost:8088/api/v1/query" \ + --output-dir ./results +``` + +--- + +## Reset + +```bash +pkill -f "arroyo"; pkill -f "query_engine_rust" +sleep 2 +pkill -f "kafka-server-start.sh"; pkill -f "clickhouse server" +sleep 2 +rm -rf /tmp/arroyo/ + +KAFKA=$INSTALL_DIR/kafka/bin +$KAFKA/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic sketch_topic + +cd ~/ASAPQuery/asap-summary-ingest +python3 delete_pipeline.py --all_pipelines + +$INSTALL_DIR/clickhouse client --query "TRUNCATE TABLE hits" +# or for H2O: $INSTALL_DIR/clickhouse client --query "TRUNCATE TABLE h2o_groupby" +``` + +--- + +## Files + +| File | Purpose | +|------|---------| +| `download_dataset.py` | Download ClickBench, H2O, or custom datasets | +| `prepare_data.py` | Convert raw data to Arroyo file source format (RFC3339, string columns) | +| `export_to_arroyo.py` | Launch Arroyo sketch pipeline (file or kafka source) | +| `export_to_database.py` | Load data into ClickHouse for baseline | +| `generate_queries.py` | Generate a shared SQL query file (ClickHouse-compatible syntax, used for both ASAP and ClickHouse) and optional streaming/inference YAML configs | +| `run_benchmark.py` | Run queries and produce CSV results + plots | +| `configs/` | ClickHouse init SQL (CREATE TABLE statements) | diff --git a/asap-tools/execution-utilities/benchmark/configs/clickbench_hits_init.sql b/asap-tools/execution-utilities/benchmark/configs/clickbench_hits_init.sql new file mode 100644 index 00000000..b462faec --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/clickbench_hits_init.sql @@ -0,0 +1,115 @@ +-- ClickHouse init for ClickBench baseline (MergeTree only, no Kafka engine) +-- Use this with export_to_database.py --dataset clickbench --init-sql-file + +CREATE TABLE IF NOT EXISTS hits +( + WatchID Int64, + 
JavaEnable UInt8, + Title String, + GoodEvent Int16, + EventTime DateTime, + EventDate Date, + CounterID UInt32, + ClientIP Int32, + RegionID UInt32, + UserID Int64, + CounterClass Int8, + OS UInt8, + UserAgent UInt8, + URL String, + Referer String, + IsRefresh UInt8, + RefererCategoryID UInt16, + RefererRegionID UInt32, + URLCategoryID UInt16, + URLRegionID UInt32, + ResolutionWidth UInt16, + ResolutionHeight UInt16, + ResolutionDepth UInt8, + FlashMajor UInt8, + FlashMinor UInt8, + FlashMinor2 String, + NetMajor UInt8, + NetMinor UInt8, + UserAgentMajor UInt16, + UserAgentMinor String, + CookieEnable UInt8, + JavascriptEnable UInt8, + IsMobile UInt8, + MobilePhone UInt8, + MobilePhoneModel String, + Params String, + IPNetworkID UInt32, + TraficSourceID Int8, + SearchEngineID UInt16, + SearchPhrase String, + AdvEngineID UInt8, + IsArtifical UInt8, + WindowClientWidth UInt16, + WindowClientHeight UInt16, + ClientTimeZone Int16, + ClientEventTime DateTime, + SilverlightVersion1 UInt8, + SilverlightVersion2 UInt8, + SilverlightVersion3 UInt32, + SilverlightVersion4 UInt16, + PageCharset String, + CodeVersion UInt32, + IsLink UInt8, + IsDownload UInt8, + IsNotBounce UInt8, + FUniqID Int64, + OriginalURL String, + HID UInt32, + IsOldCounter UInt8, + IsEvent UInt8, + IsParameter UInt8, + DontCountHits UInt8, + WithHash UInt8, + HitColor String, + LocalEventTime DateTime, + Age UInt8, + Sex UInt8, + Income UInt8, + Interests UInt16, + Robotness UInt8, + RemoteIP Int32, + WindowName Int32, + OpenerName Int32, + HistoryLength Int16, + BrowserLanguage String, + BrowserCountry String, + SocialNetwork String, + SocialAction String, + HTTPError UInt16, + SendTiming UInt32, + DNSTiming UInt32, + ConnectTiming UInt32, + ResponseStartTiming UInt32, + ResponseEndTiming UInt32, + FetchTiming UInt32, + SocialSourceNetworkID UInt8, + SocialSourcePage String, + ParamPrice Int64, + ParamOrderID String, + ParamCurrency String, + ParamCurrencyID UInt16, + OpenstatServiceName String, + 
OpenstatCampaignID String, + OpenstatAdID String, + OpenstatSourceID String, + UTMSource String, + UTMMedium String, + UTMCampaign String, + UTMContent String, + UTMTerm String, + FromTag String, + HasGCLID UInt8, + RefererHash Int64, + URLHash Int64, + CLID UInt32 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime, WatchID) +SETTINGS index_granularity = 8192; diff --git a/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml b/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml new file mode 100644 index 00000000..7c4af097 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/clickbench_inference.yaml @@ -0,0 +1,21 @@ +# ASAP Inference Config for ClickBench Hits Dataset +# Source: asap_query_latency/inference_config.yaml + +tables: + - name: hits + time_column: EventTime + metadata_columns: [RegionID, OS, UserAgent, TraficSourceID] + value_columns: [ResolutionWidth] + +cleanup_policy: + name: read_based + +queries: + # Temporal queries (10s window, all labels) - QUANTILE + - aggregations: + - aggregation_id: 12 + read_count_threshold: 999999 + query: | + SELECT QUANTILE(0.95, ResolutionWidth) FROM hits + WHERE EventTime BETWEEN DATEADD(s, -10, NOW()) AND NOW() + GROUP BY RegionID, OS, UserAgent, TraficSourceID diff --git a/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml b/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml new file mode 100644 index 00000000..3d18e1ed --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/clickbench_streaming.yaml @@ -0,0 +1,26 @@ +# ASAP Streaming Config for ClickBench Hits Dataset +# Defines sketch aggregations for Arroyo to compute +# Source: asap_query_latency/streaming_config.yaml + +tables: + - name: hits + time_column: EventTime + metadata_columns: [RegionID, OS, UserAgent, TraficSourceID] + value_columns: [ResolutionWidth] + 
+aggregations: + # Temporal queries (10s window, all labels) - QUANTILE (DatasketchesKLL) + - aggregationId: 12 + aggregationType: DatasketchesKLL + aggregationSubType: '' + labels: + grouping: [RegionID, OS, UserAgent, TraficSourceID] + rollup: [] + aggregated: [] + table_name: hits + value_column: ResolutionWidth + parameters: + K: 200 + windowSize: 10 + windowType: tumbling + spatialFilter: '' diff --git a/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml b/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml new file mode 100644 index 00000000..fde732f9 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/h2o_inference.yaml @@ -0,0 +1,20 @@ +# ASAP Inference Config for H2O GroupBy Dataset +# Source: asap_benchmark_pipeline/inference_config.yaml + +tables: + - name: h2o_groupby + time_column: timestamp + metadata_columns: [id1, id2, id3, id4, id5, id6] + value_columns: [v1, v2, v3] + +cleanup_policy: + name: read_based + +queries: + - aggregations: + - aggregation_id: 12 + read_count_threshold: 999999 + query: |- + SELECT PERCENTILE(v3, 95) FROM h2o_groupby + WHERE timestamp BETWEEN DATEADD(s, -10, NOW()) AND NOW() + GROUP BY id1, id2 ORDER BY id1, id2; diff --git a/asap-tools/execution-utilities/benchmark/configs/h2o_init.sql b/asap-tools/execution-utilities/benchmark/configs/h2o_init.sql new file mode 100644 index 00000000..dbaf81c0 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/h2o_init.sql @@ -0,0 +1,20 @@ +-- ClickHouse init for H2O GroupBy baseline (MergeTree, direct load) +-- Use this with export_to_database.py --dataset h2o --init-sql-file +-- Source: asap_benchmark_pipeline/h2o_init.sql + +DROP TABLE IF EXISTS h2o_groupby; + +CREATE TABLE IF NOT EXISTS h2o_groupby +( + timestamp DateTime, + id1 String, + id2 String, + id3 String, + id4 Int32, + id5 Int32, + id6 Int32, + v1 Int32, + v2 Int32, + v3 Float64 +) ENGINE = MergeTree() +ORDER BY (id1, id2); diff --git 
a/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml b/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml new file mode 100644 index 00000000..9a7e6299 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/configs/h2o_streaming.yaml @@ -0,0 +1,26 @@ +# ASAP Streaming Config for H2O GroupBy Dataset +# Source: asap_benchmark_pipeline/streaming_config.yaml + +tables: + - name: h2o_groupby + time_column: timestamp + metadata_columns: [id1, id2, id3, id4, id5, id6] + value_columns: [v1, v2, v3] + +aggregations: + # Temporal queries (10s window, all labels) - QUANTILE (DatasketchesKLL) + - aggregationId: 12 + aggregationType: DatasketchesKLL + aggregationSubType: '' + labels: + grouping: [id1, id2] + rollup: [id3, id4, id5, id6] + aggregated: [] + table_name: h2o_groupby + value_column: v3 + parameters: + K: 200 + tumblingWindowSize: 10 + windowSize: 10 + windowType: tumbling + spatialFilter: '' diff --git a/asap-tools/execution-utilities/benchmark/download_dataset.py b/asap-tools/execution-utilities/benchmark/download_dataset.py new file mode 100644 index 00000000..26ee54d5 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/download_dataset.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +""" +Unified dataset downloader for the ASAP benchmark pipeline. + +Supports ClickBench (hits.json.gz), H2O groupby (G1_1e7_1e2_0_0.csv), +or any custom HTTP URL. + +Usage: + python download_dataset.py --dataset clickbench --output-dir ./data + python download_dataset.py --dataset h2o --output-dir ./data + python download_dataset.py --dataset custom --custom-url https://... 
--output-dir ./data +""" + +import argparse +import os +import sys +import urllib.request + +CLICKBENCH_URL = "https://datasets.clickhouse.com/hits_compatible/hits.json.gz" +CLICKBENCH_FILENAME = "hits.json.gz" + +H2O_FILE_ID = "15SVQjQ2QehzYDLoDonio4aP7xqdMiNyi" +H2O_FILENAME = "G1_1e7_1e2_0_0.csv" + + +def _http_download(url: str, output_path: str) -> str: + """Download a file via HTTP with progress reporting.""" + print(f"Downloading from {url}") + request = urllib.request.Request( + url, headers={"User-Agent": "Mozilla/5.0 (compatible; ASAP-Benchmark/1.0)"} + ) + try: + with urllib.request.urlopen(request) as response: + total_size = int(response.headers.get("Content-Length", 0)) + downloaded = 0 + last_percent = -1 + block_size = 8192 * 128 # ~1 MB blocks + + with open(output_path, "wb") as f: + while True: + block = response.read(block_size) + if not block: + break + f.write(block) + downloaded += len(block) + if total_size > 0: + percent = downloaded * 100 // total_size + if percent != last_percent: + last_percent = percent + mb = downloaded / (1024 * 1024) + total_mb = total_size / (1024 * 1024) + sys.stdout.write( + f"\rProgress: {percent}% ({mb:.1f}/{total_mb:.1f} MB)" + ) + sys.stdout.flush() + + print("\nDownload complete!") + return output_path + + except urllib.error.HTTPError as e: + print(f"\nDownload failed: HTTP {e.code} - {e.reason}") + raise + + +def download_clickbench(output_path: str, force: bool = False) -> str: + """Download hits.json.gz from ClickHouse datasets CDN.""" + if not force and os.path.exists(output_path): + print(f"Using existing file: {output_path}") + return output_path + print("Downloading ClickBench dataset (~14 GB compressed). 
Please wait...") + return _http_download(CLICKBENCH_URL, output_path) + + +def download_h2o(output_path: str, force: bool = False) -> str: + """Download H2O groupby CSV (~300 MB) from Google Drive via gdown.""" + if ( + not force + and os.path.exists(output_path) + and os.path.getsize(output_path) > 100 * 1024 * 1024 + ): + print(f"Using existing file: {output_path}") + return output_path + + try: + import gdown + except ImportError: + print("Installing gdown...") + import subprocess + + subprocess.check_call([sys.executable, "-m", "pip", "install", "gdown"]) + import gdown + + print(f"Downloading H2O dataset via gdown (ID: {H2O_FILE_ID})...") + url = f"https://drive.google.com/uc?id={H2O_FILE_ID}" + gdown.download(url, output_path, quiet=False) + return output_path + + +def download_custom(url: str, output_path: str, force: bool = False) -> str: + """Download a dataset from an arbitrary HTTP URL.""" + if not force and os.path.exists(output_path): + print(f"Using existing file: {output_path}") + return output_path + return _http_download(url, output_path) + + +def main(): + parser = argparse.ArgumentParser( + description="Download benchmark datasets", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--dataset", + choices=["clickbench", "h2o", "custom"], + required=True, + help="Dataset to download", + ) + parser.add_argument( + "--output-dir", + required=True, + help="Directory to save the downloaded file", + ) + parser.add_argument( + "--output-file", + default=None, + help="Exact output file path (overrides --output-dir)", + ) + parser.add_argument( + "--custom-url", + default=None, + help="URL to download (required when --dataset custom)", + ) + parser.add_argument( + "--force-redownload", + action="store_true", + help="Re-download even if the file already exists", + ) + args = parser.parse_args() + + if args.dataset == "custom" and not args.custom_url: + parser.error("--custom-url is required when --dataset 
custom") + + os.makedirs(args.output_dir, exist_ok=True) + + if args.output_file: + output_path = args.output_file + os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) + elif args.dataset == "clickbench": + output_path = os.path.join(args.output_dir, CLICKBENCH_FILENAME) + elif args.dataset == "h2o": + output_path = os.path.join(args.output_dir, H2O_FILENAME) + else: + filename = args.custom_url.rstrip("/").split("/")[-1] or "data" + output_path = os.path.join(args.output_dir, filename) + + if args.dataset == "clickbench": + download_clickbench(output_path, force=args.force_redownload) + elif args.dataset == "h2o": + download_h2o(output_path, force=args.force_redownload) + else: + download_custom(args.custom_url, output_path, force=args.force_redownload) + + print(f"Dataset saved to: {output_path}") + + +if __name__ == "__main__": + main() diff --git a/asap-tools/execution-utilities/benchmark/export_to_arroyo.py b/asap-tools/execution-utilities/benchmark/export_to_arroyo.py new file mode 100644 index 00000000..38533668 --- /dev/null +++ b/asap-tools/execution-utilities/benchmark/export_to_arroyo.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +""" +Launch an Arroyo sketch pipeline from a local file source. + +Arroyo reads directly from a local JSON/Parquet file and writes sketches to +a Kafka topic (default: sketch_topic) for consumption by QueryEngineRust. 
+ +Usage: + python export_to_arroyo.py \\ + --streaming-config configs/clickbench_streaming.yaml \\ + --input-file ./data/hits_arroyo.json \\ + --file-format json \\ + --ts-format rfc3339 \\ + --pipeline-name clickbench_pipeline \\ + --arroyosketch-dir ~/ASAPQuery/asap-summary-ingest +""" + +import argparse +import os +import subprocess +import sys +import time + +import requests + +DEFAULT_ARROYO_URL = "http://localhost:5115/api/v1" +DEFAULT_OUTPUT_KAFKA_TOPIC = "sketch_topic" +DEFAULT_PARALLELISM = 1 +DEFAULT_WAIT_TIMEOUT = 300 + + +def wait_for_pipeline_running( + pipeline_name: str, + arroyo_url: str = DEFAULT_ARROYO_URL, + timeout: int = DEFAULT_WAIT_TIMEOUT, +) -> bool: + """Poll the Arroyo API until the named pipeline reaches RUNNING state.""" + print(f"Waiting for pipeline '{pipeline_name}' to reach RUNNING state...") + elapsed = 0 + while True: + state = "error" + try: + r = requests.get(f"{arroyo_url}/pipelines", timeout=5) + if r.ok: + data = r.json() + for p in data.get("data", []): + if p.get("name") == pipeline_name: + s = p.get("state") + stop = p.get("stop", "") + if s is None and stop == "none": + state = "running" + else: + state = str(s).lower() if s else "unknown" + break + else: + state = "not_found" + except Exception: + state = "error" + + if state == "running": + print(f"Pipeline '{pipeline_name}' is RUNNING") + return True + + print(f" Pipeline state: {state} (elapsed: {elapsed}s)") + time.sleep(5) + elapsed += 5 + if elapsed >= timeout: + print(f"ERROR: Pipeline did not reach RUNNING state within {timeout}s") + return False + + +def build_arroyosketch_cmd(args, arroyosketch_script: str) -> list: + """Build the run_arroyosketch.py command from our CLI arguments.""" + return [ + sys.executable, + arroyosketch_script, + "--source_type", + "file", + "--output_format", + "json", + "--pipeline_name", + args.pipeline_name, + "--config_file_path", + os.path.abspath(args.streaming_config), + "--output_kafka_topic", + args.output_kafka_topic, + 
"--output_dir", + os.path.abspath(args.output_dir), + "--parallelism", + str(args.parallelism), + "--query_language", + "sql", + "--input_file_path", + os.path.abspath(args.input_file), + "--file_format", + args.file_format, + "--ts_format", + args.ts_format, + ] + + +def main(): + parser = argparse.ArgumentParser( + description="Launch Arroyo sketch pipeline from a local file source", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--streaming-config", + required=True, + help="Path to streaming_config.yaml", + ) + parser.add_argument( + "--input-file", + required=True, + help="Path to input data file (JSON or Parquet)", + ) + parser.add_argument( + "--file-format", + choices=["json", "parquet"], + default="json", + help="File format (default: json)", + ) + parser.add_argument( + "--ts-format", + choices=["unix_millis", "unix_seconds", "rfc3339"], + default="rfc3339", + help="Timestamp format in the data file (default: rfc3339)", + ) + parser.add_argument( + "--output-kafka-topic", + default=DEFAULT_OUTPUT_KAFKA_TOPIC, + help=f"Kafka topic for sketch output (default: {DEFAULT_OUTPUT_KAFKA_TOPIC})", + ) + parser.add_argument( + "--pipeline-name", + required=True, + help="Arroyo pipeline name", + ) + parser.add_argument( + "--parallelism", + type=int, + default=DEFAULT_PARALLELISM, + help=f"Arroyo pipeline parallelism (default: {DEFAULT_PARALLELISM})", + ) + parser.add_argument( + "--arroyosketch-dir", + required=True, + help="Path to asap-summary-ingest/ directory (contains run_arroyosketch.py)", + ) + parser.add_argument( + "--arroyo-url", + default=DEFAULT_ARROYO_URL, + help=f"Arroyo API base URL (default: {DEFAULT_ARROYO_URL})", + ) + parser.add_argument( + "--output-dir", + default="./arroyo_outputs", + help="Directory for Arroyo pipeline output artifacts (default: ./arroyo_outputs)", + ) + parser.add_argument( + "--no-wait", + action="store_true", + help="Do not wait for pipeline to reach RUNNING state", + 
def main():
    """CLI entry point: launch an Arroyo sketch pipeline from a local file.

    Validates that run_arroyosketch.py exists, invokes it as a subprocess,
    and (unless --no-wait) polls the Arroyo API until the pipeline is RUNNING.
    Exits non-zero on any failure.
    """
    parser = argparse.ArgumentParser(
        description="Launch Arroyo sketch pipeline from a local file source",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--streaming-config",
        required=True,
        help="Path to streaming_config.yaml",
    )
    parser.add_argument(
        "--input-file",
        required=True,
        help="Path to input data file (JSON or Parquet)",
    )
    parser.add_argument(
        "--file-format",
        choices=["json", "parquet"],
        default="json",
        help="File format (default: json)",
    )
    parser.add_argument(
        "--ts-format",
        choices=["unix_millis", "unix_seconds", "rfc3339"],
        default="rfc3339",
        help="Timestamp format in the data file (default: rfc3339)",
    )
    parser.add_argument(
        "--output-kafka-topic",
        default=DEFAULT_OUTPUT_KAFKA_TOPIC,
        help=f"Kafka topic for sketch output (default: {DEFAULT_OUTPUT_KAFKA_TOPIC})",
    )
    parser.add_argument(
        "--pipeline-name",
        required=True,
        help="Arroyo pipeline name",
    )
    parser.add_argument(
        "--parallelism",
        type=int,
        default=DEFAULT_PARALLELISM,
        help=f"Arroyo pipeline parallelism (default: {DEFAULT_PARALLELISM})",
    )
    parser.add_argument(
        "--arroyosketch-dir",
        required=True,
        help="Path to asap-summary-ingest/ directory (contains run_arroyosketch.py)",
    )
    parser.add_argument(
        "--arroyo-url",
        default=DEFAULT_ARROYO_URL,
        help=f"Arroyo API base URL (default: {DEFAULT_ARROYO_URL})",
    )
    parser.add_argument(
        "--output-dir",
        default="./arroyo_outputs",
        help="Directory for Arroyo pipeline output artifacts (default: ./arroyo_outputs)",
    )
    parser.add_argument(
        "--no-wait",
        action="store_true",
        help="Do not wait for pipeline to reach RUNNING state",
    )
    parser.add_argument(
        "--wait-timeout",
        type=int,
        default=DEFAULT_WAIT_TIMEOUT,
        help=f"Seconds to wait for RUNNING state (default: {DEFAULT_WAIT_TIMEOUT})",
    )

    args = parser.parse_args()

    # Fail fast if the launcher script is missing before touching anything else.
    arroyosketch_script = os.path.join(
        os.path.abspath(args.arroyosketch_dir), "run_arroyosketch.py"
    )
    if not os.path.exists(arroyosketch_script):
        print(f"ERROR: run_arroyosketch.py not found at {arroyosketch_script}")
        sys.exit(1)

    os.makedirs(args.output_dir, exist_ok=True)

    cmd = build_arroyosketch_cmd(args, arroyosketch_script)
    print(f"Launching Arroyo pipeline '{args.pipeline_name}'...")
    print(f"Command: {' '.join(cmd)}")

    # Run from the asap-summary-ingest directory so the script's relative
    # paths resolve; propagate its exit code on failure.
    result = subprocess.run(cmd, cwd=os.path.abspath(args.arroyosketch_dir))
    if result.returncode != 0:
        print(f"ERROR: run_arroyosketch.py exited with code {result.returncode}")
        sys.exit(result.returncode)

    if not args.no_wait:
        success = wait_for_pipeline_running(
            args.pipeline_name,
            arroyo_url=args.arroyo_url,
            timeout=args.wait_timeout,
        )
        if not success:
            sys.exit(1)

    print("Done.")


if __name__ == "__main__":
    main()
def _exec_clickhouse_sql(clickhouse_url: str, sql: str, label: str = ""):
    """Run one SQL statement through the ClickHouse HTTP API, logging the outcome.

    Failures are logged as warnings rather than raised, so a partially
    existing schema does not abort initialization.
    """
    response = requests.post(clickhouse_url, data=sql.encode())
    if response.ok:
        short = sql.strip()[:80].replace("\n", " ")
        print(f" OK: {short}")
    else:
        print(f" WARN [{label}]: {response.text.strip()[:200]}")


def run_init_sql(clickhouse_url: str, init_sql_file: str):
    """Execute DDL statements from a SQL file.

    Statements are split on ';' (no support for semicolons inside literals)
    and executed one at a time.
    """
    print(f"Running init SQL from {init_sql_file}...")
    with open(init_sql_file) as handle:
        raw = handle.read()
    for stmt in (piece.strip() for piece in raw.split(";")):
        if stmt:
            _exec_clickhouse_sql(clickhouse_url, stmt, label=stmt[:40])
def check_row_count(clickhouse_url: str, table_name: str) -> int:
    """Return the current row count of `table_name`, or 0 if the query fails.

    NOTE(review): a failed HTTP call is indistinguishable from an empty
    table here — callers only use this as a "has data already?" heuristic.
    """
    r = requests.post(clickhouse_url, data=f"SELECT count(*) FROM {table_name}")
    if r.ok:
        return int(r.text.strip())
    return 0


def load_clickbench(
    clickhouse_url: str,
    file_path: str,
    init_sql_file: str = None,
    skip_table_init: bool = False,
    skip_if_loaded: bool = False,
    max_rows: int = 0,
):
    """Load hits.json.gz into ClickHouse via HTTP INSERT.

    Args:
        clickhouse_url: Base ClickHouse HTTP endpoint.
        file_path: Path to the gzipped JSON-lines dump.
        init_sql_file: Optional DDL file executed first (unless skip_table_init).
        skip_table_init: Skip CREATE TABLE even when init_sql_file is given.
        skip_if_loaded: Return early when the `hits` table already has rows.
        max_rows: Cap on rows streamed (0 = all).

    Returns:
        True on success, False on any failure.
    """
    if not skip_table_init and init_sql_file:
        run_init_sql(clickhouse_url, init_sql_file)

    if skip_if_loaded:
        count = check_row_count(clickhouse_url, "hits")
        if count > 0:
            print(f"Data already loaded ({count:,} rows). Skipping.")
            return True

    if not os.path.exists(file_path):
        print(f"ERROR: Data file not found: {file_path}")
        return False

    print(f"Loading ClickBench data from {file_path}...")

    # Stream rows lazily so the multi-GB gzip file is never fully in memory;
    # requests sends a generator body as a chunked upload.
    def _row_stream():
        with gzip.open(file_path, "rt") as f:
            for i, line in enumerate(f):
                if max_rows > 0 and i >= max_rows:
                    break
                yield line.encode()

    url = clickhouse_url.rstrip("/") + "/?query=INSERT+INTO+hits+FORMAT+JSONEachRow"
    r = requests.post(url, data=_row_stream(), stream=True)
    if not r.ok:
        print(f"ERROR: ClickHouse insert failed: {r.text[:200]}")
        return False

    count = check_row_count(clickhouse_url, "hits")
    print(f"Loaded {count:,} rows into ClickHouse (hits)")
    return True


def _flush_h2o_batch(clickhouse_url: str, rows: list):
    """Flush a batch of pre-rendered VALUES tuples to ClickHouse.

    Raises:
        RuntimeError: if ClickHouse rejects the insert.
    """
    sql = "INSERT INTO h2o_groupby VALUES " + ",".join(rows)
    r = requests.post(clickhouse_url, data=sql.encode())
    if not r.ok:
        raise RuntimeError(f"ClickHouse insert failed: {r.text[:200]}")
def load_h2o_clickhouse(
    clickhouse_url: str,
    file_path: str,
    init_sql_file: str = None,
    skip_table_init: bool = False,
    skip_if_loaded: bool = False,
    max_rows: int = 0,
):
    """Load H2O groupby CSV into ClickHouse with synthetic timestamps.

    Timestamps are assigned at H2O_ROWS_PER_SECOND rows/sec starting from
    H2O_BASE_EPOCH (2024-01-01T00:00:00Z).
    Adapted from asap_benchmark_pipeline/run_benchmark.py:load_h2o_data_clickhouse().

    Returns:
        True on success, False when the input file is missing.
    """
    if not skip_table_init and init_sql_file:
        run_init_sql(clickhouse_url, init_sql_file)

    if skip_if_loaded:
        count = check_row_count(clickhouse_url, "h2o_groupby")
        if count > 0:
            print(f"Data already loaded ({count:,} rows). Skipping.")
            return True

    if not os.path.exists(file_path):
        print(f"ERROR: Data file not found: {file_path}")
        return False

    print(f"Inserting H2O data from {file_path} into ClickHouse...")
    batch: list = []
    total = 0

    with open(file_path, "r", encoding="utf-8") as f:
        f.readline()  # skip header
        for i, line in enumerate(f):
            if max_rows > 0 and i >= max_rows:
                break
            parts = line.rstrip("\n").split(",")
            # H2O_ROWS_PER_SECOND rows share each second: rows [0, 999] all
            # get H2O_BASE_EPOCH, the next 1000 rows the following second, etc.
            abs_sec = H2O_BASE_EPOCH + i // H2O_ROWS_PER_SECOND
            ts = datetime.fromtimestamp(abs_sec, tz=timezone.utc)
            ts_str = ts.strftime("%Y-%m-%d %H:%M:%S")

            # NOTE(review): CSV fields are spliced into the VALUES tuple
            # without escaping — fine for the generated H2O dataset
            # (alphanumeric ids/numbers), but not safe for arbitrary CSV.
            batch.append(
                f"('{ts_str}','{parts[0]}','{parts[1]}','{parts[2]}',"
                f"{parts[3]},{parts[4]},{parts[5]},"
                f"{parts[6]},{parts[7]},{parts[8]})"
            )

            if len(batch) >= H2O_BATCH_SIZE:
                _flush_h2o_batch(clickhouse_url, batch)
                total += len(batch)
                batch = []
                # Batches are 50k rows, so this fires on every 10th flush.
                if total % 500_000 == 0:
                    print(f" Inserted {total:,} rows...")

    if batch:
        _flush_h2o_batch(clickhouse_url, batch)
        total += len(batch)

    print(f"Loaded {total:,} rows into ClickHouse (h2o_groupby)")
    return True
def load_h2o_elasticsearch(
    es_host: str,
    es_port: int,
    index_name: str,
    file_path: str,
    api_key: str = None,
    skip_if_loaded: bool = False,
    max_rows: int = 0,
    bulk_size: int = None,
):
    """Load H2O groupby CSV into Elasticsearch with synthetic timestamps.

    Args:
        es_host / es_port: Elasticsearch endpoint.
        index_name: Index to (re)create and load.
        file_path: Path to the H2O groupby CSV.
        api_key: Optional API key; anonymous connection when omitted.
        skip_if_loaded: Return early when the index already has documents.
        max_rows: Cap on rows indexed (0 = all).
        bulk_size: Bulk chunk size. Defaults to H2O_BATCH_SIZE. The CLI's
            --es-bulk-size flag was previously parsed but never honored;
            this parameter lets callers wire it through without changing
            existing call sites.

    Returns:
        True on success, False on connection/import failure.
    """
    try:
        from elasticsearch import Elasticsearch, helpers
    except ImportError:
        print("ERROR: elasticsearch-py not installed. Run: pip install elasticsearch")
        return False

    auth = {"api_key": api_key} if api_key else {}
    es = Elasticsearch(f"http://{es_host}:{es_port}", **auth)

    if not es.ping():
        print(f"ERROR: Cannot connect to Elasticsearch at {es_host}:{es_port}")
        return False

    if skip_if_loaded and es.indices.exists(index=index_name):
        count = es.count(index=index_name)["count"]
        if count > 0:
            print(f"Data already loaded ({count:,} rows). Skipping.")
            return True

    # Validate the input BEFORE deleting any existing index — previously a
    # missing file was detected only after the old index had been destroyed.
    if not os.path.exists(file_path):
        print(f"ERROR: Data file not found: {file_path}")
        return False

    if es.indices.exists(index=index_name):
        print(f"Deleting existing index: {index_name}")
        es.indices.delete(index=index_name)

    print(f"Creating index: {index_name}")
    es.indices.create(
        index=index_name,
        body={
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                # Relaxed refresh interval speeds up bulk indexing.
                "refresh_interval": "30s",
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date", "format": "epoch_millis"},
                    "id1": {"type": "keyword"},
                    "id2": {"type": "keyword"},
                    "id3": {"type": "keyword"},
                    "id4": {"type": "long"},
                    "id5": {"type": "long"},
                    "id6": {"type": "long"},
                    "v1": {"type": "long"},
                    "v2": {"type": "long"},
                    "v3": {"type": "double"},
                }
            },
        },
    )

    print(f"Importing H2O data from {file_path} into Elasticsearch ({index_name})...")

    base_timestamp_ms = 1704067200000  # 2024-01-01T00:00:00Z in millis

    # NOTE(review): rows here are spaced 10 ms apart (100 rows/sec), while
    # load_h2o_clickhouse packs H2O_ROWS_PER_SECOND (=1000) rows into each
    # second. The same dataset therefore gets different timestamps in the
    # two systems — confirm before comparing time-windowed results across them.
    def generate_docs():
        with open(file_path, "r", encoding="utf-8") as f:
            f.readline()  # skip header
            for row_num, line in enumerate(f):
                if max_rows > 0 and row_num >= max_rows:
                    break
                parts = line.rstrip("\n").split(",")
                if len(parts) < 9:
                    continue
                yield {
                    "_index": index_name,
                    "_source": {
                        "timestamp": base_timestamp_ms + row_num * 10,
                        "id1": parts[0],
                        "id2": parts[1],
                        "id3": parts[2],
                        "id4": int(parts[3] or 0),
                        "id5": int(parts[4] or 0),
                        "id6": int(parts[5] or 0),
                        "v1": int(parts[6] or 0),
                        "v2": int(parts[7] or 0),
                        "v3": float(parts[8] or 0.0),
                    },
                }

    chunk_size = bulk_size if bulk_size else H2O_BATCH_SIZE
    total = 0
    errors = 0
    for ok, _ in helpers.streaming_bulk(
        es, generate_docs(), chunk_size=chunk_size, raise_on_error=False
    ):
        if ok:
            total += 1
        else:
            errors += 1
        if total % 500_000 == 0 and total > 0:
            print(f" Indexed {total:,} documents...")

    print(f"Indexed {total:,} documents ({errors} errors)")
    print("Refreshing index...")
    es.indices.refresh(index=index_name)
    print(f"✓ Import complete! Index: {index_name}")
    return True
def load_custom(
    clickhouse_url: str,
    file_path: str,
    table_name: str,
    ts_column: str,
    ts_assignment: str = "passthrough",
    init_sql_file: str = None,
    skip_table_init: bool = False,
    skip_if_loaded: bool = False,
    max_rows: int = 0,
):
    """Load a custom JSON/JSONL file (optionally gzipped) into ClickHouse.

    Rows are streamed to INSERT ... FORMAT JSONEachRow; timestamps are taken
    as-is from the data ('passthrough'). CSV input and synthetic timestamp
    assignment are NOT supported on this path — use --dataset h2o for that.
    (The previous docstring claimed synthetic/CSV support that the code
    rejected; the contract is now stated and enforced consistently.)

    Args:
        ts_column: Timestamp column name. Currently informational only —
            JSONEachRow maps columns by name, so it is not used here.
        ts_assignment: Must be 'passthrough'. 'synthetic' is rejected up
            front instead of being silently ignored as before.
        max_rows: Cap on rows streamed (0 = all).

    Returns:
        True on success, False on any failure.
    """
    # Fail fast on an unsupported mode rather than silently ignoring the flag.
    if ts_assignment == "synthetic":
        print(
            "ERROR: --ts-assignment synthetic is not supported for --dataset custom. "
            "Use --dataset h2o for synthetic timestamps."
        )
        return False

    if not skip_table_init and init_sql_file:
        run_init_sql(clickhouse_url, init_sql_file)

    if skip_if_loaded:
        count = check_row_count(clickhouse_url, table_name)
        if count > 0:
            print(f"Data already loaded ({count:,} rows). Skipping.")
            return True

    if not os.path.exists(file_path):
        print(f"ERROR: Data file not found: {file_path}")
        return False

    path_lower = file_path.lower()
    url = (
        clickhouse_url.rstrip("/")
        + f"/?query=INSERT+INTO+{table_name}+FORMAT+JSONEachRow"
    )

    # Lazily stream (and cap) lines so large files are never fully in memory.
    def _stream(open_fn, mode):
        with open_fn(file_path, mode) as f:
            for i, line in enumerate(f):
                if max_rows > 0 and i >= max_rows:
                    break
                yield line.encode()

    if path_lower.endswith((".json.gz", ".jsonl.gz")):
        stream = _stream(gzip.open, "rt")
    elif path_lower.endswith((".json", ".jsonl")):
        stream = _stream(open, "r")
    else:
        print(
            f"ERROR: Unsupported file format for {file_path}. Use --dataset h2o for CSV."
        )
        return False

    print(f"Loading {file_path} into ClickHouse ({table_name})...")
    r = requests.post(url, data=stream, stream=True)
    if not r.ok:
        print(f"ERROR: ClickHouse insert failed: {r.text[:200]}")
        return False

    count = check_row_count(clickhouse_url, table_name)
    print(f"Loaded {count:,} rows into ClickHouse ({table_name})")
    return True
def main():
    """CLI entry point: validate arguments and dispatch to the right loader.

    Exits 0 on success, 1 on any loader failure or invalid combination.
    """
    parser = argparse.ArgumentParser(
        description="Load a dataset into ClickHouse or Elasticsearch for baseline comparison",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--dataset",
        choices=["clickbench", "h2o", "custom"],
        required=True,
        help="Dataset type",
    )
    parser.add_argument(
        "--database",
        choices=["clickhouse", "elasticsearch"],
        required=True,
        help="Target database",
    )
    parser.add_argument(
        "--file-path",
        required=True,
        help="Path to the source data file",
    )
    parser.add_argument(
        "--clickhouse-url",
        default=DEFAULT_CLICKHOUSE_URL,
        help=f"ClickHouse HTTP URL (default: {DEFAULT_CLICKHOUSE_URL})",
    )
    parser.add_argument(
        "--init-sql-file",
        default=None,
        help="DDL SQL file to run before loading (CREATE TABLE ...)",
    )
    parser.add_argument(
        "--table-name",
        default=None,
        help="Target table name (required for --dataset custom)",
    )
    parser.add_argument(
        "--ts-column",
        default=None,
        help="Timestamp column name (for --dataset custom)",
    )
    parser.add_argument(
        "--ts-assignment",
        choices=["synthetic", "passthrough"],
        default="passthrough",
        help="How to assign timestamps for custom CSV data (default: passthrough)",
    )
    parser.add_argument(
        "--skip-table-init",
        action="store_true",
        help="Skip CREATE TABLE (assume tables already exist)",
    )
    parser.add_argument(
        "--skip-if-loaded",
        action="store_true",
        help="Skip insert if the table already has rows",
    )
    parser.add_argument(
        "--max-rows",
        type=int,
        default=0,
        help="Maximum rows to load (0 = all)",
    )

    # Elasticsearch-specific flags
    es_group = parser.add_argument_group(
        "Elasticsearch options (--database elasticsearch)"
    )
    es_group.add_argument("--es-host", default="localhost", help="Elasticsearch host")
    es_group.add_argument(
        "--es-port", type=int, default=9200, help="Elasticsearch port"
    )
    es_group.add_argument(
        "--es-index", default="h2o_benchmark", help="Elasticsearch index name"
    )
    es_group.add_argument("--es-api-key", default=None, help="Elasticsearch API key")
    # NOTE(review): --es-bulk-size is parsed but never forwarded below —
    # load_h2o_elasticsearch is called without it, so the flag currently has
    # no effect.
    es_group.add_argument(
        "--es-bulk-size", type=int, default=5000, help="Bulk insert batch size"
    )

    args = parser.parse_args()

    # Validate (dataset, database) combination
    combo = (args.dataset, args.database)
    if combo not in VALID_COMBINATIONS:
        valid = ", ".join(f"({d}/{db})" for d, db in sorted(VALID_COMBINATIONS))
        parser.error(
            f"--dataset {args.dataset} is not supported with --database {args.database}. "
            f"Valid combinations: {valid}"
        )

    if args.dataset == "custom" and not args.table_name:
        parser.error("--table-name is required when --dataset custom")

    success = False
    if args.dataset == "clickbench":
        success = load_clickbench(
            args.clickhouse_url,
            args.file_path,
            init_sql_file=args.init_sql_file,
            skip_table_init=args.skip_table_init,
            skip_if_loaded=args.skip_if_loaded,
            max_rows=args.max_rows,
        )
    elif args.dataset == "h2o":
        if args.database == "elasticsearch":
            success = load_h2o_elasticsearch(
                es_host=args.es_host,
                es_port=args.es_port,
                index_name=args.es_index,
                file_path=args.file_path,
                api_key=args.es_api_key,
                skip_if_loaded=args.skip_if_loaded,
                max_rows=args.max_rows,
            )
        else:
            success = load_h2o_clickhouse(
                args.clickhouse_url,
                args.file_path,
                init_sql_file=args.init_sql_file,
                skip_table_init=args.skip_table_init,
                skip_if_loaded=args.skip_if_loaded,
                max_rows=args.max_rows,
            )
    elif args.dataset == "custom":
        success = load_custom(
            args.clickhouse_url,
            args.file_path,
            table_name=args.table_name,
            ts_column=args.ts_column,
            ts_assignment=args.ts_assignment,
            init_sql_file=args.init_sql_file,
            skip_table_init=args.skip_table_init,
            skip_if_loaded=args.skip_if_loaded,
            max_rows=args.max_rows,
        )

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
+ +Output (always): + {prefix}.sql shared query file for both ASAP and ClickHouse + +Output (with --generate-configs): + {prefix}_streaming.yaml Arroyo streaming config + {prefix}_inference.yaml QueryEngineRust inference config + +Usage: + # Generate queries + configs in one shot + python generate_queries.py \\ + --table-name h2o_groupby \\ + --ts-column timestamp \\ + --value-column v1 \\ + --group-by-columns id1,id2 \\ + --window-size 30 \\ + --num-queries 50 \\ + --generate-configs \\ + --auto-detect-timestamps \\ + --data-file ./data/h2o_arroyo_full.json \\ + --data-file-format json \\ + --output-prefix ./queries/h2o_30s + + # Queries only (no configs) + python generate_queries.py \\ + --table-name hits \\ + --ts-column EventTime \\ + --value-column ResolutionWidth \\ + --group-by-columns RegionID,OS,UserAgent,TraficSourceID \\ + --window-size 10 \\ + --num-queries 50 \\ + --auto-detect-timestamps \\ + --data-file ./data/hits.json.gz \\ + --data-file-format json.gz \\ + --output-prefix ./queries/clickbench + + # Use a pre-built timestamps file + python generate_queries.py \\ + --table-name h2o_groupby \\ + --ts-column timestamp \\ + --value-column v1 \\ + --group-by-columns id1,id2 \\ + --window-size 10 \\ + --num-queries 50 \\ + --timestamps-file ./my_timestamps.txt \\ + --output-prefix ./queries/h2o +""" + +import argparse +import gzip +import json +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import List, Optional + + +def _parse_timestamp(value: str) -> Optional[datetime]: + """Try to parse a timestamp string in common formats.""" + value = str(value).strip() + for fmt in ( + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d", + ): + try: + return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc) + except ValueError: + pass + # Try unix seconds/millis (numeric string) + try: + v = float(value) + if v > 1e12: # millis + return 
def _scan_ts_range_json(file_path: str, ts_column: str, compressed: bool) -> tuple:
    """Scan a JSON-lines file and return (min_ts, max_ts, count).

    Blank lines, unparseable JSON, missing columns, and unparseable
    timestamps are all skipped; `count` is the number of rows that yielded
    a valid timestamp.
    """
    lo = hi = None
    seen = 0
    open_fn, mode = (gzip.open, "rt") if compressed else (open, "r")
    with open_fn(file_path, mode) as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw:
                continue
            try:
                value = json.loads(raw).get(ts_column)
            except (json.JSONDecodeError, KeyError):
                continue
            if value is None:
                continue
            ts = _parse_timestamp(value)
            if ts is None:
                continue
            seen += 1
            if lo is None or ts < lo:
                lo = ts
            if hi is None or ts > hi:
                hi = ts
    return lo, hi, seen
" + f"Available: {reader.fieldnames}" + ) + return None, None, 0 + for row in reader: + ts = _parse_timestamp(row[ts_column]) + if ts: + count += 1 + if min_ts is None or ts < min_ts: + min_ts = ts + if max_ts is None or ts > max_ts: + max_ts = ts + return min_ts, max_ts, count + + +def detect_timestamps(data_file: str, data_file_format: str, ts_column: str) -> tuple: + """Return (min_ts, max_ts) by scanning the entire data file.""" + fmt = data_file_format.lower() + if fmt in ("json.gz", "jsonl.gz"): + min_ts, max_ts, count = _scan_ts_range_json( + data_file, ts_column, compressed=True + ) + elif fmt in ("json", "jsonl"): + min_ts, max_ts, count = _scan_ts_range_json( + data_file, ts_column, compressed=False + ) + elif fmt == "csv": + min_ts, max_ts, count = _scan_ts_range_csv(data_file, ts_column) + else: + print(f"ERROR: Unsupported data file format: {data_file_format}") + sys.exit(1) + + if min_ts is None: + print(f"ERROR: No '{ts_column}' timestamps found in {data_file}") + sys.exit(1) + + return min_ts, max_ts + + +def _snap_to_window_boundary(ts: datetime, window_size: int) -> datetime: + """Round a timestamp up to the next window boundary (epoch-aligned). + + Arroyo tumbling windows are aligned to epoch multiples of window_size. + Querying at a non-boundary timestamp will miss the sketch. + """ + epoch_sec = int(ts.timestamp()) + remainder = epoch_sec % window_size + if remainder == 0: + return ts + snapped = epoch_sec + (window_size - remainder) + return datetime.fromtimestamp(snapped, tz=timezone.utc) + + +def generate_window_ends( + min_ts: datetime, + max_ts: datetime, + window_size: int, + stride: int, + num_queries: int, +) -> List[datetime]: + """Generate evenly-spaced window-end timestamps within [min_ts, max_ts]. + + Timestamps are snapped to epoch-aligned window boundaries so that + Arroyo's tumbling window sketches can be found by QueryEngineRust. 
def generate_window_ends(
    min_ts: datetime,
    max_ts: datetime,
    window_size: int,
    stride: int,
    num_queries: int,
) -> List[datetime]:
    """Generate evenly-spaced window-end timestamps within [min_ts, max_ts].

    Timestamps are snapped to epoch-aligned window boundaries so that
    Arroyo's tumbling window sketches can be found by QueryEngineRust.
    """
    # First candidate endpoint: one full window after min_ts, snapped up.
    start = _snap_to_window_boundary(
        min_ts + timedelta(seconds=window_size), window_size
    )
    if start >= max_ts:
        print(
            f"WARNING: window_size ({window_size}s) exceeds the data time range "
            f"({(max_ts - min_ts).total_seconds():.0f}s). Using max_ts as only endpoint."
        )
        return [max_ts]

    endpoints: List[datetime] = []
    cursor = start
    while cursor <= max_ts and len(endpoints) < num_queries:
        endpoints.append(cursor)
        cursor += timedelta(seconds=stride)
    return endpoints


def generate_sql_file(
    table_name: str,
    ts_column: str,
    value_column: str,
    group_by_columns: List[str],
    quantile: float,
    window_size: int,
    window_ends: List[datetime],
    window_form: str,
    output_prefix: str,
):
    """Write a single SQL file using ClickHouse-compatible syntax.

    Uses quantile(q)(col) and 'YYYY-MM-DD HH:MM:SS' datetime strings; both
    ASAP and ClickHouse accept this format when running in UTC. Each query
    carries a `-- T{NNN}:` annotation for run_benchmark.py.
    """
    group_by_clause = ", ".join(group_by_columns)

    def _render(index: int, end_ts: datetime) -> str:
        # One annotated query per window endpoint.
        end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")
        if window_form == "dateadd":
            where = (
                f"{ts_column} BETWEEN DATEADD(s, -{window_size}, '{end_str}') AND '{end_str}'"
            )
        else:
            start_str = (end_ts - timedelta(seconds=window_size)).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            where = f"{ts_column} BETWEEN '{start_str}' AND '{end_str}'"
        return (
            f"-- T{index:03d}: quantile window ending at {end_str}\n"
            f"SELECT quantile({quantile})({value_column}) FROM {table_name} "
            f"WHERE {where} GROUP BY {group_by_clause};"
        )

    lines = [_render(i, end_ts) for i, end_ts in enumerate(window_ends)]

    sql_file = f"{output_prefix}.sql"
    Path(sql_file).parent.mkdir(parents=True, exist_ok=True)
    with open(sql_file, "w") as f:
        f.write("\n".join(lines) + "\n")

    print(f"Generated {len(window_ends)} queries → {sql_file}")
def generate_config_files(
    table_name: str,
    ts_column: str,
    value_column: str,
    group_by_columns: List[str],
    quantile: float,
    window_size: int,
    aggregation_id: int,
    aggregation_k: int,
    output_prefix: str,
):
    """Write paired streaming and inference YAML config files.

    Produces {output_prefix}_streaming.yaml (Arroyo sketch pipeline:
    DatasketchesKLL over a tumbling window) and {output_prefix}_inference.yaml
    (QueryEngineRust template query with a DATEADD(NOW()) window).

    Args:
        aggregation_id: Must match between the two files so the query engine
            finds the sketch stream.
        aggregation_k: KLL sketch K parameter (accuracy/size trade-off).
    """
    # YAML flow-style list of the GROUP BY columns, reused in both files.
    meta_yaml = "[" + ", ".join(group_by_columns) + "]"
    group_by_clause = ", ".join(group_by_columns)

    streaming_content = f"""\
tables:
  - name: {table_name}
    time_column: {ts_column}
    metadata_columns: {meta_yaml}
    value_columns: [{value_column}]

aggregations:
  - aggregationId: {aggregation_id}
    aggregationType: DatasketchesKLL
    aggregationSubType: ''
    labels:
      grouping: {meta_yaml}
      rollup: []
      aggregated: []
    table_name: {table_name}
    value_column: {value_column}
    parameters:
      K: {aggregation_k}
    tumblingWindowSize: {window_size}
    windowSize: {window_size}
    windowType: tumbling
    spatialFilter: ''
"""

    inference_content = f"""\
tables:
  - name: {table_name}
    time_column: {ts_column}
    metadata_columns: {meta_yaml}
    value_columns: [{value_column}]

cleanup_policy:
  name: read_based

queries:
  - aggregations:
      - aggregation_id: {aggregation_id}
        read_count_threshold: 999999
    query: |-
      SELECT quantile({quantile})({value_column}) FROM {table_name}
      WHERE {ts_column} BETWEEN DATEADD(s, -{window_size}, NOW()) AND NOW()
      GROUP BY {group_by_clause};
"""

    streaming_file = f"{output_prefix}_streaming.yaml"
    inference_file = f"{output_prefix}_inference.yaml"

    Path(streaming_file).parent.mkdir(parents=True, exist_ok=True)

    with open(streaming_file, "w") as f:
        f.write(streaming_content)

    with open(inference_file, "w") as f:
        f.write(inference_content)

    print("Generated configs:")
    print(f" Streaming: {streaming_file}")
    print(f" Inference: {inference_file}")
def main():
    """CLI entry point: resolve window-end timestamps, then emit SQL (and
    optionally the paired streaming/inference YAML configs).

    Timestamps come either from --timestamps-file (explicit list) or from
    scanning the data file (--auto-detect-timestamps).
    """
    parser = argparse.ArgumentParser(
        description="Generate ASAP + ClickHouse SQL query files (shared syntax)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    # Table/column config
    parser.add_argument("--table-name", required=True)
    parser.add_argument("--ts-column", required=True, help="Timestamp column name")
    parser.add_argument(
        "--value-column", required=True, help="Column to compute quantile on"
    )
    parser.add_argument(
        "--group-by-columns",
        required=True,
        help="Comma-separated GROUP BY columns",
    )
    # Query parameters
    parser.add_argument("--quantile", type=float, default=0.95)
    parser.add_argument(
        "--window-size", type=int, default=10, help="Window size in seconds"
    )
    parser.add_argument("--num-queries", type=int, default=50)
    parser.add_argument(
        "--window-form",
        choices=["explicit", "dateadd"],
        default="explicit",
        help="SQL window form: explicit='BETWEEN start AND end', dateadd='BETWEEN DATEADD(s,-N,end) AND end' (default: explicit)",
    )
    parser.add_argument(
        "--output-prefix",
        required=True,
        help="Output file prefix (e.g. ./queries/clickbench → clickbench.sql)",
    )
    # Timestamp sources (mutually exclusive)
    ts_group = parser.add_mutually_exclusive_group(required=True)
    ts_group.add_argument(
        "--auto-detect-timestamps",
        action="store_true",
        help="Scan data file to determine time range",
    )
    ts_group.add_argument(
        "--timestamps-file",
        default=None,
        help="File with explicit window-end timestamps (one ISO timestamp per line)",
    )
    # Auto-detect options
    parser.add_argument(
        "--data-file",
        default=None,
        help="Path to data file (required with --auto-detect-timestamps)",
    )
    parser.add_argument(
        "--data-file-format",
        choices=["json", "jsonl", "json.gz", "jsonl.gz", "csv"],
        default="json",
        help="Data file format (default: json)",
    )
    parser.add_argument(
        "--stride-seconds",
        type=int,
        default=None,
        help="Spacing between window-end timestamps (default: window-size * 3)",
    )
    # Config generation
    parser.add_argument(
        "--generate-configs",
        action="store_true",
        help="Also generate streaming and inference YAML config files",
    )
    parser.add_argument(
        "--aggregation-id",
        type=int,
        default=12,
        help="Aggregation ID for config files (default: 12)",
    )
    parser.add_argument(
        "--aggregation-k",
        type=int,
        default=200,
        help="KLL sketch K parameter (default: 200)",
    )

    args = parser.parse_args()

    # --data-file only matters for auto-detection; the mutually-exclusive
    # group already guarantees exactly one timestamp source was chosen.
    if args.auto_detect_timestamps and not args.data_file:
        parser.error("--data-file is required when --auto-detect-timestamps is set")

    group_by_columns = [c.strip() for c in args.group_by_columns.split(",")]
    stride = args.stride_seconds if args.stride_seconds else args.window_size * 3

    # Determine window-end timestamps
    if args.timestamps_file:
        window_ends = []
        with open(args.timestamps_file) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                ts = _parse_timestamp(line)
                if ts:
                    window_ends.append(ts)
                else:
                    print(f"WARNING: Could not parse timestamp: {line!r}")
        if not window_ends:
            print("ERROR: No valid timestamps found in --timestamps-file")
            sys.exit(1)
        # Cap at the requested query count; explicit timestamps are NOT
        # snapped to window boundaries — the caller is trusted here.
        window_ends = window_ends[: args.num_queries]
        print(
            f"Using {len(window_ends)} timestamps from {args.timestamps_file} "
            f"({window_ends[0]} – {window_ends[-1]})"
        )
    else:
        print(f"Scanning {args.data_file} for timestamp range...")
        min_ts, max_ts = detect_timestamps(
            args.data_file, args.data_file_format, args.ts_column
        )
        print(f" Detected range: {min_ts} – {max_ts}")
        window_ends = generate_window_ends(
            min_ts, max_ts, args.window_size, stride, args.num_queries
        )
        print(
            f" Generated {len(window_ends)} window endpoints "
            f"(stride={stride}s, window={args.window_size}s)"
        )

    generate_sql_file(
        table_name=args.table_name,
        ts_column=args.ts_column,
        value_column=args.value_column,
        group_by_columns=group_by_columns,
        quantile=args.quantile,
        window_size=args.window_size,
        window_ends=window_ends,
        window_form=args.window_form,
        output_prefix=args.output_prefix,
    )

    if args.generate_configs:
        generate_config_files(
            table_name=args.table_name,
            ts_column=args.ts_column,
            value_column=args.value_column,
            group_by_columns=group_by_columns,
            quantile=args.quantile,
            window_size=args.window_size,
            aggregation_id=args.aggregation_id,
            aggregation_k=args.aggregation_k,
            output_prefix=args.output_prefix,
        )


if __name__ == "__main__":
    main()
+H2O_VALUE_FIELDS = ["v1"] + + +def _parse_clickbench_ts(ts_str: str) -> str: + """Convert 'YYYY-MM-DD HH:MM:SS' → 'YYYY-MM-DDTHH:MM:SSZ' (RFC3339).""" + try: + dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return ts_str # already RFC3339 or unknown format + + +def prepare_clickbench(input_path: str, output_path: str, max_rows: int = 0): + """Convert hits.json.gz to Arroyo-compatible JSON. + + - Converts EventTime to RFC3339 + - Stringifies integer metadata columns (RegionID, OS, UserAgent, TraficSourceID) + - Sorts by EventTime (required for Arroyo event-time watermarks) + - Writes only the fields needed by the streaming config + """ + print(f"Reading {input_path}...") + records = [] + + opener = gzip.open if input_path.endswith(".gz") else open + with opener(input_path, "rt") as f: + for i, line in enumerate(f): + if max_rows > 0 and i >= max_rows: + break + if i % 100_000 == 0 and i > 0: + print(f" Read {i:,} rows...", end="\r") + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + + ts = _parse_clickbench_ts(str(obj.get(CB_TIMESTAMP_FIELD, ""))) + record = {CB_TIMESTAMP_FIELD: ts} + for col in CB_VALUE_FIELDS: + record[col] = float(obj.get(col, 0)) + for col in CB_METADATA_FIELDS: + record[col] = str(obj.get(col, "")) + records.append(record) + + print(f"\nSorting {len(records):,} records by {CB_TIMESTAMP_FIELD}...") + records.sort(key=lambda r: r[CB_TIMESTAMP_FIELD]) + + print(f"Writing to {output_path}...") + with open(output_path, "w") as f: + for record in records: + f.write(json.dumps(record) + "\n") + + print(f"Done. {len(records):,} records written.") + if records: + print( + f" Time range: {records[0][CB_TIMESTAMP_FIELD]} – {records[-1][CB_TIMESTAMP_FIELD]}" + ) + + +def prepare_h2o(input_path: str, output_path: str, max_rows: int = 0): + """Convert H2O CSV to Arroyo-compatible JSON with synthetic timestamps. 
+
+    - Adds synthetic epoch-millisecond timestamps spaced 10 ms apart
+      (100 rows/sec) starting from 2024-01-01T00:00:00Z
+    - Converts id4, id5, id6 to strings (metadata columns are expected as strings)
+    """
+    print(f"Reading {input_path}...")
+    count = 0
+
+    with open(input_path, "r", encoding="utf-8") as fin, open(output_path, "w") as fout:
+
+        header = fin.readline().strip()
+        cols = header.split(",")
+        id_idx = {c: i for i, c in enumerate(cols)}
+
+        for i, line in enumerate(fin):
+            if max_rows > 0 and i >= max_rows:
+                break
+            if i % 100_000 == 0 and i > 0:
+                print(f"  Written {i:,} rows...", end="\r")
+
+            parts = line.rstrip("\n").split(",")
+            abs_ms = H2O_BASE_EPOCH * 1000 + i * 10  # 10 ms per row (100 rows/sec), NOT H2O_ROWS_PER_SECOND
+            record = {
+                H2O_TIMESTAMP_FIELD: abs_ms,
+                "id1": parts[id_idx["id1"]],
+                "id2": parts[id_idx["id2"]],
+                "id3": parts[id_idx["id3"]],
+                "id4": str(parts[id_idx["id4"]]),
+                "id5": str(parts[id_idx["id5"]]),
+                "id6": str(parts[id_idx["id6"]]),
+                "v1": float(parts[id_idx["v1"]]),
+                "v2": float(parts[id_idx["v2"]]),
+                "v3": float(parts[id_idx["v3"]]),
+            }
+            fout.write(json.dumps(record) + "\n")
+            count += 1
+
+    print(f"\nDone. {count:,} records written to {output_path}.")
+    first_ts = datetime.fromtimestamp(H2O_BASE_EPOCH, tz=timezone.utc).strftime(
+        "%Y-%m-%dT%H:%M:%SZ"
+    )
+    last_ts = datetime.fromtimestamp(
+        H2O_BASE_EPOCH + count // H2O_ROWS_PER_SECOND, tz=timezone.utc  # NOTE(review): rows are 10 ms apart, so elapsed seconds should be count // 100 — this print understates the range 10×
+    ).strftime("%Y-%m-%dT%H:%M:%SZ")
+    print(f"  Time range: {first_ts} – {last_ts}")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Prepare dataset files for Arroyo file source",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=__doc__,
+    )
+    parser.add_argument(
+        "--dataset",
+        choices=["clickbench", "h2o"],
+        required=True,
+        help="Dataset type to prepare",
+    )
+    parser.add_argument("--input", required=True, help="Path to raw input file")
+    parser.add_argument(
+        "--output", required=True, help="Path to write prepared JSON file"
+    )
+    parser.add_argument(
+        "--max-rows",
+        type=int,
+        default=0,
+        help="Max rows to process (0 = all, default: 0)",
+    )
+    args = parser.parse_args()
+
+    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
+
+    if args.dataset == "clickbench":
+        prepare_clickbench(args.input, args.output, args.max_rows)
+    else:
+        prepare_h2o(args.input, args.output, args.max_rows)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/asap-tools/execution-utilities/benchmark/requirements.txt b/asap-tools/execution-utilities/benchmark/requirements.txt
new file mode 100644
index 00000000..85676314
--- /dev/null
+++ b/asap-tools/execution-utilities/benchmark/requirements.txt
@@ -0,0 +1,5 @@
+requests>=2.28
+gdown>=4.7
+pyyaml>=6.0
+matplotlib>=3.7
+numpy>=1.24
diff --git a/asap-tools/execution-utilities/benchmark/run_benchmark.py b/asap-tools/execution-utilities/benchmark/run_benchmark.py
new file mode 100644
index 00000000..5501e7e7
--- /dev/null
+++ b/asap-tools/execution-utilities/benchmark/run_benchmark.py
@@ -0,0 +1,662 @@
+#!/usr/bin/env python3
+"""
+Unified benchmark runner: ASAP (QueryEngineRust) vs ClickHouse/Elasticsearch baseline.
+ +Reads SQL files generated by generate_queries.py, sends each query to the +configured endpoint, and writes results to CSV. With --mode both, runs +baseline then ASAP and generates a latency comparison plot. + +Usage: + # Both modes, ClickHouse baseline + python run_benchmark.py \\ + --mode both --database clickhouse \\ + --asap-sql-file ./queries/clickbench_asap.sql \\ + --baseline-sql-file ./queries/clickbench_clickhouse.sql \\ + --output-dir ./results + + # Both modes, Elasticsearch baseline + python run_benchmark.py \\ + --mode both --database elasticsearch \\ + --asap-sql-file ./queries/h2o_asap.sql \\ + --baseline-sql-file ./queries/h2o_elasticsearch.sql \\ + --elastic-host localhost \\ + --elastic-port 9200 \\ + --elastic-api-key your_api_key_here \\ + --output-dir ./results \\ + --output-prefix h2o + + # ASAP only + python run_benchmark.py \\ + --mode asap --database clickhouse \\ + --asap-sql-file ./queries/h2o_asap.sql \\ + --output-dir ./results + + # Baseline only + python run_benchmark.py \\ + --mode baseline --database clickhouse \\ + --baseline-sql-file ./queries/h2o_clickhouse.sql \\ + --output-dir ./results +""" + +import argparse +import csv +import re +import time +import urllib.parse +from pathlib import Path +from typing import List, Optional, Tuple + +import matplotlib.pyplot as plt +import numpy as np +import requests +import json + +DEFAULT_ELASTIC_HOST = "localhost" +DEFAULT_ELASTIC_PORT = 9200 +DEFAULT_ASAP_CLICKHOUSE_URL = "http://localhost:8088/clickhouse/query" +DEFAULT_ASAP_ELASTIC_URL = "http://localhost:8088/_sql?format=json" +DEFAULT_CLICKHOUSE_URL = "http://localhost:8123/?session_timezone=UTC" +DEFAULT_OUTPUT_DIR = "./results" +DEFAULT_OUTPUT_PREFIX = "benchmark" + + +# --------------------------------------------------------------------------- +# Query extraction +# Reused from asap_query_latency/run_benchmark.py:extract_queries_from_sql() +# --------------------------------------------------------------------------- + + +def 
extract_queries_from_sql(sql_file: Path) -> List[Tuple[str, str]]: + """Extract (query_id, sql) pairs from an annotated SQL file. + + Expects lines of the form: + -- T001: description + SELECT ... ; + """ + with open(sql_file) as f: + content = f.read() + pattern = r"-- ([A-Za-z0-9_]+):[^\n]*\n(SELECT[^;]+;)" + return [ + (qid, sql.strip()) + for qid, sql in re.findall(pattern, content, re.DOTALL | re.IGNORECASE) + ] + + +# --------------------------------------------------------------------------- +# Query runner +# Adapted from asap_benchmark_pipeline/run_benchmark.py:run_query() +# Uses requests.Session for connection reuse across queries. +# --------------------------------------------------------------------------- + + +def run_query( + query: str, + endpoint_url: str, + session: requests.Session, + timeout: int = 30, + debug: bool = False, + database: str = "clickhouse", + api_key: Optional[str] = None, + fetch_size: int = 1000, +) -> Tuple[float, Optional[str], int, Optional[str]]: + """Send a single SQL query and return (latency_ms, result_text, num_rows, error).""" + try: + start = time.time() + + if database == "elasticsearch": + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"ApiKey {api_key}" + body = {"query": query.strip().rstrip(";"), "fetch_size": fetch_size} + response = session.post( + endpoint_url, headers=headers, json=body, timeout=timeout + ) + else: + encoded_query = urllib.parse.quote(query) + separator = "&" if "?" in endpoint_url else "?" 
+ url = f"{endpoint_url}{separator}query={encoded_query}" + response = session.get(url, timeout=timeout) + + latency_ms = (time.time() - start) * 1000 + + if debug: + source = ( + "OK" if response.status_code == 200 else f"HTTP {response.status_code}" + ) + print(f" [{source}] {latency_ms:.2f}ms") + + if response.status_code == 200: + if database == "elasticsearch": + data = response.json() + + if "hits" in data: + hits = data["hits"].get("hits", []) + if hits: + col_names = list(hits[0].get("_source", {}).keys()) + formatted_rows = [ + ", ".join( + f"{k}={hit.get('_source', {}).get(k)}" + for k in col_names + ) + for hit in hits + ] + result_text = "\n".join(formatted_rows) + num_rows = len(hits) + else: + result_text = "" + num_rows = 0 + + elif "rows" in data: + rows = data.get("rows", []) + columns = data.get("columns", []) + col_names = [ + c.get("name", f"col{i}") for i, c in enumerate(columns) + ] + formatted_rows = [ + ( + ", ".join( + f"{col_names[i]}={v}" if i < len(col_names) else str(v) + for i, v in enumerate(row) + ) + if isinstance(row, (list, tuple)) + else str(row) + ) + for row in rows + ] + result_text = "\n".join(formatted_rows) + num_rows = len(rows) + + else: + result_text = "" + num_rows = 0 + else: + result_text = response.text.strip() + num_rows = len(result_text.split("\n")) if result_text else 0 + + return latency_ms, result_text, num_rows, None + else: + return ( + latency_ms, + None, + 0, + f"HTTP {response.status_code}: {response.text[:200]}", + ) + except requests.Timeout: + return timeout * 1000.0, None, 0, "Timeout" + except Exception as e: + return 0.0, None, 0, str(e) + + +# --------------------------------------------------------------------------- +# Benchmark runner +# Consolidated from both asap_query_latency/run_benchmark.py and +# asap_benchmark_pipeline/run_benchmark.py:run_benchmark(). 
+# --------------------------------------------------------------------------- + + +def _infer_pattern(query_id: str) -> str: + if query_id.startswith("ST"): + return "SpatioTemporal" + if query_id.startswith("S"): + return "Spatial" + if query_id.startswith("T"): + return "Temporal" + if query_id.startswith("N"): + return "Nested" + if query_id.startswith("D"): + return "Dated" + if query_id.startswith("L"): + return "LongRange" + return "Unknown" + + +def _latency_summary(latencies: List[float], label: str): + if not latencies: + return + s = sorted(latencies) + n = len(s) + print(f"\n{label} ({n} successful queries):") + print( + f" min={s[0]:.2f}ms avg={sum(s)/n:.2f}ms " + f"p50={s[int(n*0.50)]:.2f}ms p95={s[int(n*0.95)]:.2f}ms max={s[-1]:.2f}ms" + ) + + +def run_benchmark( + sql_file: Path, + endpoint_url: str, + output_csv: Path, + mode: str, + query_filter: Optional[List[str]] = None, + timeout: int = 30, + repeat: int = 1, + debug: bool = False, + no_plot: bool = False, + database: str = "clickhouse", + api_key: Optional[str] = None, +): + """Run all queries and write results to CSV. 
+ + CSV columns: query_id, query_pattern, latency_ms, result_rows, + result_full, error, mode + """ + print(f"\nRunning benchmark in {mode.upper()} mode...") + print(f"Endpoint: {endpoint_url}") + print(f"SQL file: {sql_file}") + print(f"Output: {output_csv}") + if debug: + print("Debug: per-request HTTP status shown.") + + queries = extract_queries_from_sql(sql_file) + if query_filter: + queries = [(qid, sql) for qid, sql in queries if qid in query_filter] + print(f"Found {len(queries)} queries (repeat={repeat})") + + output_csv.parent.mkdir(parents=True, exist_ok=True) + session = requests.Session() + latencies_ok: List[float] = [] + plot_latencies: List[float] = [] + + with open(output_csv, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerow( + [ + "query_id", + "query_pattern", + "latency_ms", + "result_rows", + "result_full", + "error", + "mode", + ] + ) + + for query_id, sql in queries: + pattern = _infer_pattern(query_id) + print(f"Running {query_id}...", end=" " if not debug else "\n", flush=True) + + trial_latencies = [] + last_result, last_error, last_row_count = None, None, 0 + for _ in range(repeat): + lat, result, row_count, error = run_query( + sql, + endpoint_url, + session, + timeout, + debug, + database=database, + api_key=api_key, + ) + trial_latencies.append(lat) + last_result, last_error, last_row_count = result, error, row_count + if error: + break + + latency_ms = sorted(trial_latencies)[len(trial_latencies) // 2] + + if last_error: + print(f"ERROR {last_error}") + writer.writerow( + [query_id, pattern, f"{latency_ms:.2f}", 0, "", last_error, mode] + ) + plot_latencies.append(0.0) + else: + preview = last_result.replace("\n", " | ") if last_result else "" + latencies_ok.append(latency_ms) + plot_latencies.append(latency_ms) + print(f"{latency_ms:.2f}ms ({last_row_count} rows)") + writer.writerow( + [ + query_id, + pattern, + f"{latency_ms:.2f}", + last_row_count, + preview, + "", + mode, + ] + ) + + time.sleep(0.1) + 
+ print(f"\nResults saved to {output_csv}") + _latency_summary(latencies_ok, "Latency summary") + + if not no_plot and plot_latencies: + _plot_single(plot_latencies, mode, output_csv.with_suffix(".png")) + + +def _plot_single(latencies: List[float], mode: str, out_path: Path): + """Bar chart of per-query latency for a single mode.""" + color = "#4682b4" if mode == "asap" else "#f4a460" + x = list(range(1, len(latencies) + 1)) + plt.figure(figsize=(12, 5)) + plt.bar(x, latencies, color=color, edgecolor="black") + plt.xlabel("Query Execution Order") + plt.ylabel("Latency (ms)") + plt.title(f"Query Latency — {mode.upper()} mode") + plt.grid(axis="y", linestyle="--", alpha=0.7) + plt.tight_layout() + plt.savefig(out_path, dpi=150) + plt.close() + print(f"Plot saved to {out_path}") + + +def _parse_result_values(result_full: str) -> List[float]: + """Extract numeric values from a pipe-separated result_full string.""" + if not result_full: + return [] + values = [] + for part in result_full.split(" | "): + part = part.strip() + if not part: + continue + cols = part.split("\t") + try: + values.append(float(cols[-1])) + except (ValueError, IndexError): + continue + return values + + +def _compute_result_error( + baseline_values: List[float], asap_values: List[float] +) -> Optional[float]: + """Mean absolute relative error between two sorted result sets.""" + if not baseline_values or not asap_values: + return None + b = sorted(baseline_values) + a = sorted(asap_values) + n = min(len(b), len(a)) + if n == 0: + return None + b, a = b[:n], a[:n] + errors = [] + for bv, av in zip(b, a): + if bv == 0: + errors.append(0.0 if av == 0 else abs(av)) + else: + errors.append(abs(av - bv) / abs(bv)) + return sum(errors) / len(errors) + + +def _plot_comparison(asap_csv: Path, baseline_csv: Path, out_path: Path): + """Three-panel comparison: latency bars, speedup, and result accuracy.""" + + def _load(path): + rows = {} + with open(path) as f: + for row in csv.DictReader(f): + if not 
row["error"]: + rows[row["query_id"]] = { + "latency": float(row["latency_ms"]), + "result": row.get("result_full", ""), + } + return rows + + asap = _load(asap_csv) + base = _load(baseline_csv) + qids = sorted(set(asap) & set(base)) + if not qids: + print("WARNING: No common query IDs for comparison plot.") + return + + x = np.arange(len(qids)) + a_vals = [asap[q]["latency"] for q in qids] + b_vals = [base[q]["latency"] for q in qids] + speedup = [b / a if a > 0 else 0 for a, b in zip(a_vals, b_vals)] + + errors_pct = [] + for q in qids: + b_results = _parse_result_values(base[q]["result"]) + a_results = _parse_result_values(asap[q]["result"]) + err = _compute_result_error(b_results, a_results) + errors_pct.append((err or 0.0) * 100) + + has_accuracy = any(e > 0 for e in errors_pct) + n_panels = 3 if has_accuracy else 2 + ratios = [3, 1, 1.5] if has_accuracy else [3, 1] + + fig, axes = plt.subplots( + n_panels, + 1, + figsize=(14, 4 + 3 * n_panels), + gridspec_kw={"height_ratios": ratios}, + ) + ax1, ax2 = axes[0], axes[1] + + w = 0.4 + ax1.bar(x - w / 2, b_vals, w, label="Baseline", color="#f4a460") + ax1.bar(x + w / 2, a_vals, w, label="ASAP (KLL sketch)", color="#4682b4") + ax1.set_xticks(x) + ax1.set_xticklabels(qids, rotation=90, fontsize=7) + ax1.set_ylabel("Latency (ms)") + ax1.set_title( + f"Query latency: ASAP vs baseline " + f"(p50: {np.median(a_vals):.1f}ms vs {np.median(b_vals):.1f}ms)" + ) + ax1.legend() + ax1.set_xlim(-0.6, len(qids) - 0.4) + + ax2.bar(x, speedup, color="#2e8b57", width=0.7) + ax2.axhline( + np.mean(speedup), + color="red", + linewidth=1, + linestyle="--", + label=f"mean {np.mean(speedup):.1f}×", + ) + ax2.set_xticks(x) + ax2.set_xticklabels(qids, rotation=90, fontsize=7) + ax2.set_ylabel("Speedup (×)") + ax2.legend(fontsize=8) + ax2.set_xlim(-0.6, len(qids) - 0.4) + + if has_accuracy: + ax3 = axes[2] + colors = [ + "#d9534f" if e > 10 else "#f0ad4e" if e > 5 else "#5cb85c" + for e in errors_pct + ] + ax3.bar( + x, errors_pct, 
color=colors, width=0.7, edgecolor="black", linewidth=0.3 + ) + mean_err = np.mean(errors_pct) + ax3.axhline( + mean_err, + color="red", + linewidth=1, + linestyle="--", + label=f"mean {mean_err:.2f}%", + ) + ax3.set_xticks(x) + ax3.set_xticklabels(qids, rotation=90, fontsize=7) + ax3.set_ylabel("Relative Error (%)") + ax3.set_title("Result accuracy: ASAP estimate vs baseline exact answer") + ax3.legend(fontsize=8) + ax3.set_xlim(-0.6, len(qids) - 0.4) + + plt.tight_layout() + plt.savefig(out_path, dpi=150) + plt.close() + print(f"Comparison plot saved to {out_path}") + + if has_accuracy: + s = sorted(errors_pct) + n = len(s) + print( + f"Result error: mean={np.mean(s):.2f}% " + f"p50={s[int(n*0.50)]:.2f}% p95={s[int(n*0.95)]:.2f}% " + f"max={s[-1]:.2f}%" + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def main(): + parser = argparse.ArgumentParser( + description="Benchmark ASAP vs ClickHouse/Elasticsearch baseline", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--mode", + choices=["asap", "baseline", "both"], + default="both", + help="Which mode(s) to run (default: both)", + ) + parser.add_argument( + "--database", + choices=["clickhouse", "elasticsearch"], + required=True, + help="Baseline database to benchmark against", + ) + parser.add_argument( + "--asap-sql-file", + default=None, + help="SQL file for ASAP mode (required if mode is asap or both)", + ) + parser.add_argument( + "--baseline-sql-file", + default=None, + help="SQL file for baseline mode (required if mode is baseline or both)", + ) + + # ClickHouse flags + ch_group = parser.add_argument_group("ClickHouse options (--database clickhouse)") + ch_group.add_argument( + "--asap-url", + default=None, + help=f"ASAP endpoint for ClickHouse mode (default: {DEFAULT_ASAP_CLICKHOUSE_URL})", + ) + ch_group.add_argument( + 
"--clickhouse-url", + default=DEFAULT_CLICKHOUSE_URL, + help=f"ClickHouse HTTP URL (default: {DEFAULT_CLICKHOUSE_URL})", + ) + + # Elasticsearch flags + es_group = parser.add_argument_group( + "Elasticsearch options (--database elasticsearch)" + ) + es_group.add_argument( + "--elastic-host", default=DEFAULT_ELASTIC_HOST, help="Elasticsearch host" + ) + es_group.add_argument( + "--elastic-port", + type=int, + default=DEFAULT_ELASTIC_PORT, + help="Elasticsearch port", + ) + es_group.add_argument( + "--elastic-api-key", default=None, help="Elasticsearch API key" + ) + + # Shared flags + parser.add_argument( + "--output-dir", + default=DEFAULT_OUTPUT_DIR, + help=f"Directory for results (default: {DEFAULT_OUTPUT_DIR})", + ) + parser.add_argument( + "--output-prefix", + default=DEFAULT_OUTPUT_PREFIX, + help=f"Prefix for output files (default: {DEFAULT_OUTPUT_PREFIX})", + ) + parser.add_argument( + "--query-filter", + default=None, + help="Comma-separated query IDs to run (e.g. T000,T001)", + ) + parser.add_argument( + "--repeat", + type=int, + default=1, + help="Run each query N times and report the median (default: 1)", + ) + parser.add_argument( + "--timeout", + type=int, + default=30, + help="Per-query timeout in seconds (default: 30)", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Show per-query HTTP status", + ) + parser.add_argument( + "--no-plot", + action="store_true", + help="Do not generate any plots", + ) + args = parser.parse_args() + + # Validate required SQL files + if args.mode in ("asap", "both") and not args.asap_sql_file: + parser.error("--asap-sql-file is required when --mode is asap or both") + if args.mode in ("baseline", "both") and not args.baseline_sql_file: + parser.error("--baseline-sql-file is required when --mode is baseline or both") + + # Resolve endpoints based on --database + use_elastic = args.database == "elasticsearch" + + baseline_url = ( + f"http://{args.elastic_host}:{args.elastic_port}/_sql?format=json" + if 
use_elastic + else args.clickhouse_url + ) + asap_url = args.asap_url or ( + DEFAULT_ASAP_ELASTIC_URL if use_elastic else DEFAULT_ASAP_CLICKHOUSE_URL + ) + + output_dir = Path(args.output_dir) + prefix = args.output_prefix + query_filter = ( + [q.strip() for q in args.query_filter.split(",")] if args.query_filter else None + ) + + asap_csv = output_dir / f"{prefix}_asap.csv" + baseline_csv = output_dir / f"{prefix}_baseline.csv" + + if args.mode in ("baseline", "both"): + run_benchmark( + sql_file=Path(args.baseline_sql_file), + endpoint_url=baseline_url, + output_csv=baseline_csv, + mode="baseline", + database=args.database, + api_key=args.elastic_api_key if use_elastic else None, + query_filter=query_filter, + timeout=args.timeout, + repeat=args.repeat, + debug=args.debug, + no_plot=args.no_plot, + ) + + if args.mode in ("asap", "both"): + run_benchmark( + sql_file=Path(args.asap_sql_file), + endpoint_url=asap_url, + output_csv=asap_csv, + mode="asap", + database=args.database, + api_key=args.elastic_api_key if use_elastic else None, + query_filter=query_filter, + timeout=args.timeout, + repeat=args.repeat, + debug=args.debug, + no_plot=args.no_plot, + ) + + if args.mode == "both" and not args.no_plot: + _plot_comparison( + asap_csv, baseline_csv, output_dir / f"{prefix}_comparison.png" + ) + + +if __name__ == "__main__": + main()