vote-server.rs (forked from mit-pdos/noria)
#[macro_use]
extern crate clap;
extern crate distributary;
extern crate gulaschkanone;
#[macro_use]
extern crate slog;

mod graph;

use distributary::{srv, Blender};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::{thread, time};
fn main() {
    use clap::{App, Arg};

    let args = App::new("vote")
        .version("0.1")
        .about(
            "Benchmarks user-curated news aggregator throughput for different storage \
             backends.",
        )
        .arg(
            Arg::with_name("ADDR")
                .index(1)
                .help("Address and port to listen on")
                .required(true),
        )
        .arg(
            Arg::with_name("distributed")
                .long("distributed")
                .requires("NUM_WORKERS")
                .takes_value(false)
                .help("Run in distributed mode."),
        )
        .arg(
            Arg::with_name("durability")
                .long("durability")
                .takes_value(false)
                .help("Enable durability for Base nodes"),
        )
        .arg(
            Arg::with_name("sharded")
                .long("sharded")
                .takes_value(false)
                .help("Enable sharding of the graph."),
        )
        .arg(
            Arg::with_name("NUM_WORKERS")
                .long("workers")
                .requires("distributed")
                .takes_value(true)
                .help(
                    "Number of workers to expect. Once this many workers are present, \
                     data-flow graph is set up.",
                ),
        )
        .get_matches();
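
    // ADDR is the only required argument: its IP is what the Blender listens on, and the
    // full HOST:PORT is what the vote server binds below.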
    let addr = args.value_of("ADDR").unwrap();

    let durability = if args.is_present("durability") {
        distributary::DurabilityMode::DeleteOnExit
    } else {
        distributary::DurabilityMode::MemoryOnly
    };
println!("Attempting to start soup on {}", addr);
    let persistence_params = distributary::PersistenceParameters::new(
        durability,
        512,
        time::Duration::from_millis(1),
        Some(String::from("vote")),
    );

    let sock_addr: SocketAddr = addr.parse()
        .expect("ADDR must be a valid HOST:PORT combination");

    let blender = Arc::new(Mutex::new(Blender::with_listen(sock_addr.ip())));
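
    // In distributed mode, run a gulaschkanone controller in a background thread and wait
    // until the expected number of workers has registered before building the graph.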
    let jh = if args.is_present("distributed") {
        use gulaschkanone::{Config, Controller};

        let num_workers_expected = value_t_or_exit!(args, "NUM_WORKERS", usize);

        let config = Config {
            hostname: String::from("localhost"),
            addr: sock_addr.ip().to_string(),
            port: 9999,
            controller: None,        // we are the controller
            heartbeat_freq: 1000,    // 1s
            healthcheck_freq: 10000, // 10s
        };

        let log = distributary::logger_pls();

        let mut controller = Controller::new(
            blender.clone(),
            &config.addr,
            config.port,
            time::Duration::from_millis(config.heartbeat_freq),
            time::Duration::from_millis(config.healthcheck_freq),
            log.clone(),
        );

        // run controller in the background
        let builder = thread::Builder::new().name("gulaschkanone-ctrl".into());
        let jh = builder
            .spawn(move || {
                controller.listen();
            })
            .unwrap();
        // wait for the expected number of workers to connect
        info!(
            log,
            "waiting for {} workers to connect...",
            num_workers_expected
        );

        let mut wc = 0;
        while wc < num_workers_expected {
            // need this nesting so that we don't hold the lock for too long (worker
            // registration needs it!)
            {
                let blender = blender.lock().unwrap();
                wc = blender.worker_count();
            }
            thread::sleep(time::Duration::from_millis(1000));
        }

        info!(log, "workers are here; let's get going!");

        Some(jh)
    } else {
        None
    };
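
    // Build the vote benchmark's data-flow graph (optionally sharded) using the
    // persistence parameters from above.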
    // scope needed to ensure the lock is released
    let mut s = graph::Setup::default().with_logging();
    s.sharding = args.is_present("sharded");
    let g = graph::make(blender.clone(), s, persistence_params);

    // start processing
    // TODO: what about the node indices?
    srv::run(g.graph, addr.to_socket_addrs().unwrap().next().unwrap());

    if let Some(jh) = jh {
        jh.join().expect("failed to join controller thread");
    }
}
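
// Example invocations (binary name and address are illustrative, not taken from the repo):
//
//   vote-server 127.0.0.1:2181
//   vote-server 127.0.0.1:2181 --durability --sharded --distributed --workers 2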