Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into dmoverton/sampling-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dmoverton committed Mar 21, 2024
2 parents b6da219 + 66ded02 commit 01981c9
Show file tree
Hide file tree
Showing 24 changed files with 1,264 additions and 125 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
introspection::sample_schema_from_db(sample_size, &context.mongo_config).await?
}
};
let configuration = Configuration::from_schema(schema);
let configuration = Configuration::from_schema(schema)?;

configuration::write_directory(&context.path, &configuration).await?;

Expand Down
1 change: 1 addition & 0 deletions crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
futures = "^0.3"
itertools = "^0.12"
mongodb = "2.8"
Expand Down
96 changes: 90 additions & 6 deletions crates/configuration/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{io, path::Path};
use std::path::Path;

use anyhow::ensure;
use itertools::Itertools;
use schemars::JsonSchema;
use serde::Deserialize;

use crate::{native_queries::NativeQuery, read_directory, Schema};
use crate::{native_queries::NativeQuery, read_directory, schema::ObjectType, Schema};

#[derive(Clone, Debug, Default, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
Expand All @@ -18,16 +20,98 @@ pub struct Configuration {
}

impl Configuration {
pub fn from_schema(schema: Schema) -> Self {
Self {
pub fn validate(schema: Schema, native_queries: Vec<NativeQuery>) -> anyhow::Result<Self> {
let config = Configuration {
schema,
..Default::default()
native_queries,
};

{
let duplicate_type_names: Vec<&str> = config
.object_types()
.map(|t| t.name.as_ref())
.duplicates()
.collect();
ensure!(
duplicate_type_names.is_empty(),
"configuration contains multiple definitions for these object type names: {}",
duplicate_type_names.join(", ")
);
}

{
let duplicate_collection_names: Vec<&str> = config
.schema
.collections
.iter()
.map(|c| c.name.as_ref())
.duplicates()
.collect();
ensure!(
duplicate_collection_names.is_empty(),
"configuration contains multiple definitions for these collection names: {}",
duplicate_collection_names.join(", ")
);
}

Ok(config)
}

pub fn from_schema(schema: Schema) -> anyhow::Result<Self> {
Self::validate(schema, Default::default())
}

pub async fn parse_configuration(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Self> {
) -> anyhow::Result<Self> {
read_directory(configuration_dir).await
}

/// Returns object types collected from schema and native queries
pub fn object_types(&self) -> impl Iterator<Item = &ObjectType> {
let object_types_from_schema = self.schema.object_types.iter();
let object_types_from_native_queries = self
.native_queries
.iter()
.flat_map(|native_query| &native_query.object_types);
object_types_from_schema.chain(object_types_from_native_queries)
}
}

#[cfg(test)]
mod tests {
use mongodb::bson::doc;

use super::*;
use crate::{schema::Type, Schema};

#[test]
fn fails_with_duplicate_object_types() {
let schema = Schema {
collections: Default::default(),
object_types: vec![ObjectType {
name: "Album".to_owned(),
fields: Default::default(),
description: Default::default(),
}],
};
let native_queries = vec![NativeQuery {
name: "hello".to_owned(),
object_types: vec![ObjectType {
name: "Album".to_owned(),
fields: Default::default(),
description: Default::default(),
}],
result_type: Type::Object("Album".to_owned()),
command: doc! { "command": 1 },
arguments: Default::default(),
selection_criteria: Default::default(),
description: Default::default(),
mode: Default::default(),
}];
let result = Configuration::validate(schema, native_queries);
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("multiple definitions"));
assert!(error_msg.contains("Album"));
}
}
59 changes: 29 additions & 30 deletions crates/configuration/src/directory.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use anyhow::{anyhow, Context as _};
use futures::stream::TryStreamExt as _;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use std::{
io,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

Expand All @@ -29,7 +27,7 @@ const YAML: FileFormat = FileFormat::Yaml;
/// Read configuration from a directory
pub async fn read_directory(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Configuration> {
) -> anyhow::Result<Configuration> {
let dir = configuration_dir.as_ref();

let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;
Expand All @@ -38,16 +36,13 @@ pub async fn read_directory(
.await?
.unwrap_or_default();

Ok(Configuration {
schema,
native_queries,
})
Configuration::validate(schema, native_queries)
}

/// Parse all files in a directory with one of the allowed configuration extensions according to
/// the given type argument. For example if `T` is `NativeQuery` this function assumes that all
/// json and yaml files in the given directory should be parsed as native query configurations.
async fn read_subdir_configs<T>(subdir: &Path) -> io::Result<Option<Vec<T>>>
async fn read_subdir_configs<T>(subdir: &Path) -> anyhow::Result<Option<Vec<T>>>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -57,6 +52,7 @@ where

let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?);
let configs = dir_stream
.map_err(|err| err.into())
.try_filter_map(|dir_entry| async move {
// Permits regular files and symlinks, does not filter out symlinks to directories.
let is_file = !(dir_entry.file_type().await?.is_dir());
Expand Down Expand Up @@ -86,7 +82,7 @@ where

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc; reads the file; and parses it according to its extension.
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> io::Result<T>
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -96,38 +92,39 @@ where

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc, and returns the found path with its file format.
async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> {
async fn find_file(
configuration_dir: &Path,
basename: &str,
) -> anyhow::Result<(PathBuf, FileFormat)> {
for (extension, format) in CONFIGURATION_EXTENSIONS {
let path = configuration_dir.join(format!("{basename}.{extension}"));
if fs::try_exists(&path).await? {
return Ok((path, format));
}
}

Err(io::Error::new(
io::ErrorKind::NotFound,
format!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
),
Err(anyhow!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
))
}

async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
let bytes = fs::read(path.as_ref()).await?;
let value = match format {
FileFormat::Json => serde_json::from_slice(&bytes)?,
FileFormat::Json => serde_json::from_slice(&bytes)
.with_context(|| format!("error parsing {:?}", path.as_ref()))?,
FileFormat::Yaml => serde_yaml::from_slice(&bytes)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
.with_context(|| format!("error parsing {:?}", path.as_ref()))?,
};
Ok(value)
}
Expand All @@ -136,7 +133,7 @@ where
pub async fn write_directory(
configuration_dir: impl AsRef<Path>,
configuration: &Configuration,
) -> io::Result<()> {
) -> anyhow::Result<()> {
write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
}

Expand All @@ -149,11 +146,13 @@ async fn write_file<T>(
configuration_dir: impl AsRef<Path>,
basename: &str,
value: &T,
) -> io::Result<()>
) -> anyhow::Result<()>
where
T: Serialize,
{
let path = default_file_path(configuration_dir, basename);
let bytes = serde_json::to_vec_pretty(value)?;
fs::write(path, bytes).await
fs::write(path.clone(), bytes)
.await
.with_context(|| format!("error writing {:?}", path))
}
19 changes: 16 additions & 3 deletions crates/configuration/src/native_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria};
use schemars::JsonSchema;
use serde::Deserialize;

use crate::schema::{ObjectField, Type};
use crate::schema::{ObjectField, ObjectType, Type};

/// An arbitrary database command using MongoDB's runCommand API.
/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
Expand All @@ -12,10 +12,19 @@ pub struct NativeQuery {
/// Name that will be used to identify the query in your data graph
pub name: String,

/// Type of data returned by the query.
/// You may define object types here to reference in `result_type`. Any types defined here will
/// be merged with the definitions in `schema.json`. This allows you to maintain hand-written
/// types for native queries without having to edit a generated `schema.json` file.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub object_types: Vec<ObjectType>,

/// Type of data returned by the query. You may reference object types defined in the
/// `object_types` list in this definition, or you may reference object types from
/// `schema.json`.
pub result_type: Type,

/// Arguments for per-query customization
#[serde(default)]
pub arguments: Vec<ObjectField>,

/// Command to run expressed as a BSON document
Expand All @@ -31,7 +40,11 @@ pub struct NativeQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,

/// Set to `readWrite` if this native query might modify data in the database.
/// Set to `readWrite` if this native query might modify data in the database. When refreshing
/// a dataconnector native queries will appear in the corresponding `DataConnectorLink`
/// definition as `functions` if they are read-only, or as `procedures` if they are read-write.
/// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation
/// fields.
#[serde(default)]
pub mode: Mode,
}
Expand Down
20 changes: 10 additions & 10 deletions crates/mongodb-agent-common/src/mongodb/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::Serialize;

use dc_api_types::Field;

use crate::mongodb::selection::format_col_path;
use crate::mongodb::selection::serialized_null_checked_column_reference;

/// A projection determines which fields to request from the result of a query.
///
Expand Down Expand Up @@ -58,7 +58,7 @@ fn project_field_as(parent_columns: &[&str], field: &Field) -> ProjectAs {
[] => format!("${column}"),
_ => format!("${}.{}", parent_columns.join("."), column),
};
let bson_col_path = format_col_path(col_path, column_type);
let bson_col_path = serialized_null_checked_column_reference(col_path, column_type);
ProjectAs::Expression(bson_col_path)
}
Field::NestedObject { column, query } => {
Expand Down Expand Up @@ -198,18 +198,18 @@ mod tests {
assert_eq!(
to_document(&projection)?,
doc! {
"foo": "$foo",
"foo_again": "$foo",
"foo": { "$ifNull": ["$foo", null] },
"foo_again": { "$ifNull": ["$foo", null] },
"bar": {
"baz": "$bar.baz",
"baz_again": "$bar.baz"
"baz": { "$ifNull": ["$bar.baz", null] },
"baz_again": { "$ifNull": ["$bar.baz", null] }
},
"bar_again": {
"baz": "$bar.baz"
"baz": { "$ifNull": ["$bar.baz", null] }
},
"my_date": {
"$dateToString": {
"date": "$my_date"
"date": { "$ifNull": ["$my_date", null] }
}
}
}
Expand Down Expand Up @@ -260,10 +260,10 @@ mod tests {
to_document(&projection)?,
doc! {
"class_students": {
"name": "$class_students.name",
"name": { "$ifNull": ["$class_students.name", null] },
},
"students": {
"student_name": "$class_students.name",
"student_name": { "$ifNull": ["$class_students.name", null] },
},
}
);
Expand Down
Loading

0 comments on commit 01981c9

Please sign in to comment.