Remove getDictionary from Datasource class#673
Closed
kishoreg wants to merge 1 commit into
Closed
Conversation
feef318 to
b8ab9a5
Compare
b8ab9a5 to
c34f561
Compare
timothy-e
added a commit
to timothy-e/pinot
that referenced
this pull request
Jun 17, 2026
…e SIGSTOP-style timeouts (apache#673) cc stripe-private-oss-forks/pinot-reviewers r? dang saiswapnilar `incrementedServers` was populated only after `submit()` returned, so when `submit()` hung on a frozen-but-connected server and threw `TimeoutException`, the set was empty and `recordPerServerLatencies` was a no-op — the adaptive routing EMA was never updated. SSE already increments before dispatch (`AsyncQueryResponse.java:62`), but MSE didn't because we didn't want to pull out the details of `submit` to get all the servers, but now we see that it's necessary. Instead of leaking the implementation details, pass a pre-dispatch hook through `submit` that calls `recordStatsForQuerySubmission` before entering the `try` block. When `submit()` now times out, `incrementedServers` is populated -> `tryRecover` calls `cancelWithStats` -> unresponsive servers absent from `respondingServerIds` -> Tier 3 in `recordPerServerLatencies` -> `latency = elapsedMs` -> EMA updated. [STREAMANALYTICS-4543](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4543) In the below graph, `before-submit` refers to this PR.  Using a script that * runs a 4 minute load test * after 1 minute of the load test, sends `svc -p` to the Pinot process on one realtime and one offline server * after 2 minutes, sends `svc -c` to continue. * between runs, lowers adaptive routing params so that the stats are reset. Before this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:32:54.129Z&to=2026-06-16T13:50:48.107Z&timezone=America%2FToronto&var-overrides=&refresh=10s) not very conclusive results for MSE latency stats: <img width="1361" alt="Screenshot 2026-06-16 at 10 36 50 am" src="https://git.corp.stripe.com/user-attachments/assets/40f447b0-fbaa-42a0-bf12-3b568af8065f" /> With this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:59:29.589Z&to=2026-06-16T14:14:23.905Z&timezone=America%2FToronto&var-overrides=&refresh=10s) very obvious results in the MSE latency stats. <img width="1365" alt="Screenshot 2026-06-16 at 10 37 54 am" src="https://git.corp.stripe.com/user-attachments/assets/a1721aca-f7d7-42d3-b117-fd1c7bc5e2e0" /> ``` set -euo pipefail ALPHAS=(0.666) AUTODECAYS=(60) EXPONENTS=(2 2 2) RESET_ALPHA=1 RESET_AUTODECAY_MS=1000 RESET_EXPONENT=2 RESET_SLEEP_SECONDS=30 CONTROLLER_HOST=qa-pinotdbcontroller--0de7e24d978a877a9.northwest.stripe.io STREAMING_HOST=qa-pinotdbstreaming--0606dd9b61a58afdb.northwest.stripe.io SERVER_HOST=qa-pinotdbserver--022dc12bf5cb176d2.northwest.stripe.io RESULTS_DIR="results/$(date +%Y-%m-%d)" mkdir -p "$RESULTS_DIR" TOTAL=$((${#ALPHAS[@]} * ${#AUTODECAYS[@]} * ${#EXPONENTS[@]})) RUN=0 set_routing_params() { local alpha="$1" local autodecay_ms="$2" local exponent="$3" local payload payload=$(printf '{"pinot.broker.adaptive.server.selector.autodecay.window.ms": "%s", "pinot.broker.adaptive.server.selector.ewma.alpha": "%s", "pinot.broker.adaptive.server.selector.hybrid.score.exponent": "%s"}' \ "$autodecay_ms" "$alpha" "$exponent") for attempt in 1 2 3 4 5; do if ssh "$CONTROLLER_HOST" \ "curl -X POST localhost:9000/cluster/configs -H 'Content-Type: application/json' -d '$payload'"; then return 0 fi if [ "$attempt" -eq 5 ]; then echo "ERROR: Failed to set params after 5 attempts" return 1 fi echo "Attempt ${attempt} failed, retrying in 5s..." sleep 5 done } pause_servers() { echo "Pausing servers..." until ssh "$STREAMING_HOST" 'sudo svc -p /etc/service/pinot-streaming' && \ ssh "$SERVER_HOST" 'sudo svc -p /etc/service/pinot-server'; do echo "Pause failed, retrying in 2s..." sleep 2 done } unpause_servers() { echo "Unpausing servers..." until ssh "$STREAMING_HOST" 'sudo svc -c /etc/service/pinot-streaming' && \ ssh "$SERVER_HOST" 'sudo svc -c /etc/service/pinot-server'; do echo "Unpause failed, retrying in 2s..." sleep 2 done } run_test() { local alpha="$1" local autodecay_ms="$2" local exponent="$3" local timestamp results_file timestamp=$(date +%H%M%S) results_file="${RESULTS_DIR}/alpha=${alpha}_autodecay=${autodecay_ms}_exponent=${exponent}_${timestamp}.txt" echo "Starting traffic (results will be saved to ${results_file})..." pay remote ssh run-ba-testing-traffic -- python3 pinot-rose-load-test.py --duration 240 > "$results_file" 2>&1 & local traffic_pid=$! echo "Setting autodecay=${autodecay_ms}ms, alpha=${alpha}, exponent=${exponent}..." set_routing_params "$alpha" "$autodecay_ms" "$exponent" echo "Waiting 60s..." sleep 60 pause_servers echo "Waiting 120s before unpausing..." sleep 120 unpause_servers echo "Waiting for traffic generation to complete..." wait $traffic_pid echo "Done. Results saved to ${results_file}" } for autodecay in "${AUTODECAYS[@]}"; do for alpha in "${ALPHAS[@]}"; do for exponent in "${EXPONENTS[@]}"; do RUN=$((RUN + 1)) echo "=== Run ${RUN}/${TOTAL}: alpha=${alpha}, autodecay=${autodecay}s, exponent=${exponent} ===" run_test "$alpha" "$((autodecay * 1000))" "$exponent" echo "" if [ "$RUN" -lt "$TOTAL" ]; then echo "Resetting adaptive routing to alpha=${RESET_ALPHA}, autodecay=${RESET_AUTODECAY_MS}ms, exponent=${RESET_EXPONENT} between runs..." set_routing_params "$RESET_ALPHA" "$RESET_AUTODECAY_MS" "$RESET_EXPONENT" echo "Sleeping ${RESET_SLEEP_SECONDS}s between runs..." sleep "$RESET_SLEEP_SECONDS" fi done done done echo "=== Sweep complete: ${TOTAL} runs ===" ``` Stripe-Original-Repo: stripe-private-oss-forks/pinot Stripe-Monotonic-Timestamp: v2/2026-06-17T03:35:29Z/0 Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/673
timothy-e
added a commit
to timothy-e/pinot
that referenced
this pull request
Jun 23, 2026
…e SIGSTOP-style timeouts (apache#673) cc stripe-private-oss-forks/pinot-reviewers r? dang saiswapnilar `incrementedServers` was populated only after `submit()` returned, so when `submit()` hung on a frozen-but-connected server and threw `TimeoutException`, the set was empty and `recordPerServerLatencies` was a no-op — the adaptive routing EMA was never updated. SSE already increments before dispatch (`AsyncQueryResponse.java:62`), but MSE didn't because we didn't want to pull out the details of `submit` to get all the servers, but now we see that it's necessary. Instead of leaking the implementation details, pass a pre-dispatch hook through `submit` that calls `recordStatsForQuerySubmission` before entering the `try` block. When `submit()` now times out, `incrementedServers` is populated -> `tryRecover` calls `cancelWithStats` -> unresponsive servers absent from `respondingServerIds` -> Tier 3 in `recordPerServerLatencies` -> `latency = elapsedMs` -> EMA updated. [STREAMANALYTICS-4543](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4543) In the below graph, `before-submit` refers to this PR.  Using a script that * runs a 4 minute load test * after 1 minute of the load test, sends `svc -p` to the Pinot process on one realtime and one offline server * after 2 minutes, sends `svc -c` to continue. * between runs, lowers adaptive routing params so that the stats are reset. Before this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:32:54.129Z&to=2026-06-16T13:50:48.107Z&timezone=America%2FToronto&var-overrides=&refresh=10s) not very conclusive results for MSE latency stats: <img width="1361" alt="Screenshot 2026-06-16 at 10 36 50 am" src="https://git.corp.stripe.com/user-attachments/assets/40f447b0-fbaa-42a0-bf12-3b568af8065f" /> With this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:59:29.589Z&to=2026-06-16T14:14:23.905Z&timezone=America%2FToronto&var-overrides=&refresh=10s) very obvious results in the MSE latency stats. <img width="1365" alt="Screenshot 2026-06-16 at 10 37 54 am" src="https://git.corp.stripe.com/user-attachments/assets/a1721aca-f7d7-42d3-b117-fd1c7bc5e2e0" /> ``` set -euo pipefail ALPHAS=(0.666) AUTODECAYS=(60) EXPONENTS=(2 2 2) RESET_ALPHA=1 RESET_AUTODECAY_MS=1000 RESET_EXPONENT=2 RESET_SLEEP_SECONDS=30 CONTROLLER_HOST=qa-pinotdbcontroller--0de7e24d978a877a9.northwest.stripe.io STREAMING_HOST=qa-pinotdbstreaming--0606dd9b61a58afdb.northwest.stripe.io SERVER_HOST=qa-pinotdbserver--022dc12bf5cb176d2.northwest.stripe.io RESULTS_DIR="results/$(date +%Y-%m-%d)" mkdir -p "$RESULTS_DIR" TOTAL=$((${#ALPHAS[@]} * ${#AUTODECAYS[@]} * ${#EXPONENTS[@]})) RUN=0 set_routing_params() { local alpha="$1" local autodecay_ms="$2" local exponent="$3" local payload payload=$(printf '{"pinot.broker.adaptive.server.selector.autodecay.window.ms": "%s", "pinot.broker.adaptive.server.selector.ewma.alpha": "%s", "pinot.broker.adaptive.server.selector.hybrid.score.exponent": "%s"}' \ "$autodecay_ms" "$alpha" "$exponent") for attempt in 1 2 3 4 5; do if ssh "$CONTROLLER_HOST" \ "curl -X POST localhost:9000/cluster/configs -H 'Content-Type: application/json' -d '$payload'"; then return 0 fi if [ "$attempt" -eq 5 ]; then echo "ERROR: Failed to set params after 5 attempts" return 1 fi echo "Attempt ${attempt} failed, retrying in 5s..." sleep 5 done } pause_servers() { echo "Pausing servers..." until ssh "$STREAMING_HOST" 'sudo svc -p /etc/service/pinot-streaming' && \ ssh "$SERVER_HOST" 'sudo svc -p /etc/service/pinot-server'; do echo "Pause failed, retrying in 2s..." sleep 2 done } unpause_servers() { echo "Unpausing servers..." until ssh "$STREAMING_HOST" 'sudo svc -c /etc/service/pinot-streaming' && \ ssh "$SERVER_HOST" 'sudo svc -c /etc/service/pinot-server'; do echo "Unpause failed, retrying in 2s..." sleep 2 done } run_test() { local alpha="$1" local autodecay_ms="$2" local exponent="$3" local timestamp results_file timestamp=$(date +%H%M%S) results_file="${RESULTS_DIR}/alpha=${alpha}_autodecay=${autodecay_ms}_exponent=${exponent}_${timestamp}.txt" echo "Starting traffic (results will be saved to ${results_file})..." pay remote ssh run-ba-testing-traffic -- python3 pinot-rose-load-test.py --duration 240 > "$results_file" 2>&1 & local traffic_pid=$! echo "Setting autodecay=${autodecay_ms}ms, alpha=${alpha}, exponent=${exponent}..." set_routing_params "$alpha" "$autodecay_ms" "$exponent" echo "Waiting 60s..." sleep 60 pause_servers echo "Waiting 120s before unpausing..." sleep 120 unpause_servers echo "Waiting for traffic generation to complete..." wait $traffic_pid echo "Done. Results saved to ${results_file}" } for autodecay in "${AUTODECAYS[@]}"; do for alpha in "${ALPHAS[@]}"; do for exponent in "${EXPONENTS[@]}"; do RUN=$((RUN + 1)) echo "=== Run ${RUN}/${TOTAL}: alpha=${alpha}, autodecay=${autodecay}s, exponent=${exponent} ===" run_test "$alpha" "$((autodecay * 1000))" "$exponent" echo "" if [ "$RUN" -lt "$TOTAL" ]; then echo "Resetting adaptive routing to alpha=${RESET_ALPHA}, autodecay=${RESET_AUTODECAY_MS}ms, exponent=${RESET_EXPONENT} between runs..." set_routing_params "$RESET_ALPHA" "$RESET_AUTODECAY_MS" "$RESET_EXPONENT" echo "Sleeping ${RESET_SLEEP_SECONDS}s between runs..." sleep "$RESET_SLEEP_SECONDS" fi done done done echo "=== Sweep complete: ${TOTAL} runs ===" ``` Stripe-Original-Repo: stripe-private-oss-forks/pinot Stripe-Monotonic-Timestamp: v2/2026-06-17T03:35:29Z/0 Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/673
timothy-e
added a commit
to timothy-e/pinot
that referenced
this pull request
Jun 25, 2026
…e SIGSTOP-style timeouts (apache#673) cc stripe-private-oss-forks/pinot-reviewers r? dang saiswapnilar `incrementedServers` was populated only after `submit()` returned, so when `submit()` hung on a frozen-but-connected server and threw `TimeoutException`, the set was empty and `recordPerServerLatencies` was a no-op — the adaptive routing EMA was never updated. SSE already increments before dispatch (`AsyncQueryResponse.java:62`), but MSE didn't because we didn't want to pull out the details of `submit` to get all the servers, but now we see that it's necessary. Instead of leaking the implementation details, pass a pre-dispatch hook through `submit` that calls `recordStatsForQuerySubmission` before entering the `try` block. When `submit()` now times out, `incrementedServers` is populated -> `tryRecover` calls `cancelWithStats` -> unresponsive servers absent from `respondingServerIds` -> Tier 3 in `recordPerServerLatencies` -> `latency = elapsedMs` -> EMA updated. [STREAMANALYTICS-4543](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4543) In the below graph, `before-submit` refers to this PR.  Using a script that * runs a 4 minute load test * after 1 minute of the load test, sends `svc -p` to the Pinot process on one realtime and one offline server * after 2 minutes, sends `svc -c` to continue. * between runs, lowers adaptive routing params so that the stats are reset. Before this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:32:54.129Z&to=2026-06-16T13:50:48.107Z&timezone=America%2FToronto&var-overrides=&refresh=10s) not very conclusive results for MSE latency stats: <img width="1361" alt="Screenshot 2026-06-16 at 10 36 50 am" src="https://git.corp.stripe.com/user-attachments/assets/40f447b0-fbaa-42a0-bf12-3b568af8065f" /> With this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:59:29.589Z&to=2026-06-16T14:14:23.905Z&timezone=America%2FToronto&var-overrides=&refresh=10s) very obvious results in the MSE latency stats. <img width="1365" alt="Screenshot 2026-06-16 at 10 37 54 am" src="https://git.corp.stripe.com/user-attachments/assets/a1721aca-f7d7-42d3-b117-fd1c7bc5e2e0" /> ``` set -euo pipefail ALPHAS=(0.666) AUTODECAYS=(60) EXPONENTS=(2 2 2) RESET_ALPHA=1 RESET_AUTODECAY_MS=1000 RESET_EXPONENT=2 RESET_SLEEP_SECONDS=30 CONTROLLER_HOST=qa-pinotdbcontroller--0de7e24d978a877a9.northwest.stripe.io STREAMING_HOST=qa-pinotdbstreaming--0606dd9b61a58afdb.northwest.stripe.io SERVER_HOST=qa-pinotdbserver--022dc12bf5cb176d2.northwest.stripe.io RESULTS_DIR="results/$(date +%Y-%m-%d)" mkdir -p "$RESULTS_DIR" TOTAL=$((${#ALPHAS[@]} * ${#AUTODECAYS[@]} * ${#EXPONENTS[@]})) RUN=0 set_routing_params() { local alpha="$1" local autodecay_ms="$2" local exponent="$3" local payload payload=$(printf '{"pinot.broker.adaptive.server.selector.autodecay.window.ms": "%s", "pinot.broker.adaptive.server.selector.ewma.alpha": "%s", "pinot.broker.adaptive.server.selector.hybrid.score.exponent": "%s"}' \ "$autodecay_ms" "$alpha" "$exponent") for attempt in 1 2 3 4 5; do if ssh "$CONTROLLER_HOST" \ "curl -X POST localhost:9000/cluster/configs -H 'Content-Type: application/json' -d '$payload'"; then return 0 fi if [ "$attempt" -eq 5 ]; then echo "ERROR: Failed to set params after 5 attempts" return 1 fi echo "Attempt ${attempt} failed, retrying in 5s..." sleep 5 done } pause_servers() { echo "Pausing servers..." until ssh "$STREAMING_HOST" 'sudo svc -p /etc/service/pinot-streaming' && \ ssh "$SERVER_HOST" 'sudo svc -p /etc/service/pinot-server'; do echo "Pause failed, retrying in 2s..." sleep 2 done } unpause_servers() { echo "Unpausing servers..." until ssh "$STREAMING_HOST" 'sudo svc -c /etc/service/pinot-streaming' && \ ssh "$SERVER_HOST" 'sudo svc -c /etc/service/pinot-server'; do echo "Unpause failed, retrying in 2s..." sleep 2 done } run_test() { local alpha="$1" local autodecay_ms="$2" local exponent="$3" local timestamp results_file timestamp=$(date +%H%M%S) results_file="${RESULTS_DIR}/alpha=${alpha}_autodecay=${autodecay_ms}_exponent=${exponent}_${timestamp}.txt" echo "Starting traffic (results will be saved to ${results_file})..." pay remote ssh run-ba-testing-traffic -- python3 pinot-rose-load-test.py --duration 240 > "$results_file" 2>&1 & local traffic_pid=$! echo "Setting autodecay=${autodecay_ms}ms, alpha=${alpha}, exponent=${exponent}..." set_routing_params "$alpha" "$autodecay_ms" "$exponent" echo "Waiting 60s..." sleep 60 pause_servers echo "Waiting 120s before unpausing..." sleep 120 unpause_servers echo "Waiting for traffic generation to complete..." wait $traffic_pid echo "Done. Results saved to ${results_file}" } for autodecay in "${AUTODECAYS[@]}"; do for alpha in "${ALPHAS[@]}"; do for exponent in "${EXPONENTS[@]}"; do RUN=$((RUN + 1)) echo "=== Run ${RUN}/${TOTAL}: alpha=${alpha}, autodecay=${autodecay}s, exponent=${exponent} ===" run_test "$alpha" "$((autodecay * 1000))" "$exponent" echo "" if [ "$RUN" -lt "$TOTAL" ]; then echo "Resetting adaptive routing to alpha=${RESET_ALPHA}, autodecay=${RESET_AUTODECAY_MS}ms, exponent=${RESET_EXPONENT} between runs..." set_routing_params "$RESET_ALPHA" "$RESET_AUTODECAY_MS" "$RESET_EXPONENT" echo "Sleeping ${RESET_SLEEP_SECONDS}s between runs..." sleep "$RESET_SLEEP_SECONDS" fi done done done echo "=== Sweep complete: ${TOTAL} runs ===" ``` Stripe-Original-Repo: stripe-private-oss-forks/pinot Stripe-Monotonic-Timestamp: v2/2026-06-17T03:35:29Z/0 Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/673
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This will be series of changes to remove the current assumption that data is always dictionary encoded.
The intermediate goal is the remove the usage of data-source during the execution phase. All operates must work off of block interfaces.