Skip to content

Commit

Permalink
mutations via native queries (#7)
Browse files Browse the repository at this point in the history
Implements a mutation endpoint to run procedures. I added an example native query to fixtures, `insertArtist`, to demonstrate.

I'll follow up with another PR that implements arguments to native queries where I'll replace the hard-coded inputs.

I'm having a problem querying the `Artist` collection which I think is unrelated, but I'm going to check that out and likely submit a bug fix PR. I confirmed that `insertArtist` does update the database correctly, and I wanted to get this PR out.

Ticket: https://hasurahq.atlassian.net/browse/MDB-86
  • Loading branch information
hallettj committed Mar 20, 2024
1 parent 76a3317 commit 43841fc
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/mongodb-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ configuration = { path = "../configuration" }
dc-api = { path = "../dc-api" }
dc-api-types = { path = "../dc-api-types" }
enum-iterator = "1.4.1"
futures = "^0.3"
http = "^0.2"
indexmap = { version = "2.1.0", features = ["serde"] }
itertools = "^0.10"
Expand Down
1 change: 1 addition & 0 deletions crates/mongodb-connector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod api_type_conversions;
mod capabilities;
mod error_mapping;
mod mongo_connector;
mod mutation;
mod schema;

use std::error::Error;
Expand Down
10 changes: 4 additions & 6 deletions crates/mongodb-connector/src/mongo_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use ndc_sdk::{
},
};

use crate::capabilities::mongo_capabilities_response;
use crate::{
api_type_conversions::{
v2_to_v3_explain_response, v2_to_v3_query_response, v3_to_v2_query_request, QueryContext,
},
capabilities::scalar_types,
error_mapping::{mongo_agent_error_to_explain_error, mongo_agent_error_to_query_error},
};
use crate::{capabilities::mongo_capabilities_response, mutation::handle_mutation_request};

#[derive(Clone, Default)]
pub struct MongoConnector;
Expand Down Expand Up @@ -115,12 +115,10 @@ impl Connector for MongoConnector {

async fn mutation(
_configuration: &Self::Configuration,
_state: &Self::State,
_request: MutationRequest,
state: &Self::State,
request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
Err(MutationError::UnsupportedOperation(
"The MongoDB agent does not yet support mutations".to_owned(),
))
handle_mutation_request(state, request).await
}

async fn query(
Expand Down
98 changes: 98 additions & 0 deletions crates/mongodb-connector/src/mutation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::collections::BTreeMap;

use configuration::native_queries::NativeQuery;
use futures::future::try_join_all;
use itertools::Itertools;
use mongodb::Database;
use mongodb_agent_common::interface_types::MongoConfig;
use ndc_sdk::{
connector::MutationError,
json_response::JsonResponse,
models::{MutationOperation, MutationOperationResults, MutationRequest, MutationResponse},
};
use serde_json::Value;

/// A procedure combined with inputs
#[derive(Clone, Debug)]
#[allow(dead_code)]
struct Job<'a> {
// For the time being all procedures are native queries.
native_query: &'a NativeQuery,
arguments: BTreeMap<String, Value>,
}

impl<'a> Job<'a> {
pub fn new(native_query: &'a NativeQuery, arguments: BTreeMap<String, Value>) -> Self {
Job {
native_query,
arguments,
}
}
}

pub async fn handle_mutation_request(
config: &MongoConfig,
mutation_request: MutationRequest,
) -> Result<JsonResponse<MutationResponse>, MutationError> {
tracing::debug!(?config, mutation_request = %serde_json::to_string(&mutation_request).unwrap(), "executing mutation");
let database = config.client.database(&config.database);
let jobs = look_up_procedures(config, mutation_request)?;
let operation_results = try_join_all(
jobs.into_iter()
.map(|job| execute_job(database.clone(), job)),
)
.await?;
Ok(JsonResponse::Value(MutationResponse { operation_results }))
}

/// Looks up procedures according to the names given in the mutation request, and pairs them with
/// arguments and requested fields. Returns an error if any procedures cannot be found.
fn look_up_procedures(
config: &MongoConfig,
mutation_request: MutationRequest,
) -> Result<Vec<Job<'_>>, MutationError> {
let (jobs, not_found): (Vec<Job>, Vec<String>) = mutation_request
.operations
.into_iter()
.map(|operation| match operation {
MutationOperation::Procedure {
name, arguments, ..
} => {
let native_query = config
.native_queries
.iter()
.find(|native_query| native_query.name == name);
native_query.ok_or(name).map(|nq| Job::new(nq, arguments))
}
})
.partition_result();

if !not_found.is_empty() {
return Err(MutationError::UnprocessableContent(format!(
"request includes unknown procedures: {}",
not_found.join(", ")
)));
}

Ok(jobs)
}

async fn execute_job(
database: Database,
job: Job<'_>,
) -> Result<MutationOperationResults, MutationError> {
let result = database
.run_command(job.native_query.command.clone(), None)
.await
.map_err(|err| match *err.kind {
mongodb::error::ErrorKind::InvalidArgument { message, .. } => {
MutationError::UnprocessableContent(message)
}
err => MutationError::Other(Box::new(err)),
})?;
let json_result =
serde_json::to_value(result).map_err(|err| MutationError::Other(Box::new(err)))?;
Ok(MutationOperationResults::Procedure {
result: json_result,
})
}
1 change: 1 addition & 0 deletions fixtures/connector/chinook/native_queries/hello.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
name: hello
description: Example of a read-only native query
objectTypes:
- name: HelloResult
fields:
Expand Down
16 changes: 16 additions & 0 deletions fixtures/connector/chinook/native_queries/insert_artist.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: insertArtist
description: Example of a database update using a native query
objectTypes:
- name: InsertArtist
fields:
- name: ok
type: !scalar int
- name: n
type: !scalar int
resultType: !object InsertArtist
# TODO: implement arguments instead of hard-coding inputs
command:
insert: "Artist"
documents:
- ArtistId: 1001
Name: Regina Spektor
54 changes: 54 additions & 0 deletions fixtures/ddn/subgraphs/chinook/commands/InsertArtist.hml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
kind: Command
version: v1
definition:
name: insertArtist
description: Example of a database update using a native query
outputType: InsertArtist
arguments: []
source:
dataConnectorName: mongodb
dataConnectorCommand:
procedure: insertArtist
typeMapping:
InsertArtist:
fieldMapping:
ok: { column: ok }
n: { column: n }
graphql:
rootFieldName: insertArtist
rootFieldKind: Mutation

---
kind: CommandPermissions
version: v1
definition:
commandName: insertArtist
permissions:
- role: admin
allowExecution: true

---
kind: ObjectType
version: v1
definition:
name: InsertArtist
graphql:
typeName: InsertArtist
fields:
- name: ok
type: Int!
- name: n
type: Int!

---
kind: TypePermissions
version: v1
definition:
typeName: InsertArtist
permissions:
- role: admin
output:
allowedFields:
- ok
- n

14 changes: 13 additions & 1 deletion fixtures/ddn/subgraphs/chinook/dataconnectors/mongodb.hml
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,12 @@ definition:
type: { type: named, name: Int }
readOnly:
type: { type: named, name: Boolean }
InsertArtist:
fields:
ok:
type: { type: named, name: Int }
n:
type: { type: named, name: Int }
collections:
- name: Album
arguments: {}
Expand Down Expand Up @@ -1002,10 +1008,16 @@ definition:
foreign_keys: {}
functions:
- name: hello
description: Example of a read-only native query
result_type: { type: named, name: HelloResult }
arguments: {}
command: { hello: 1 }
procedures: []
procedures:
- name: insertArtist
description: Example of a database update using a native query
result_type: { type: named, name: InsertArtist }
arguments: {}
command: { insert: Artist, documents: [{ ArtistId: 1001, Name: Regina Spektor }] }
capabilities:
version: ^0.1.0
capabilities:
Expand Down

0 comments on commit 43841fc

Please sign in to comment.