Skip to content

[streaming-quote-api] Stream per-solver results from the price competition (1/3)#4467

Open
squadgazzz wants to merge 3 commits into
mainfrom
feat/streaming-quote-competition
Open

[streaming-quote-api] Stream per-solver results from the price competition (1/3)#4467
squadgazzz wants to merge 3 commits into
mainfrom
feat/streaming-quote-competition

Conversation

@squadgazzz
Copy link
Copy Markdown
Contributor

@squadgazzz squadgazzz commented Jun 3, 2026

Description

First PR for the streaming quote API (#4456). It adds the one capability the streaming endpoint needs from the pricing layer: the ability for the price-estimation competition to emit each solver's result as it arrives, rather than only the single best one once the whole competition finishes. Purely additive. Nothing consumes it yet, the next PR in the stack does.

Changes

  • Add a StreamingPriceEstimating trait next to PriceEstimating.
  • Implement it for CompetitionEstimator: run every estimator concurrently and yield each result as it completes, with no ranking and no early return. Verification is unaffected, the verified flag still rides on each estimate.

How to test

New unit tests.

Related issues

Part of #4456

- inline estimate_all into the trait impl, elide lifetime to match PriceEstimating
- assert arrival order in the streaming test so a serial impl would fail
- drop em dash in doc comment
@squadgazzz squadgazzz changed the title Stream per-solver results from the price competition (1/3) [quote-streaming-api] Stream per-solver results from the price competition (1/3) Jun 3, 2026
@squadgazzz squadgazzz changed the title [quote-streaming-api] Stream per-solver results from the price competition (1/3) [streaming-quote-api] Stream per-solver results from the price competition (1/3) Jun 4, 2026
@squadgazzz squadgazzz marked this pull request as ready for review June 5, 2026 10:54
@squadgazzz squadgazzz requested a review from a team as a code owner June 5, 2026 10:54
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the StreamingPriceEstimating trait and its implementation for CompetitionEstimator, which allows price estimation results to be yielded concurrently as they complete. It also adds unit tests to verify that the stream yields all results in the correct order and properly propagates errors. No critical issues were found, and there is no feedback to provide.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Copy link
Copy Markdown
Contributor

@MartinquaXD MartinquaXD left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable to me but I'd like to hold the approval until I red all 3 PRs.

Comment on lines +182 to +188
let futures: FuturesUnordered<BoxFuture<'_, PriceEstimateResult>> = FuturesUnordered::new();
for stage in &self.stages {
for (_name, estimator) in stage {
futures.push(estimator.estimate(query.clone()));
}
}
futures.boxed()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible to turn this into a single chain:

        self.stages
            .iter()
            .flatten()
            .map(|(_name, estimator)| estimator.estimate(query.clone()))
            .collect::<FuturesUnordered<_>>()
            .boxed()

Feel free to ignore if you prefer the current logic.

Comment on lines +716 to +717
("ok".to_owned(), Arc::new(ok)),
("err".to_owned(), Arc::new(err)),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test could give slightly stronger guarantees that an error doesn't abort the entire stream if you emit the ok response after a tiny sleep.

Having the ordered results can also make the assertion logic at the end a bit easier to read.

let futures: FuturesUnordered<BoxFuture<'_, PriceEstimateResult>> = FuturesUnordered::new();
for stage in &self.stages {
for (_name, estimator) in stage {
futures.push(estimator.estimate(query.clone()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should return the name with the result for logging/metric purposes when being used in the followup stacked PR?

/// by dropping the stream.
fn estimate_stream(&self, query: Arc<Query>) -> BoxStream<'_, PriceEstimateResult> {
let futures: FuturesUnordered<BoxFuture<'_, PriceEstimateResult>> = FuturesUnordered::new();
for stage in &self.stages {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we iterate self.stages and call each leaf estimator's estimate directly — it does not go through CompetitionEstimator::estimate. So it bypasses the wrapper layer that does is_reasonable and emit_quote_event - is that intended?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants