-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector.rs
87 lines (55 loc) · 1.92 KB
/
collector.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/// Example entry point.
///
/// Builds a topology with `run_topology_1` in which `Collector` producers
/// feed `Layer1Process` processors, pushes the integers `0..500` into the
/// collector channel, and finally signals shutdown through the handle that
/// `run_topology_1` returned.
#[tokio::main]
async fn main() {
    let (collector_sender, collector_recv) = channel::<i32>(500);

    // Fill the buffer until it becomes full or the timeout elapses.
    let timeout = Duration::from_secs(1);

    // Producer side of the topology.
    let producer_factory = || Collector::new(collector_recv, timeout);
    let producer_concurrency = 1;
    let producer_router = RouterType::RoundRobin;
    let producer_buffer_pool = 100;

    // Processor side of the topology.
    let proc_factory = || Layer1Process;
    let proc_concurrency = 3;
    let proc_buffer_size = 10;

    // 1. create X processor instances by 'proc_concurrency'
    //
    // 2. create X producer instances by 'producer_concurrency'
    //
    // 3. create topology and syncing
    //
    //                                    processor-1
    //                                  /
    //   producer-1 \                  /
    //   producer-2 -> Collector -> ----- processor-2
    //   producer-x /                  \
    //                                  \
    //                                    processor-x
    //
    let safe_shutdown = run_topology_1(
        producer_factory,
        producer_concurrency,
        producer_router,
        producer_buffer_pool,
        proc_factory,
        proc_concurrency,
        proc_buffer_size,
    );

    // Feed the collector.
    for elem in 0..500 {
        // NOTE(review): assumes `send(..).await` yields a `Result` whose `Err`
        // means the receiving side is gone (the original discarded it with
        // `let _ =`). Once a send fails there is no point in queueing the
        // remaining elements, so report it and stop instead of silently
        // swallowing every error.
        if collector_sender.send(elem).await.is_err() {
            eprintln!(
                "collector channel rejected element {}; stopping the feed",
                elem
            );
            break;
        }
    }

    // Safe shutdown from (Producer) to (Layer_X_Processor).
    safe_shutdown.send(());
}
/// Stateless processor used by this example; its `Processor<i32, ()>`
/// implementation prints every message it is handed.
struct Layer1Process;
#[async_trait]
impl Processor<i32, ()> for Layer1Process {
    /// Start-up hook; this processor has nothing to prepare.
    async fn init(&mut self) {}

    /// Prints the incoming message to stdout and returns
    /// `ProcResult::Continue`.
    async fn handle_message(&mut self, msg: i32) -> ProcResult<()> {
        println!("==> {}", msg);

        ProcResult::Continue
    }

    /// Tear-down hook; this processor has nothing to release.
    async fn terminate(&mut self) {}
}