Skip to content

Commit

Permalink
twiq: Fix to use EventAt
Browse files Browse the repository at this point in the history
  • Loading branch information
bouzuya committed Nov 10, 2022
1 parent ac7b714 commit 33bab6c
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 22 deletions.
30 changes: 24 additions & 6 deletions twiq/crates/db/src/firestore_rest_event_store.rs
Expand Up @@ -17,7 +17,7 @@ use crate::{
};
use event_store_core::{
event::Event, event_id::EventId, event_payload::EventPayload, event_stream_id::EventStreamId,
event_stream_seq::EventStreamSeq, EventStream, EventType,
event_stream_seq::EventStreamSeq, EventAt, EventStream, EventType,
};

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -128,6 +128,20 @@ fn fields_to_event(fields: HashMap<String, Value>) -> Result<Event, TryFromEvent
EventStreamSeq::try_from(*n)
.map_err(|e| TryFromEventError::InvalidFormat(e.to_string()))
})?;
let field = "at";
let at = fields
.get(field)
.ok_or_else(|| TryFromEventError::NoField(field.to_owned()))
.and_then(|v| {
if let Value::String(s) = v {
Ok(s)
} else {
Err(TryFromEventError::InvalidValueType(field.to_owned()))
}
})
.and_then(|s| {
EventAt::from_str(s).map_err(|e| TryFromEventError::InvalidFormat(e.to_string()))
})?;
let field = "data";
let data = fields
.get(field)
Expand All @@ -143,7 +157,7 @@ fn fields_to_event(fields: HashMap<String, Value>) -> Result<Event, TryFromEvent
EventPayload::try_from(s.to_owned())
.map_err(|e| TryFromEventError::InvalidFormat(e.to_string()))
})?;
Ok(Event::new(id, r#type, stream_id, stream_seq, data))
Ok(Event::new(id, r#type, stream_id, stream_seq, at, data))
}

async fn find_events_by_event_id_after(
Expand Down Expand Up @@ -650,15 +664,17 @@ mod tests {
let r#type = EventType::from_str("created")?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let at = EventAt::now();
let data = EventPayload::try_from("{}".to_owned())?;
let event1 = Event::new(id, r#type, stream_id, stream_seq, data);
let event1 = Event::new(id, r#type, stream_id, stream_seq, at, data);
store(&project_id, &credential, None, vec![event1.clone()]).await?;

let stream_seq2 = stream_seq.next()?;
let id = EventId::generate();
let r#type = EventType::from_str("updated")?;
let at = EventAt::now();
let data = EventPayload::try_from(r#"{"foo":"bar"}"#.to_owned())?;
let event2 = Event::new(id, r#type, stream_id, stream_seq2, data);
let event2 = Event::new(id, r#type, stream_id, stream_seq2, at, data);
store(
&project_id,
&credential,
Expand All @@ -676,13 +692,15 @@ mod tests {
let r#type = EventType::from_str("created")?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let at = EventAt::now();
let data = EventPayload::try_from("{}".to_owned())?;
let event3 = Event::new(id, r#type, stream_id, stream_seq, data);
let event3 = Event::new(id, r#type, stream_id, stream_seq, at, data);
let stream_seq2 = stream_seq.next()?;
let id = EventId::generate();
let r#type = EventType::from_str("updated")?;
let at = EventAt::now();
let data = EventPayload::try_from(r#"{"foo":"bar"}"#.to_owned())?;
let event4 = Event::new(id, r#type, stream_id, stream_seq2, data);
let event4 = Event::new(id, r#type, stream_id, stream_seq2, at, data);
store(
&project_id,
&credential,
Expand Down
21 changes: 15 additions & 6 deletions twiq/crates/db/src/firestore_rpc_event_store.rs
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, str::FromStr};
use async_trait::async_trait;
use event_store_core::{
event_store::{self, EventStore},
Event, EventId, EventPayload, EventStream, EventStreamId, EventStreamSeq, EventType,
Event, EventAt, EventId, EventPayload, EventStream, EventStreamId, EventStreamSeq, EventType,
};
use prost_types::Timestamp;
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
Expand Down Expand Up @@ -419,12 +419,17 @@ fn event_from_fields(document: &Document) -> Result<Event> {
.transpose()
.map_err(|_| Error::Unknown("stream_id is not well-formed".to_owned()))?
.ok_or_else(|| Error::Unknown("stream_id is not found".to_owned()))?;
let at = get_field_as_str(document, "at")
.map(EventAt::from_str)
.transpose()
.map_err(|_| Error::Unknown("at is not well-formed".to_owned()))?
.ok_or_else(|| Error::Unknown("at is not found".to_owned()))?;
let payload = get_field_as_str(document, "data")
.map(EventPayload::from_str)
.transpose()
.map_err(|_| Error::Unknown("payload is not well-formed".to_owned()))?
.ok_or_else(|| Error::Unknown("payload is not found".to_owned()))?;
Ok(Event::new(id, r#type, stream_id, stream_seq, payload))
Ok(Event::new(id, r#type, stream_id, stream_seq, at, payload))
}

fn event_to_fields(event: &Event) -> HashMap<String, Value> {
Expand Down Expand Up @@ -474,8 +479,9 @@ mod tests {
let r#type = EventType::from_str("created")?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let at = EventAt::now();
let data = EventPayload::try_from("{}".to_owned())?;
let event1 = Event::new(id, r#type, stream_id, stream_seq, data);
let event1 = Event::new(id, r#type, stream_id, stream_seq, at, data);
let mut event_stream = EventStream::new(vec![event1.clone()])?;
event_store.store(None, event_stream.clone()).await?;
transaction.commit().await?;
Expand All @@ -486,8 +492,9 @@ mod tests {
let stream_seq2 = stream_seq.next()?;
let id = EventId::generate();
let r#type = EventType::from_str("updated")?;
let at = EventAt::now();
let data = EventPayload::try_from(r#"{"foo":"bar"}"#.to_owned())?;
let event2 = Event::new(id, r#type, stream_id, stream_seq2, data);
let event2 = Event::new(id, r#type, stream_id, stream_seq2, at, data);
event_stream.push_event(event2.clone())?;
event_store
.store(Some(stream_seq), event_stream.clone())
Expand All @@ -513,13 +520,15 @@ mod tests {
let r#type = EventType::from_str("created")?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let at = EventAt::now();
let data = EventPayload::try_from("{}".to_owned())?;
let event3 = Event::new(id, r#type, stream_id, stream_seq, data);
let event3 = Event::new(id, r#type, stream_id, stream_seq, at, data);
let stream_seq2 = stream_seq.next()?;
let id = EventId::generate();
let r#type = EventType::from_str("updated")?;
let at = EventAt::now();
let data = EventPayload::try_from(r#"{"foo":"bar"}"#.to_owned())?;
let event4 = Event::new(id, r#type, stream_id, stream_seq2, data);
let event4 = Event::new(id, r#type, stream_id, stream_seq2, at, data);
let event_stream = EventStream::new(vec![event3.clone(), event4.clone()])?;
event_store.store(None, event_stream.clone()).await?;
transaction.commit().await?;
Expand Down
3 changes: 2 additions & 1 deletion twiq/crates/domain/src/aggregate/user/event/user_created.rs
Expand Up @@ -88,7 +88,7 @@ impl TryFrom<RawEvent> for UserCreated {

#[cfg(test)]
mod tests {
use event_store_core::{EventId, EventStreamId, EventStreamSeq};
use event_store_core::{EventAt, EventId, EventStreamId, EventStreamSeq};

use super::*;

Expand All @@ -111,6 +111,7 @@ mod tests {
RawEventType::from(UserCreated::r#type()),
EventStreamId::generate(),
EventStreamSeq::from(1),
EventAt::now(),
e
))?,
o
Expand Down
Expand Up @@ -103,7 +103,7 @@ impl TryFrom<RawEvent> for UserRequested {

#[cfg(test)]
mod tests {
use event_store_core::{EventId, EventStreamId, EventStreamSeq};
use event_store_core::{EventAt, EventId, EventStreamId, EventStreamSeq};

use super::*;

Expand All @@ -128,6 +128,7 @@ mod tests {
RawEventType::from(UserRequested::r#type()),
EventStreamId::generate(),
EventStreamSeq::from(1),
EventAt::now(),
e
))?,
o
Expand Down
3 changes: 2 additions & 1 deletion twiq/crates/domain/src/aggregate/user/event/user_updated.rs
Expand Up @@ -104,7 +104,7 @@ impl TryFrom<RawEvent> for UserUpdated {

#[cfg(test)]
mod tests {
use event_store_core::{EventId, EventStreamId, EventStreamSeq};
use event_store_core::{EventAt, EventId, EventStreamId, EventStreamSeq};

use super::*;

Expand All @@ -129,6 +129,7 @@ mod tests {
RawEventType::from(UserUpdated::r#type()),
EventStreamId::generate(),
EventStreamSeq::from(1),
EventAt::now(),
e
))?,
o
Expand Down
Expand Up @@ -99,7 +99,7 @@ impl TryFrom<RawEvent> for UserRequestCreated {

#[cfg(test)]
mod tests {
use event_store_core::{EventId, EventStreamId, EventStreamSeq};
use event_store_core::{EventAt, EventId, EventStreamId, EventStreamSeq};

use super::*;

Expand Down Expand Up @@ -128,6 +128,7 @@ mod tests {
RawEventType::from(UserRequestCreated::r#type()),
EventStreamId::generate(),
EventStreamSeq::from(1),
EventAt::now(),
e
))?,
o
Expand Down
Expand Up @@ -105,7 +105,7 @@ impl TryFrom<RawEvent> for UserRequestFinished {

#[cfg(test)]
mod tests {
use event_store_core::{EventId, EventStreamId, EventStreamSeq};
use event_store_core::{EventAt, EventId, EventStreamId, EventStreamSeq};

use super::*;

Expand Down Expand Up @@ -135,6 +135,7 @@ mod tests {
RawEventType::from(UserRequestFinished::r#type()),
EventStreamId::generate(),
EventStreamSeq::from(1),
EventAt::now(),
e
))?,
o
Expand Down
Expand Up @@ -71,7 +71,7 @@ impl TryFrom<RawEvent> for UserRequestStarted {

#[cfg(test)]
mod tests {
use event_store_core::{EventId, EventStreamId, EventStreamSeq};
use event_store_core::{EventAt, EventId, EventStreamId, EventStreamSeq};

use super::*;

Expand All @@ -92,6 +92,7 @@ mod tests {
RawEventType::from(UserRequestStarted::r#type()),
EventStreamId::generate(),
EventStreamSeq::from(1),
EventAt::now(),
e
))?,
o
Expand Down
22 changes: 20 additions & 2 deletions twiq/crates/event_store_core/src/event.rs
@@ -1,6 +1,6 @@
use crate::{
event_id::EventId, event_payload::EventPayload, event_stream_id::EventStreamId,
event_stream_seq::EventStreamSeq, event_type::EventType,
event_stream_seq::EventStreamSeq, event_type::EventType, EventAt,
};

#[derive(Clone, Debug, Eq, PartialEq)]
Expand All @@ -9,6 +9,7 @@ pub struct Event {
r#type: EventType,
stream_id: EventStreamId,
stream_seq: EventStreamSeq,
at: EventAt,
payload: EventPayload,
}

Expand All @@ -18,13 +19,15 @@ impl Event {
r#type: EventType,
stream_id: EventStreamId,
stream_seq: EventStreamSeq,
at: EventAt,
payload: EventPayload,
) -> Self {
Self {
id,
r#type,
stream_id,
stream_seq,
at,
payload,
}
}
Expand All @@ -45,13 +48,19 @@ impl Event {
self.stream_seq
}

pub fn at(&self) -> EventAt {
self.at
}

pub fn payload(&self) -> &EventPayload {
&self.payload
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;

#[test]
Expand All @@ -60,12 +69,21 @@ mod tests {
let r#type = EventType::try_from("created".to_owned())?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let at = EventAt::from_str("2020-01-02T03:04:05Z")?;
let payload = EventPayload::try_from(String::from("123"))?;
let event = Event::new(id, r#type.clone(), stream_id, stream_seq, payload.clone());
let event = Event::new(
id,
r#type.clone(),
stream_id,
stream_seq,
at,
payload.clone(),
);
assert_eq!(event.id(), id);
assert_eq!(event.r#type(), &r#type);
assert_eq!(event.stream_id(), stream_id);
assert_eq!(event.stream_seq(), stream_seq);
assert_eq!(event.at(), at);
assert_eq!(event.payload(), &payload);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion twiq/crates/event_store_core/src/event_at.rs
Expand Up @@ -8,7 +8,7 @@ pub enum Error {
InvalidFormat(String),
}

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct EventAt(OffsetDateTime);

impl Display for EventAt {
Expand Down

0 comments on commit 33bab6c

Please sign in to comment.