diff --git a/server/src/db.rs b/server/src/db.rs index cc67aa857..62ba650e4 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -5,7 +5,6 @@ use crate::deno::make_field_policies; use crate::policies::FieldPolicies; use crate::query::engine; use crate::query::engine::new_query_results; -use crate::query::engine::RawSqlStream; use crate::query::engine::SqlStream; use crate::runtime; use crate::types::{Field, ObjectType, Type, TypeSystem, TypeSystemError, OAUTHUSER_TYPE_NAME}; @@ -21,6 +20,7 @@ use futures::TryStreamExt; use itertools::Itertools; use pin_project::pin_project; use serde_json::value::Value; +use sqlx::any::AnyRow; use sqlx::AnyPool; use std::collections::HashMap; use std::collections::HashSet; @@ -228,14 +228,14 @@ enum Query { } #[pin_project] -struct PolicyApplyingStream { +struct PolicyApplyingStream { #[pin] - inner: RawSqlStream, + inner: T, policies: FieldPolicies, columns: Vec<(String, Type)>, } -impl Stream for PolicyApplyingStream { +impl>> Stream for PolicyApplyingStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); @@ -313,9 +313,9 @@ fn sql_backing_store( if policies.transforms.is_empty() { return Query::Sql(query); } - let stream = new_query_results(query, pool); + let inner = new_query_results(query, pool); let pstream = Box::pin(PolicyApplyingStream { - inner: stream, + inner, policies, columns, }); diff --git a/server/src/query/engine.rs b/server/src/query/engine.rs index e0ead43db..8b5678700 100644 --- a/server/src/query/engine.rs +++ b/server/src/query/engine.rs @@ -21,9 +21,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use uuid::Uuid; -// Results directly out of the database -pub(crate) type RawSqlStream = BoxStream<'static, Result>; - // Results with policies applied pub(crate) type SqlStream = BoxStream<'static, Result>; @@ -35,13 +32,15 @@ struct QueryResults { stream: T, } -pub(crate) fn new_query_results(raw_query: String, pool: &AnyPool) -> RawSqlStream { +pub(crate) fn new_query_results( + raw_query: String, + pool: &AnyPool, +) -> impl Stream> { // The string data will not move anymore. let raw_query_ptr = raw_query.as_ref() as *const str; let query = sqlx::query::(unsafe { &*raw_query_ptr }); let stream = query.fetch(pool).map(|i| i.map_err(anyhow::Error::new)); - - Box::pin(QueryResults { raw_query, stream }) + QueryResults { raw_query, stream } } impl>> Stream for QueryResults {