Skip to content

Commit

Permalink
twiq: Add Event::type
Browse files Browse the repository at this point in the history
  • Loading branch information
bouzuya committed Sep 23, 2022
1 parent 7d15c17 commit 52ec460
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 23 deletions.
31 changes: 25 additions & 6 deletions twiq/crates/db/src/firestore_event_store.rs
Expand Up @@ -14,7 +14,7 @@ use crate::firestore_rest::{
};
use event_store_core::{
event::Event, event_data::EventData, event_id::EventId, event_stream_id::EventStreamId,
event_stream_seq::EventStreamSeq,
event_stream_seq::EventStreamSeq, EventType,
};

#[derive(Debug, thiserror::Error)]
Expand All @@ -41,6 +41,7 @@ fn event_stream_to_fields(
fn event_to_fields(event: &Event) -> HashMap<String, Value> {
let mut map = HashMap::new();
map.insert("id".to_owned(), Value::String(event.id().to_string()));
map.insert("type".to_owned(), Value::String(event.r#type().to_string()));
map.insert(
"stream_id".to_owned(),
Value::String(event.stream_id().to_string()),
Expand Down Expand Up @@ -81,6 +82,20 @@ fn fields_to_event(fields: HashMap<String, Value>) -> Result<Event, TryFromEvent
.and_then(|s| {
EventId::from_str(s).map_err(|e| TryFromEventError::InvalidFormat(e.to_string()))
})?;
let field = "type";
let r#type = fields
.get(field)
.ok_or_else(|| TryFromEventError::NoField(field.to_owned()))
.and_then(|v| {
if let Value::String(s) = v {
Ok(s)
} else {
Err(TryFromEventError::InvalidValueType(field.to_owned()))
}
})
.and_then(|s| {
EventType::from_str(s).map_err(|e| TryFromEventError::InvalidFormat(e.to_string()))
})?;
let field = "stream_id";
let stream_id = fields
.get(field)
Expand Down Expand Up @@ -125,7 +140,7 @@ fn fields_to_event(fields: HashMap<String, Value>) -> Result<Event, TryFromEvent
EventData::try_from(s.to_owned())
.map_err(|e| TryFromEventError::InvalidFormat(e.to_string()))
})?;
Ok(Event::new(id, stream_id, stream_seq, data))
Ok(Event::new(id, r#type, stream_id, stream_seq, data))
}

async fn find_events_by_event_id_after(
Expand Down Expand Up @@ -635,16 +650,18 @@ mod tests {
let credential = Credential::find_default(config).await?;

let id = EventId::generate();
let r#type = EventType::from_str("created")?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let data = EventData::try_from("{}".to_owned())?;
let event1 = Event::new(id, stream_id, stream_seq, data);
let event1 = Event::new(id, r#type, stream_id, stream_seq, data);
store(&project_id, &credential, None, vec![event1.clone()]).await?;

let stream_seq2 = stream_seq.next()?;
let id = EventId::generate();
let r#type = EventType::from_str("updated")?;
let data = EventData::try_from(r#"{"foo":"bar"}"#.to_owned())?;
let event2 = Event::new(id, stream_id, stream_seq2, data);
let event2 = Event::new(id, r#type, stream_id, stream_seq2, data);
store(
&project_id,
&credential,
Expand All @@ -659,14 +676,16 @@ mod tests {
assert_eq!(events, vec![event1.clone(), event2.clone()]);

let id = EventId::generate();
let r#type = EventType::from_str("created")?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let data = EventData::try_from("{}".to_owned())?;
let event3 = Event::new(id, stream_id, stream_seq, data);
let event3 = Event::new(id, r#type, stream_id, stream_seq, data);
let stream_seq2 = stream_seq.next()?;
let id = EventId::generate();
let r#type = EventType::from_str("updated")?;
let data = EventData::try_from(r#"{"foo":"bar"}"#.to_owned())?;
let event4 = Event::new(id, stream_id, stream_seq2, data);
let event4 = Event::new(id, r#type, stream_id, stream_seq2, data);
store(
&project_id,
&credential,
Expand Down
3 changes: 2 additions & 1 deletion twiq/crates/domain/src/aggregate/user/event/user_created.rs
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use event_store_core::{
event_id::EventId, event_stream_id::EventStreamId, event_stream_seq::EventStreamSeq,
Event as RawEvent, EventData,
Event as RawEvent, EventData, EventType,
};

use crate::value::{At, TwitterUserId};
Expand Down Expand Up @@ -38,6 +38,7 @@ impl From<UserCreated> for RawEvent {
fn from(event: UserCreated) -> Self {
RawEvent::new(
EventId::from_str(event.id.as_str()).expect("id"),
EventType::from_str("user_created").expect("event_type"),
EventStreamId::from_str(event.stream_id.as_str()).expect("stream_id"),
EventStreamSeq::from(event.stream_seq),
EventData::try_from(serde_json::to_string(&event).expect("event")).expect("data"),
Expand Down
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use event_store_core::{
event_id::EventId, event_stream_id::EventStreamId, event_stream_seq::EventStreamSeq,
Event as RawEvent, EventData,
Event as RawEvent, EventData, EventType,
};

use crate::value::{At, TwitterUserId, UserId, UserRequestId};
Expand Down Expand Up @@ -53,6 +53,7 @@ impl From<UserRequested> for RawEvent {
fn from(event: UserRequested) -> Self {
RawEvent::new(
EventId::from_str(event.id.as_str()).expect("id"),
EventType::from_str("user_requested").expect("event_type"),
EventStreamId::from_str(event.stream_id.as_str()).expect("stream_id"),
EventStreamSeq::from(event.stream_seq),
EventData::try_from(serde_json::to_string(&event).expect("event")).expect("data"),
Expand Down
3 changes: 2 additions & 1 deletion twiq/crates/domain/src/aggregate/user/event/user_updated.rs
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use event_store_core::{
event_id::EventId, event_stream_id::EventStreamId, event_stream_seq::EventStreamSeq,
Event as RawEvent, EventData,
Event as RawEvent, EventData, EventType,
};

use crate::{
Expand Down Expand Up @@ -44,6 +44,7 @@ impl From<UserUpdated> for RawEvent {
fn from(event: UserUpdated) -> Self {
RawEvent::new(
EventId::from_str(event.id.as_str()).expect("id"),
EventType::from_str("user_updated").expect("event_type"),
EventStreamId::from_str(event.stream_id.as_str()).expect("stream_id"),
EventStreamSeq::from(event.stream_seq),
EventData::try_from(serde_json::to_string(&event).expect("event")).expect("data"),
Expand Down
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use event_store_core::{
event_id::EventId, event_stream_id::EventStreamId, event_stream_seq::EventStreamSeq,
Event as RawEvent, EventData,
Event as RawEvent, EventData, EventType,
};

use crate::value::{At, TwitterUserId, UserId, UserRequestId};
Expand Down Expand Up @@ -49,6 +49,7 @@ impl From<UserRequestCreated> for RawEvent {
fn from(event: UserRequestCreated) -> Self {
RawEvent::new(
EventId::from_str(event.id.as_str()).expect("id"),
EventType::from_str("user_request_created").expect("event_type"),
EventStreamId::from_str(event.stream_id.as_str()).expect("stream_id"),
EventStreamSeq::from(event.stream_seq),
EventData::try_from(serde_json::to_string(&event).expect("event")).expect("data"),
Expand Down
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use event_store_core::{
event_id::EventId, event_stream_id::EventStreamId, event_stream_seq::EventStreamSeq,
Event as RawEvent, EventData,
Event as RawEvent, EventData, EventType,
};

use crate::{
Expand Down Expand Up @@ -62,6 +62,7 @@ impl From<UserRequestFinished> for RawEvent {
fn from(event: UserRequestFinished) -> Self {
RawEvent::new(
EventId::from_str(event.id.as_str()).expect("id"),
EventType::from_str("user_request_finished").expect("event_type"),
EventStreamId::from_str(event.stream_id.as_str()).expect("stream_id"),
EventStreamSeq::from(event.stream_seq),
EventData::try_from(serde_json::to_string(&event).expect("event")).expect("data"),
Expand Down
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;

use event_store_core::{
event_id::EventId, event_stream_id::EventStreamId, event_stream_seq::EventStreamSeq,
Event as RawEvent, EventData,
Event as RawEvent, EventData, EventType,
};

use crate::value::At;
Expand Down Expand Up @@ -35,6 +35,7 @@ impl From<UserRequestStarted> for RawEvent {
fn from(event: UserRequestStarted) -> Self {
RawEvent::new(
EventId::from_str(event.id.as_str()).expect("id"),
EventType::from_str("user_request_started").expect("event_type"),
EventStreamId::from_str(event.stream_id.as_str()).expect("stream_id"),
EventStreamSeq::from(event.stream_seq),
EventData::try_from(serde_json::to_string(&event).expect("event")).expect("data"),
Expand Down
13 changes: 11 additions & 2 deletions twiq/crates/event_store_core/src/event.rs
@@ -1,11 +1,12 @@
use crate::{
event_data::EventData, event_id::EventId, event_stream_id::EventStreamId,
event_stream_seq::EventStreamSeq,
event_stream_seq::EventStreamSeq, event_type::EventType,
};

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Event {
id: EventId,
r#type: EventType,
stream_id: EventStreamId,
stream_seq: EventStreamSeq,
data: EventData,
Expand All @@ -14,12 +15,14 @@ pub struct Event {
impl Event {
pub fn new(
id: EventId,
r#type: EventType,
stream_id: EventStreamId,
stream_seq: EventStreamSeq,
data: EventData,
) -> Self {
Self {
id,
r#type,
stream_id,
stream_seq,
data,
Expand All @@ -30,6 +33,10 @@ impl Event {
self.id
}

pub fn r#type(&self) -> &EventType {
&self.r#type
}

pub fn stream_id(&self) -> EventStreamId {
self.stream_id
}
Expand All @@ -50,11 +57,13 @@ mod tests {
#[test]
fn test() -> anyhow::Result<()> {
let id = EventId::generate();
let r#type = EventType::try_from("created".to_owned())?;
let stream_id = EventStreamId::generate();
let stream_seq = EventStreamSeq::from(1_u32);
let data = EventData::try_from(String::from("123"))?;
let event = Event::new(id, stream_id, stream_seq, data.clone());
let event = Event::new(id, r#type.clone(), stream_id, stream_seq, data.clone());
assert_eq!(event.id(), id);
assert_eq!(event.r#type(), &r#type);
assert_eq!(event.stream_id(), stream_id);
assert_eq!(event.stream_seq(), stream_seq);
assert_eq!(event.data(), &data);
Expand Down
38 changes: 29 additions & 9 deletions twiq/crates/event_store_core/src/event_stream.rs
@@ -1,4 +1,4 @@
use crate::{Event, EventData, EventId, EventStreamId, EventStreamSeq};
use crate::{event_type::EventType, Event, EventData, EventId, EventStreamId, EventStreamSeq};

#[derive(Debug, Eq, PartialEq, thiserror::Error)]
pub enum Error {
Expand All @@ -20,10 +20,11 @@ pub struct EventStream {
}

impl EventStream {
pub fn generate(event_data: EventData) -> Self {
pub fn generate(event_type: EventType, event_data: EventData) -> Self {
Self {
events: vec![Event::new(
EventId::generate(),
event_type,
EventStreamId::generate(),
EventStreamSeq::from(1_u32),
event_data,
Expand Down Expand Up @@ -68,9 +69,10 @@ impl EventStream {
self.events.clone()
}

pub fn push(&mut self, event_data: EventData) -> Result<()> {
pub fn push(&mut self, event_type: EventType, event_data: EventData) -> Result<()> {
self.events.push(Event::new(
EventId::generate(),
event_type,
self.id(),
self.seq()
.next()
Expand Down Expand Up @@ -102,7 +104,8 @@ mod tests {

#[test]
fn generate_test() -> anyhow::Result<()> {
let stream = EventStream::generate(EventData::from_str("{}")?);
let stream =
EventStream::generate(EventType::from_str("created")?, EventData::from_str("{}")?);
assert_eq!(stream.seq(), EventStreamSeq::from(1_u32));
Ok(())
}
Expand All @@ -117,6 +120,7 @@ mod tests {
fn new_test_an_event() -> anyhow::Result<()> {
let event = Event::new(
EventId::generate(),
EventType::from_str("created")?,
EventStreamId::generate(),
EventStreamSeq::from(1_u32),
EventData::from_str("{}")?,
Expand All @@ -132,12 +136,14 @@ mod tests {
fn new_test_invalid_event_stream_id() -> anyhow::Result<()> {
let event1 = Event::new(
EventId::generate(),
EventType::from_str("created")?,
EventStreamId::generate(),
EventStreamSeq::from(1_u32),
EventData::from_str("{}")?,
);
let event2 = Event::new(
EventId::generate(),
EventType::from_str("updated")?,
EventStreamId::generate(),
EventStreamSeq::from(2_u32),
EventData::from_str("{}")?,
Expand All @@ -151,12 +157,14 @@ mod tests {
let stream_id = EventStreamId::generate();
let event1 = Event::new(
EventId::generate(),
EventType::from_str("created")?,
stream_id,
EventStreamSeq::from(1_u32),
EventData::from_str("{}")?,
);
let event2 = Event::new(
EventId::generate(),
EventType::from_str("updated")?,
stream_id,
EventStreamSeq::from(1_u32),
EventData::from_str("{}")?,
Expand All @@ -170,12 +178,14 @@ mod tests {
let stream_id = EventStreamId::generate();
let event1 = Event::new(
EventId::generate(),
EventType::from_str("created")?,
stream_id,
EventStreamSeq::from(1_u32),
EventData::from_str("{}")?,
);
let event2 = Event::new(
EventId::generate(),
EventType::from_str("updated")?,
stream_id,
EventStreamSeq::from(2_u32),
EventData::from_str("{}")?,
Expand All @@ -189,17 +199,23 @@ mod tests {

#[test]
fn push_test() -> anyhow::Result<()> {
let mut stream = EventStream::generate(EventData::from_str("{}")?);
stream.push(EventData::from_str(r#"{"key":123}"#)?)?;
let mut stream =
EventStream::generate(EventType::from_str("created")?, EventData::from_str("{}")?);
stream.push(
EventType::from_str("updated")?,
EventData::from_str(r#"{"key":123}"#)?,
)?;
assert_eq!(stream.seq(), EventStreamSeq::from(2_u32));
Ok(())
}

#[test]
fn push_event_test() -> anyhow::Result<()> {
let mut stream = EventStream::generate(EventData::from_str("{}")?);
let mut stream =
EventStream::generate(EventType::from_str("created")?, EventData::from_str("{}")?);
stream.push_event(Event::new(
EventId::generate(),
EventType::from_str("updated")?,
stream.id(),
EventStreamSeq::from(2_u32),
EventData::from_str(r#"{"key":123}"#)?,
Expand All @@ -210,10 +226,12 @@ mod tests {

#[test]
fn push_event_test_invalid_event_stream_id() -> anyhow::Result<()> {
let mut stream = EventStream::generate(EventData::from_str("{}")?);
let mut stream =
EventStream::generate(EventType::from_str("created")?, EventData::from_str("{}")?);
assert!(stream
.push_event(Event::new(
EventId::generate(),
EventType::from_str("updated")?,
EventStreamId::generate(),
EventStreamSeq::from(2_u32),
EventData::from_str(r#"{"key":123}"#)?,
Expand All @@ -224,10 +242,12 @@ mod tests {

#[test]
fn push_event_test_invalid_event_stream_seq() -> anyhow::Result<()> {
let mut stream = EventStream::generate(EventData::from_str("{}")?);
let mut stream =
EventStream::generate(EventType::from_str("created")?, EventData::from_str("{}")?);
assert!(stream
.push_event(Event::new(
EventId::generate(),
EventType::from_str("updated")?,
stream.id(),
EventStreamSeq::from(1_u32),
EventData::from_str(r#"{"key":123}"#)?,
Expand Down
1 change: 1 addition & 0 deletions twiq/crates/event_store_core/src/lib.rs
Expand Up @@ -13,3 +13,4 @@ pub use self::event_data::EventData;
pub use self::event_id::EventId;
pub use self::event_stream_id::EventStreamId;
pub use self::event_stream_seq::EventStreamSeq;
pub use self::event_type::EventType;

0 comments on commit 52ec460

Please sign in to comment.