Skip to content

Commit

Permalink
twiq: Add db::in_memory_user_request_repository mod
Browse files Browse the repository at this point in the history
  • Loading branch information
bouzuya committed Oct 8, 2022
1 parent bbe6c39 commit 3dfaec9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
54 changes: 54 additions & 0 deletions twiq/crates/db/src/in_memory_user_request_repository.rs
@@ -0,0 +1,54 @@
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use domain::aggregate::{user::UserRequestId, user_request::UserRequest};
use event_store_core::{event_store::EventStore, EventStream, EventStreamId};
use tokio::sync::Mutex;
use use_case::user_request_repository::{Error, Result, UserRequestRepository};

use crate::in_memory_event_store::InMemoryEventStore;

#[derive(Debug, Default)]
pub struct InMemoryUserRequestRepository {
event_store: Arc<Mutex<InMemoryEventStore>>,
aggregate_ids: Arc<Mutex<HashMap<UserRequestId, EventStreamId>>>,
}

#[async_trait]
impl UserRequestRepository for InMemoryUserRequestRepository {
async fn find(&self, id: UserRequestId) -> Result<Option<UserRequest>> {
let event_store = self.event_store.lock().await;
let aggregate_ids = self.aggregate_ids.lock().await;
let event_stream_id = match aggregate_ids.get(&id) {
None => return Ok(None),
Some(event_stream_id) => *event_stream_id,
};
let event_stream = event_store
.find_event_stream(event_stream_id)
.await
.map_err(|e| Error::Unknown(e.to_string()))?;
match event_stream {
None => Ok(None),
Some(event_stream) => UserRequest::try_from(event_stream)
.map(Some)
.map_err(|e| Error::Unknown(e.to_string())),
}
}

async fn store(&self, before: Option<UserRequest>, after: UserRequest) -> Result<()> {
let event_store = self.event_store.lock().await;
let mut user_request_ids = self.aggregate_ids.lock().await;
let aggregate_id = after.id();
let event_stream = EventStream::from(after);
let event_stream_id = event_stream.id();
event_store
.store(
before.map(|user_request| EventStream::from(user_request).seq()),
event_stream,
)
.await
.map_err(|e| Error::Unknown(e.to_string()))?;
user_request_ids.insert(aggregate_id, event_stream_id);
Ok(())
}
}
1 change: 1 addition & 0 deletions twiq/crates/db/src/lib.rs
Expand Up @@ -2,4 +2,5 @@ pub mod firestore_event_store;
pub mod firestore_rest;
pub mod in_memory_event_store;
pub mod in_memory_user_repository;
pub mod in_memory_user_request_repository;
pub mod in_memory_worker_repository;
6 changes: 6 additions & 0 deletions twiq/crates/domain/src/aggregate/user_request.rs
Expand Up @@ -84,6 +84,10 @@ impl UserRequest {
.unwrap();
Ok(())
}

pub fn id(&self) -> UserRequestId {
self.id
}
}

impl From<UserRequest> for EventStream {
Expand Down Expand Up @@ -142,6 +146,8 @@ mod tests {

use super::*;

// TODO: test id

#[test]
fn test() -> anyhow::Result<()> {
use event_store_core::EventType as RawEventType;
Expand Down

0 comments on commit 3dfaec9

Please sign in to comment.