Skip to content

Commit

Permalink
twiq: Fix FirestoreTransaction::get_document to support Option
Browse files Browse the repository at this point in the history
  • Loading branch information
bouzuya committed Nov 7, 2022
1 parent 854dd32 commit 109657a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
34 changes: 19 additions & 15 deletions twiq/crates/db/src/firestore_rpc_event_store.rs
Expand Up @@ -94,14 +94,17 @@ fn event_fields_projection() -> Projection {
#[async_trait]
impl EventStore for FirestoreRpcEventStore {
async fn find_event(&self, event_id: EventId) -> event_store::Result<Option<Event>> {
// TODO: use Option
let document = self
.transaction
self.transaction
.get_document("events", &event_id.to_string())
.await
.map_err(|e| event_store::Error::Unknown(e.to_string()))?;
let event = event_from_fields(&document).unwrap();
Ok(Some(event))
.map_err(|e| event_store::Error::Unknown(e.to_string()))
.and_then(|document| {
document
.as_ref()
.map(event_from_fields)
.transpose()
.map_err(|e| event_store::Error::Unknown(e.to_string()))
})
}

async fn find_event_ids(&self, after: Option<EventId>) -> event_store::Result<Vec<EventId>> {
Expand Down Expand Up @@ -194,14 +197,14 @@ impl EventStore for FirestoreRpcEventStore {
async fn find_events(&self, after: Option<EventId>) -> event_store::Result<Vec<Event>> {
// get requested_at
let requested_at = match after {
Some(event_id) => {
let document = self
.transaction
.get_document("events", &event_id.to_string())
.await
.map_err(|status| event_store::Error::Unknown(status.to_string()))?;
Some(get_field_as_timestamp(&document, "requested_at").unwrap())
}
Some(event_id) => self
.transaction
.get_document("events", &event_id.to_string())
.await
.map_err(|status| event_store::Error::Unknown(status.to_string()))?
.and_then(|document| get_field_as_timestamp(&document, "requested_at"))
.map(Some)
.ok_or_else(|| event_store::Error::Unknown("not found".to_owned()))?,
None => None,
};

Expand Down Expand Up @@ -310,7 +313,8 @@ impl EventStore for FirestoreRpcEventStore {
.transaction
.get_document("event_streams", &event_stream.id().to_string())
.await
.map_err(|e| event_store::Error::Unknown(e.to_string()))?;
.map_err(|e| event_store::Error::Unknown(e.to_string()))?
.ok_or_else(|| event_store::Error::Unknown("not found".to_owned()))?;
let event_stream_seq = get_field_as_i64(&document, "seq")
.map(EventStreamSeq::try_from)
.ok_or_else(|| {
Expand Down
22 changes: 16 additions & 6 deletions twiq/crates/db/src/firestore_transaction.rs
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::Mutex;
use tonic::{
codegen::InterceptedService,
transport::{Channel, ClientTlsConfig, Endpoint},
Request, Status,
Code, Request, Status,
};

use crate::firestore_rpc::{
Expand Down Expand Up @@ -80,9 +80,12 @@ impl FirestoreTransaction {
client(&self.credential, self.channel.clone()).await
}

pub async fn get_document(&self, collection_id: &str, document_id: &str) -> Result<Document> {
let response = self
.client()
pub async fn get_document(
&self,
collection_id: &str,
document_id: &str,
) -> Result<Option<Document>> {
self.client()
.await?
.get_document(GetDocumentRequest {
name: self.document_path(collection_id, document_id),
Expand All @@ -91,8 +94,15 @@ impl FirestoreTransaction {
self.name(),
)),
})
.await?;
Ok(response.into_inner())
.await
.map(|response| Some(response.into_inner()))
.or_else(|status| {
if matches!(status.code(), Code::NotFound) {
Ok(None)
} else {
Err(status)?
}
})
}

pub async fn commit(self) -> Result<()> {
Expand Down
16 changes: 10 additions & 6 deletions twiq/crates/db/src/firestore_user_repository.rs
Expand Up @@ -31,10 +31,12 @@ impl UserRepository for FirestoreUserRepository {
.map_err(|e| user_repository::Error::Unknown(e.to_string()))?;
let event_store = FirestoreRpcEventStore::new(transaction.clone());

let document = transaction
.get_document("user_ids", &id.to_string())
.await
.unwrap();
let document = match transaction.get_document("user_ids", &id.to_string()).await {
Ok(None) => return Ok(None),
Ok(Some(doc)) => Ok(doc),
Err(e) => Err(e),
}
.map_err(|e| user_repository::Error::Unknown(e.to_string()))?;
let event_stream_id_as_str = get_field_as_str(&document, "event_stream_id").unwrap();
let event_stream_id = EventStreamId::from_str(event_stream_id_as_str).unwrap();
let event_stream = event_store
Expand Down Expand Up @@ -84,7 +86,8 @@ impl UserRepository for FirestoreUserRepository {
let document = transaction
.get_document(collection_id, &document_id)
.await
.map_err(|e| user_repository::Error::Unknown(e.to_string()))?;
.map_err(|e| user_repository::Error::Unknown(e.to_string()))?
.ok_or_else(|| user_repository::Error::Unknown("not found".to_owned()))?;
let before_event_stream_id_as_str =
get_field_as_str(&document, "event_stream_id").unwrap();
if before_event_stream_id_as_str != event_stream_id.to_string() {
Expand Down Expand Up @@ -133,7 +136,8 @@ impl UserRepository for FirestoreUserRepository {
let document = transaction
.get_document(collection_id, &document_id)
.await
.map_err(|e| user_repository::Error::Unknown(e.to_string()))?;
.map_err(|e| user_repository::Error::Unknown(e.to_string()))?
.ok_or_else(|| user_repository::Error::Unknown("not found".to_owned()))?;
let before_user_id_as_str = get_field_as_str(&document, "user_id").unwrap();
if before_user_id_as_str != user_id.to_string() {
return Err(user_repository::Error::Unknown(
Expand Down

0 comments on commit 109657a

Please sign in to comment.