/
event_log.rs
132 lines (112 loc) · 4.11 KB
/
event_log.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use crate::persistence::{self, Connection};
use anyhow::{format_err, Result};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{convert::TryFrom, sync::Arc, time::Duration};
mod in_memory;
pub use self::in_memory::*;
mod util {
use parking_lot::{Condvar, Mutex, RwLockReadGuard};
use std::time::Duration;
#[derive(Default)]
// https://github.com/Amanieu/parking_lot/issues/165#issuecomment-515991706
pub struct CondvarAny {
c: Condvar,
m: Mutex<()>,
}
impl CondvarAny {
pub fn wait<T>(&self, g: &mut RwLockReadGuard<'_, T>) {
let guard = self.m.lock();
RwLockReadGuard::unlocked(g, || {
// Move the guard in so it gets unlocked before we re-lock g
let mut guard = guard;
self.c.wait(&mut guard);
});
}
pub fn wait_for<T>(&self, g: &mut RwLockReadGuard<'_, T>, timeout: Duration) {
let guard = self.m.lock();
RwLockReadGuard::unlocked(g, || {
// Move the guard in so it gets unlocked before we re-lock g
let mut guard = guard;
self.c.wait_for(&mut guard, timeout);
});
}
pub fn notify_all(&self) -> usize {
self.c.notify_all()
}
}
}
use crate::service::{auction_house, bidding_engine, ui};
pub type Offset = u64;
// TODO: This type makes everything cyclical:
// All services depend on it, and it depends
// on events of each of the services. Not a
// big deal for this small program, but something
// to take care of in a more realistic implementation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EventDetails {
AuctionHouse(auction_house::Event),
BiddingEngine(bidding_engine::Event),
Ui(ui::Event),
#[cfg(test)]
Test,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Event {
pub offset: Offset,
pub details: EventDetails,
}
pub trait Reader {
type Persistence: persistence::Persistence;
fn get_start_offset(&self) -> Result<Offset>;
fn read_tr<'a>(
&self,
conn: &mut <<<Self as Reader>::Persistence as persistence::Persistence>::Connection as persistence::Connection>::Transaction<'a>,
offset: Offset,
limit: usize,
timeout: Option<Duration>,
) -> Result<(Offset, Vec<Event>)>;
fn read<'a>(
&self,
conn: &mut <<Self as Reader>::Persistence as persistence::Persistence>::Connection,
offset: Offset,
limit: usize,
timeout: Option<Duration>,
) -> Result<(Offset, Vec<Event>)> {
self.read_tr(&mut conn.start_transaction()?, offset, limit, timeout)
}
fn read_one_tr<'a>(
&self,
conn: &mut <<<Self as Reader>::Persistence as persistence::Persistence>::Connection as persistence::Connection>::Transaction<'a>,
offset: Offset,
) -> Result<(Offset, Option<Event>)> {
let (offset, v) = self.read_tr(conn, offset, 1, Some(Duration::from_millis(0)))?;
assert!(v.len() <= 1);
Ok((offset, v.into_iter().next()))
}
fn read_one<'a>(
&self,
conn: &mut <<Self as Reader>::Persistence as persistence::Persistence>::Connection,
offset: Offset,
) -> Result<(Offset, Option<Event>)> {
let (offset, v) = self.read(conn, offset, 1, Some(Duration::from_millis(0)))?;
assert!(v.len() <= 1);
Ok((offset, v.into_iter().next()))
}
}
pub trait Writer {
type Persistence: persistence::Persistence;
fn write(
&self,
conn: &mut <<Self as Writer>::Persistence as persistence::Persistence>::Connection,
events: &[EventDetails],
) -> Result<Offset> {
self.write_tr(&mut conn.start_transaction()?, events)
}
fn write_tr<'a>(
&self,
conn: &mut <<<Self as Writer>::Persistence as persistence::Persistence>::Connection as persistence::Connection>::Transaction<'a>,
events: &[EventDetails],
) -> Result<Offset>;
}
pub type SharedReader<P> = Arc<dyn Reader<Persistence = P> + Sync + Send + 'static>;
pub type SharedWriter<P> = Arc<dyn Writer<Persistence = P> + Sync + Send + 'static>;