Skip to content

Commit

Permalink
flatten_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
espindola committed Jan 27, 2022
1 parent d2a3731 commit d3dc67f
Showing 1 changed file with 2 additions and 30 deletions.
32 changes: 2 additions & 30 deletions server/src/query/engine.rs
Expand Up @@ -6,7 +6,7 @@ use crate::types::{Field, ObjectDelta, ObjectType, Type, OAUTHUSER_TYPE_NAME};
use anyhow::{anyhow, Context as AnyhowContext};
use futures::stream::BoxStream;
use futures::stream::Stream;
use futures::Future;
use futures::FutureExt;
use futures::StreamExt;
use itertools::{zip, Itertools};
use pin_project::pin_project;
Expand Down Expand Up @@ -68,36 +68,8 @@ async fn make_transactioned_stream(
}
}

#[pin_project]
struct QueryResults<F: Future> {
#[pin]
fut: F,
#[pin]
stream: Option<F::Output>,
}

pub(crate) fn new_query_results(raw_query: String, tr: TransactionStatic) -> RawSqlStream {
Box::pin(QueryResults {
fut: make_transactioned_stream(tr, raw_query),
stream: None,
})
}

impl<F, S> Stream for QueryResults<F>
where
F: Future<Output = S>,
S: Stream<Item = anyhow::Result<AnyRow>> + Unpin,
{
type Item = anyhow::Result<AnyRow>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if this.stream.is_none() {
let stream = futures::ready!(this.fut.poll(cx));
this.stream.replace(stream);
}
this.stream.as_pin_mut().unwrap().poll_next(cx)
}
Box::pin(make_transactioned_stream(tr, raw_query).flatten_stream())
}

impl TryFrom<&Field> for ColumnDef {
Expand Down

0 comments on commit d3dc67f

Please sign in to comment.