Skip to content

Commit

Permalink
Use existential types to reduce boxing
Browse files Browse the repository at this point in the history
  • Loading branch information
espindola committed Jan 27, 2022
1 parent dd9b7f7 commit 7e11f6e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
12 changes: 6 additions & 6 deletions server/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::deno::make_field_policies;
use crate::policies::FieldPolicies;
use crate::query::engine;
use crate::query::engine::new_query_results;
use crate::query::engine::RawSqlStream;
use crate::query::engine::SqlStream;
use crate::runtime;
use crate::types::{Field, ObjectType, Type, TypeSystem, TypeSystemError, OAUTHUSER_TYPE_NAME};
Expand All @@ -21,6 +20,7 @@ use futures::TryStreamExt;
use itertools::Itertools;
use pin_project::pin_project;
use serde_json::value::Value;
use sqlx::any::AnyRow;
use sqlx::AnyPool;
use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -228,14 +228,14 @@ enum Query {
}

#[pin_project]
struct PolicyApplyingStream {
struct PolicyApplyingStream<T> {
#[pin]
inner: RawSqlStream,
inner: T,
policies: FieldPolicies,
columns: Vec<(String, Type)>,
}

impl Stream for PolicyApplyingStream {
impl<T: Stream<Item = Result<AnyRow>>> Stream for PolicyApplyingStream<T> {
type Item = Result<JsonObject>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand Down Expand Up @@ -313,9 +313,9 @@ fn sql_backing_store(
if policies.transforms.is_empty() {
return Query::Sql(query);
}
let stream = new_query_results(query, pool);
let inner = new_query_results(query, pool);
let pstream = Box::pin(PolicyApplyingStream {
inner: stream,
inner,
policies,
columns,
});
Expand Down
11 changes: 5 additions & 6 deletions server/src/query/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use uuid::Uuid;

// Results directly out of the database
pub(crate) type RawSqlStream = BoxStream<'static, Result<AnyRow>>;

// Results with policies applied
pub(crate) type SqlStream = BoxStream<'static, Result<JsonObject>>;

Expand All @@ -35,13 +32,15 @@ struct QueryResults<T> {
stream: T,
}

pub(crate) fn new_query_results(raw_query: String, pool: &AnyPool) -> RawSqlStream {
pub(crate) fn new_query_results(
raw_query: String,
pool: &AnyPool,
) -> impl Stream<Item = anyhow::Result<AnyRow>> {
// The string data will not move anymore.
let raw_query_ptr = raw_query.as_ref() as *const str;
let query = sqlx::query::<Any>(unsafe { &*raw_query_ptr });
let stream = query.fetch(pool).map(|i| i.map_err(anyhow::Error::new));

Box::pin(QueryResults { raw_query, stream })
QueryResults { raw_query, stream }
}

impl<T: Stream<Item = Result<AnyRow>>> Stream for QueryResults<T> {
Expand Down

0 comments on commit 7e11f6e

Please sign in to comment.