diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 10880cb..ae79057 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2105,6 +2105,7 @@ dependencies = [ "google-api-proto", "google-authz", "hyper", + "prost", "prost-types", "serde", "serde-firestore-value", diff --git a/rust/crates/web/Cargo.toml b/rust/crates/web/Cargo.toml index 935205c..60c6aee 100644 --- a/rust/crates/web/Cargo.toml +++ b/rust/crates/web/Cargo.toml @@ -14,6 +14,7 @@ features = "0.10.0" google-api-proto = { version = "1.415.0", features = ["google-firestore-v1"] } google-authz = { version = "1.0.0-alpha.5", features = ["tonic"] } hyper = { version = "0.14.27", features = ["full"] } +prost = "0.12.1" prost-types = "0.12" serde = { version = "1.0.190", features = ["derive"] } serde-firestore-value = "0.2.0" diff --git a/rust/crates/web/src/infra/firestore.rs b/rust/crates/web/src/infra/firestore.rs index 0109fcd..34ed428 100644 --- a/rust/crates/web/src/infra/firestore.rs +++ b/rust/crates/web/src/infra/firestore.rs @@ -75,4 +75,21 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_transaction() -> anyhow::Result<()> { + let endpoint = "http://firebase:8080"; + let mut client = Client::new( + "demo-project1".to_string(), + "(default)".to_string(), + endpoint, + ) + .await?; + // let collection_path = client.collection("repositories".to_string()); + + let transaction = client.begin_transaction().await?; + client.commit(transaction, vec![]).await?; + + Ok(()) + } } diff --git a/rust/crates/web/src/infra/firestore/client.rs b/rust/crates/web/src/infra/firestore/client.rs index eb4db15..b32e961 100644 --- a/rust/crates/web/src/infra/firestore/client.rs +++ b/rust/crates/web/src/infra/firestore/client.rs @@ -1,13 +1,14 @@ use google_api_proto::google::firestore::v1::{ firestore_client::FirestoreClient, precondition::ConditionType, value::ValueType, + BeginTransactionRequest, BeginTransactionResponse, CommitRequest, CommitResponse, CreateDocumentRequest, DeleteDocumentRequest, Document as FirestoreDocument, GetDocumentRequest, ListDocumentsRequest, ListDocumentsResponse, MapValue, Precondition, - UpdateDocumentRequest, + UpdateDocumentRequest, Write, }; use google_authz::{Credentials, GoogleAuthz}; use serde::{de::DeserializeOwned, Serialize}; use serde_firestore_value::to_value; -use tonic::{transport::Channel, Request}; +use tonic::transport::Channel; use crate::infra::firestore::document; @@ -17,6 +18,8 @@ use super::{ timestamp::Timestamp, }; +pub struct Transaction(prost::bytes::Bytes); + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("credentials {0}")] @@ -41,8 +44,6 @@ pub struct Client { } impl Client { - // TODO: begin_transaction - // TODO: commit // TODO: rollback // TODO: run_query @@ -62,10 +63,40 @@ impl Client { Ok(Self { client, root_path }) } + pub async fn begin_transaction(&mut self) -> Result { + let response = self + .client + .begin_transaction(BeginTransactionRequest { + database: self.root_path.database_name(), + options: None, + }) + .await?; + let BeginTransactionResponse { transaction } = response.into_inner(); + Ok(Transaction(transaction)) + } + pub fn collection(&self, collection_id: String) -> CollectionPath { self.root_path.clone().collection(collection_id) } + pub async fn commit( + &mut self, + transaction: Transaction, + writes: Vec, + ) -> Result<((), Option), Error> { + let response = self + .client + .commit(CommitRequest { + database: self.root_path.database_name(), + writes, + transaction: transaction.0, + }) + .await?; + // TODO: write_results + let CommitResponse { commit_time, .. } = response.into_inner(); + Ok(((), commit_time.map(Timestamp::from))) + } + pub async fn create( &mut self, collection_path: &CollectionPath, @@ -77,7 +108,7 @@ impl Client { { let response = self .client - .create_document(Request::new(CreateDocumentRequest { + .create_document(CreateDocumentRequest { parent: collection_path.parent().path(), collection_id: collection_path.id().to_string(), document_id: "".to_string(), @@ -95,7 +126,7 @@ impl Client { update_time: None, }), mask: None, - })) + }) .await?; Document::new(response.into_inner()).map_err(Error::Deserialize) } @@ -106,14 +137,14 @@ impl Client { current_update_time: Timestamp, ) -> Result<(), Error> { self.client - .delete_document(Request::new(DeleteDocumentRequest { + .delete_document(DeleteDocumentRequest { name: document_path.path(), current_document: Some(Precondition { condition_type: Some(ConditionType::UpdateTime(prost_types::Timestamp::from( current_update_time, ))), }), - })) + }) .await?; Ok(()) } @@ -128,11 +159,11 @@ impl Client { { let response = self .client - .get_document(Request::new(GetDocumentRequest { + .get_document(GetDocumentRequest { name: document_path.path(), mask: None, consistency_selector: None, - })) + }) .await?; Document::new(response.into_inner()).map_err(Error::Deserialize) } @@ -147,12 +178,12 @@ impl Client { { let response = self .client - .list_documents(Request::new(ListDocumentsRequest { + .list_documents(ListDocumentsRequest { parent: collection_path.parent().path(), collection_id: collection_path.id().to_string(), page_size: 100, ..Default::default() - })) + }) .await?; let ListDocumentsResponse { documents, @@ -178,7 +209,7 @@ impl Client { { let response = self .client - .update_document(Request::new(UpdateDocumentRequest { + .update_document(UpdateDocumentRequest { document: Some(FirestoreDocument { name: document_path.path(), fields: { @@ -199,7 +230,7 @@ impl Client { current_update_time, ))), }), - })) + }) .await?; Document::new(response.into_inner()).map_err(Error::Deserialize) }