/
main.rs
69 lines (52 loc) · 2.02 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use std::thread;
use std::time::Duration;
use sonr::prelude::*;
use sonr::errors::Result;
use sonr::net::tcp::{TcpStream, ReactiveTcpListener};
use sonr::sync::broadcast::Broadcast;
use sonr::sync::queue::{ReactiveQueue, ReactiveDeque};
use pubsub::publisher::Publisher;
use pubsub::subscriber::Subscriber;
use pubsub::timer::Timer;
fn listener(addr: &str) -> Result<impl Reactor<Output=TcpStream>> {
let l = ReactiveTcpListener::bind(addr)?
.map(|(s, _)| s);
Ok(l)
}
fn main() -> Result<()> {
System::init()?;
let thread_count = 8;
let buffer_threshold = 256;
let publish_timeout = Duration::from_millis(20);
// Publisher
let pub_listener = listener("127.0.0.1:8000")?;
let broadcast = Broadcast::unbounded();
let mut timer = Timer::new(publish_timeout);
let mut pub_connection_queue = ReactiveQueue::unbounded();
// Subscriber
let sub_listener = listener("127.0.0.1:9000")?;
let mut sub_connection_queue = ReactiveQueue::unbounded();
for _ in 0..thread_count {
let broadcast = broadcast.clone();
let timer_notifier = timer.receiver();
let pub_deque = pub_connection_queue.deque();
let sub_deque = sub_connection_queue.deque();
thread::spawn(move || -> Result<()> {
System::init()?;
let sub_connection_deque = ReactiveDeque::new(sub_deque)?;
let subscriber = Subscriber::new(broadcast.subscriber())?;
let sub_run = sub_connection_deque.chain(subscriber);
let pub_connection_deque = ReactiveDeque::new(pub_deque)?;
let publisher = Publisher::new(broadcast, buffer_threshold, timer_notifier)?;
let pub_run = pub_connection_deque.chain(publisher);
let run = pub_run.and(sub_run);
System::start(run)?;
Ok(())
});
}
timer.start();
let pub_run = pub_listener.chain(pub_connection_queue);
let sub_run = sub_listener.chain(sub_connection_queue);
System::start(pub_run.and(sub_run))?;
Ok(())
}