forked from nanocurrency/nano-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.hpp
508 lines (497 loc) · 18.7 KB
/
node.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
#pragma once
#include <rai/lib/work.hpp>
#include <rai/node/bootstrap.hpp>
#include <rai/node/logging.hpp>
#include <rai/node/nodeconfig.hpp>
#include <rai/node/peers.hpp>
#include <rai/node/portmapping.hpp>
#include <rai/node/stats.hpp>
#include <rai/node/voting.hpp>
#include <rai/node/wallet.hpp>
#include <rai/secure/ledger.hpp>
#include <condition_variable>
#include <boost/iostreams/device/array.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/thread/thread.hpp>
namespace rai
{
class node;
class election_status
{
public:
std::shared_ptr<rai::block> winner;
rai::amount tally;
};
class vote_info
{
public:
std::chrono::steady_clock::time_point time;
uint64_t sequence;
rai::block_hash hash;
};
class election_vote_result
{
public:
election_vote_result ();
election_vote_result (bool, bool);
bool replay;
bool processed;
};
class election : public std::enable_shared_from_this<rai::election>
{
std::function<void(std::shared_ptr<rai::block>)> confirmation_action;
void confirm_once (rai::transaction const &);
public:
election (rai::node &, std::shared_ptr<rai::block>, std::function<void(std::shared_ptr<rai::block>)> const &);
rai::election_vote_result vote (rai::account, uint64_t, rai::block_hash);
rai::tally_t tally (rai::transaction const &);
// Check if we have vote quorum
bool have_quorum (rai::tally_t const &, rai::uint128_t);
// Change our winner to agree with the network
void compute_rep_votes (rai::transaction const &);
// Confirm this block if quorum is met
void confirm_if_quorum (rai::transaction const &);
void log_votes (rai::tally_t const &);
bool publish (std::shared_ptr<rai::block> block_a);
void stop ();
rai::node & node;
std::unordered_map<rai::account, rai::vote_info> last_votes;
std::unordered_map<rai::account, std::shared_ptr<rai::vote>> our_last_votes;
std::unordered_map<rai::block_hash, std::shared_ptr<rai::block>> blocks;
rai::block_hash root;
rai::election_status status;
std::atomic<bool> confirmed;
bool stopped;
std::unordered_map<rai::block_hash, rai::uint128_t> last_tally;
unsigned announcements;
};
class conflict_info
{
public:
rai::block_hash root;
std::shared_ptr<rai::election> election;
std::pair<std::shared_ptr<rai::block>, std::shared_ptr<rai::block>> confirm_req_options;
};
// Core class for determining consensus
// Holds all active blocks i.e. recently added blocks that need confirmation
class active_transactions
{
public:
active_transactions (rai::node &);
~active_transactions ();
// Start an election for a block
// Call action with confirmed block, may be different than what we started with
bool start (std::shared_ptr<rai::block>, std::function<void(std::shared_ptr<rai::block>)> const & = [](std::shared_ptr<rai::block>) {});
// Also supply alternatives to block, to confirm_req reps with if the boolean argument is true
// Should only be used for old elections
// The first block should be the one in the ledger
bool start (std::pair<std::shared_ptr<rai::block>, std::shared_ptr<rai::block>>, std::function<void(std::shared_ptr<rai::block>)> const & = [](std::shared_ptr<rai::block>) {});
bool add (std::pair<std::shared_ptr<rai::block>, std::shared_ptr<rai::block>>, std::function<void(std::shared_ptr<rai::block>)> const & = [](std::shared_ptr<rai::block>) {});
// If this returns true, the vote is a replay
// If this returns false, the vote may or may not be a replay
bool vote (std::shared_ptr<rai::vote>, bool = false);
// Is the root of this block in the roots container
bool active (rai::block const &);
std::deque<std::shared_ptr<rai::block>> list_blocks (bool = false);
void erase (rai::block const &);
void stop ();
bool publish (std::shared_ptr<rai::block> block_a);
boost::multi_index_container<
rai::conflict_info,
boost::multi_index::indexed_by<
boost::multi_index::hashed_unique<boost::multi_index::member<rai::conflict_info, rai::block_hash, &rai::conflict_info::root>>>>
roots;
std::unordered_map<rai::block_hash, std::shared_ptr<rai::election>> successors;
std::deque<rai::election_status> confirmed;
rai::node & node;
std::mutex mutex;
// Maximum number of conflicts to vote on per interval, lowest root hash first
static unsigned constexpr announcements_per_interval = 32;
// Minimum number of block announcements
static unsigned constexpr announcement_min = 2;
// Threshold to start logging blocks haven't yet been confirmed
static unsigned constexpr announcement_long = 20;
static unsigned constexpr announce_interval_ms = (rai::rai_network == rai::rai_networks::rai_test_network) ? 10 : 16000;
static size_t constexpr election_history_size = 2048;
private:
void announce_loop ();
void announce_votes (std::unique_lock<std::mutex> &);
std::condition_variable condition;
bool started;
bool stopped;
boost::thread thread;
};
class operation
{
public:
bool operator> (rai::operation const &) const;
std::chrono::steady_clock::time_point wakeup;
std::function<void()> function;
};
class alarm
{
public:
alarm (boost::asio::io_service &);
~alarm ();
void add (std::chrono::steady_clock::time_point const &, std::function<void()> const &);
void run ();
boost::asio::io_service & service;
std::mutex mutex;
std::condition_variable condition;
std::priority_queue<operation, std::vector<operation>, std::greater<operation>> operations;
boost::thread thread;
};
class gap_information
{
public:
std::chrono::steady_clock::time_point arrival;
rai::block_hash hash;
std::unordered_set<rai::account> voters;
};
class gap_cache
{
public:
gap_cache (rai::node &);
void add (rai::transaction const &, std::shared_ptr<rai::block>);
void vote (std::shared_ptr<rai::vote>);
rai::uint128_t bootstrap_threshold (rai::transaction const &);
boost::multi_index_container<
rai::gap_information,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<gap_information, std::chrono::steady_clock::time_point, &gap_information::arrival>>,
boost::multi_index::hashed_unique<boost::multi_index::member<gap_information, rai::block_hash, &gap_information::hash>>>>
blocks;
size_t const max = 256;
std::mutex mutex;
rai::node & node;
};
class work_pool;
class send_info
{
public:
uint8_t const * data;
size_t size;
rai::endpoint endpoint;
std::function<void(boost::system::error_code const &, size_t)> callback;
};
class block_arrival_info
{
public:
std::chrono::steady_clock::time_point arrival;
rai::block_hash hash;
};
// This class tracks blocks that are probably live because they arrived in a UDP packet
// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks.
class block_arrival
{
public:
// Return `true' to indicated an error if the block has already been inserted
bool add (rai::block_hash const &);
bool recent (rai::block_hash const &);
boost::multi_index_container<
rai::block_arrival_info,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<rai::block_arrival_info, std::chrono::steady_clock::time_point, &rai::block_arrival_info::arrival>>,
boost::multi_index::hashed_unique<boost::multi_index::member<rai::block_arrival_info, rai::block_hash, &rai::block_arrival_info::hash>>>>
arrival;
std::mutex mutex;
static size_t constexpr arrival_size_min = 8 * 1024;
static std::chrono::seconds constexpr arrival_time_min = std::chrono::seconds (300);
};
class rep_last_heard_info
{
public:
std::chrono::steady_clock::time_point last_heard;
rai::account representative;
};
class online_reps
{
public:
online_reps (rai::node &);
void vote (std::shared_ptr<rai::vote> const &);
void recalculate_stake ();
rai::uint128_t online_stake ();
rai::uint128_t online_stake_total;
std::vector<rai::account> list ();
boost::multi_index_container<
rai::rep_last_heard_info,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<rai::rep_last_heard_info, std::chrono::steady_clock::time_point, &rai::rep_last_heard_info::last_heard>>,
boost::multi_index::hashed_unique<boost::multi_index::member<rai::rep_last_heard_info, rai::account, &rai::rep_last_heard_info::representative>>>>
reps;
private:
std::mutex mutex;
rai::node & node;
};
class udp_data
{
public:
uint8_t * buffer;
size_t size;
rai::endpoint endpoint;
};
/**
* A circular buffer for servicing UDP datagrams. This container follows a producer/consumer model where the operating system is producing data in to buffers which are serviced by internal threads.
* If buffers are not serviced fast enough they're internally dropped.
* This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order.
* All public methods are thread-safe
*/
class udp_buffer
{
public:
// Size - Size of each individual buffer
// Count - Number of buffers to allocate
// Stats - Statistics
udp_buffer (rai::stat & stats, size_t, size_t);
// Return a buffer where UDP data can be put
// Method will attempt to return the first free buffer
// If there are no free buffers, an unserviced buffer will be dequeued and returned
// Function will block if there are no free or unserviced buffers
// Return nullptr if the container has stopped
rai::udp_data * allocate ();
// Queue a buffer that has been filled with UDP data and notify servicing threads
void enqueue (rai::udp_data *);
// Return a buffer that has been filled with UDP data
// Function will block until a buffer has been added
// Return nullptr if the container has stopped
rai::udp_data * dequeue ();
// Return a buffer to the freelist after is has been serviced
void release (rai::udp_data *);
// Stop container and notify waiting threads
void stop ();
private:
rai::stat & stats;
std::mutex mutex;
std::condition_variable condition;
boost::circular_buffer<rai::udp_data *> free;
boost::circular_buffer<rai::udp_data *> full;
std::vector<uint8_t> slab;
std::vector<rai::udp_data> entries;
bool stopped;
};
class network
{
public:
network (rai::node &, uint16_t);
~network ();
void receive ();
void process_packets ();
void start ();
void stop ();
void receive_action (rai::udp_data *);
void rpc_action (boost::system::error_code const &, size_t);
void republish_vote (std::shared_ptr<rai::vote>);
void republish_block (std::shared_ptr<rai::block>);
static unsigned const broadcast_interval_ms = (rai::rai_network == rai::rai_networks::rai_test_network) ? 10 : 50;
void republish_block_batch (std::deque<std::shared_ptr<rai::block>>, unsigned = broadcast_interval_ms);
void republish (rai::block_hash const &, std::shared_ptr<std::vector<uint8_t>>, rai::endpoint);
void confirm_send (rai::confirm_ack const &, std::shared_ptr<std::vector<uint8_t>>, rai::endpoint const &);
void merge_peers (std::array<rai::endpoint, 8> const &);
void send_keepalive (rai::endpoint const &);
void send_node_id_handshake (rai::endpoint const &, boost::optional<rai::uint256_union> const & query, boost::optional<rai::uint256_union> const & respond_to);
void broadcast_confirm_req (std::shared_ptr<rai::block>);
void broadcast_confirm_req_base (std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>, unsigned, bool = false);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<rai::block>, std::shared_ptr<std::vector<rai::peer_information>>>>, unsigned = broadcast_interval_ms);
void send_confirm_req (rai::endpoint const &, std::shared_ptr<rai::block>);
void send_buffer (uint8_t const *, size_t, rai::endpoint const &, std::function<void(boost::system::error_code const &, size_t)>);
rai::endpoint endpoint ();
rai::udp_buffer buffer_container;
boost::asio::ip::udp::socket socket;
std::mutex socket_mutex;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
rai::node & node;
bool on;
static uint16_t const node_port = rai::rai_network == rai::rai_networks::rai_live_network ? 7075 : 54000;
static size_t const buffer_size = 512;
};
class node_init
{
public:
node_init ();
bool error ();
bool block_store_init;
bool wallet_init;
};
class node_observers
{
public:
rai::observer_set<std::shared_ptr<rai::block>, rai::account const &, rai::uint128_t const &, bool> blocks;
rai::observer_set<bool> wallet;
rai::observer_set<rai::transaction const &, std::shared_ptr<rai::vote>, rai::endpoint const &> vote;
rai::observer_set<rai::account const &, bool> account_balance;
rai::observer_set<rai::endpoint const &> endpoint;
rai::observer_set<> disconnect;
rai::observer_set<> started;
};
class vote_processor
{
public:
vote_processor (rai::node &);
void vote (std::shared_ptr<rai::vote>, rai::endpoint);
// node.active.mutex lock required
rai::vote_code vote_blocking (rai::transaction const &, std::shared_ptr<rai::vote>, rai::endpoint, bool = false);
void verify_votes (std::deque<std::pair<std::shared_ptr<rai::vote>, rai::endpoint>> &);
void flush ();
rai::node & node;
void stop ();
private:
void process_loop ();
std::deque<std::pair<std::shared_ptr<rai::vote>, rai::endpoint>> votes;
std::condition_variable condition;
std::mutex mutex;
bool started;
bool stopped;
bool active;
boost::thread thread;
};
// The network is crawled for representatives by occasionally sending a unicast confirm_req for a specific block and watching to see if it's acknowledged with a vote.
class rep_crawler
{
public:
void add (rai::block_hash const &);
void remove (rai::block_hash const &);
bool exists (rai::block_hash const &);
std::mutex mutex;
std::unordered_set<rai::block_hash> active;
};
// Processing blocks is a potentially long IO operation
// This class isolates block insertion from other operations like servicing network operations
class block_processor
{
public:
block_processor (rai::node &);
~block_processor ();
void stop ();
void flush ();
bool full ();
void add (std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point);
void force (std::shared_ptr<rai::block>);
bool should_log ();
bool have_blocks ();
void process_blocks ();
rai::process_return process_receive_one (rai::transaction const &, std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point = std::chrono::steady_clock::now (), bool = false);
rai::vote_generator generator;
private:
void queue_unchecked (rai::transaction const &, rai::block_hash const &);
void process_receive_many (std::unique_lock<std::mutex> &);
void verify_state_blocks (std::unique_lock<std::mutex> &);
bool stopped;
bool active;
std::chrono::steady_clock::time_point next_log;
std::deque<std::pair<std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point>> blocks;
std::deque<std::pair<std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point>> state_blocks;
std::unordered_set<rai::block_hash> blocks_hashes;
std::deque<std::shared_ptr<rai::block>> forced;
std::condition_variable condition;
rai::node & node;
std::mutex mutex;
};
class node : public std::enable_shared_from_this<rai::node>
{
public:
node (rai::node_init &, boost::asio::io_service &, uint16_t, boost::filesystem::path const &, rai::alarm &, rai::logging const &, rai::work_pool &);
node (rai::node_init &, boost::asio::io_service &, boost::filesystem::path const &, rai::alarm &, rai::node_config const &, rai::work_pool &);
~node ();
template <typename T>
void background (T action_a)
{
alarm.service.post (action_a);
}
void send_keepalive (rai::endpoint const &);
bool copy_with_compaction (boost::filesystem::path const &);
void keepalive (std::string const &, uint16_t);
void start ();
void stop ();
std::shared_ptr<rai::node> shared ();
int store_version ();
void process_confirmed (std::shared_ptr<rai::block>);
void process_message (rai::message &, rai::endpoint const &);
void process_active (std::shared_ptr<rai::block>);
rai::process_return process (rai::block const &);
void keepalive_preconfigured (std::vector<std::string> const &);
rai::block_hash latest (rai::account const &);
rai::uint128_t balance (rai::account const &);
std::shared_ptr<rai::block> block (rai::block_hash const &);
std::pair<rai::uint128_t, rai::uint128_t> balance_pending (rai::account const &);
rai::uint128_t weight (rai::account const &);
rai::account representative (rai::account const &);
void ongoing_keepalive ();
void ongoing_syn_cookie_cleanup ();
void ongoing_rep_crawl ();
void ongoing_bootstrap ();
void ongoing_store_flush ();
void backup_wallet ();
void search_pending ();
int price (rai::uint128_t const &, int);
void work_generate_blocking (rai::block &);
uint64_t work_generate_blocking (rai::uint256_union const &);
void work_generate (rai::uint256_union const &, std::function<void(uint64_t)>);
void add_initial_peers ();
void block_confirm (std::shared_ptr<rai::block>);
void process_fork (rai::transaction const &, std::shared_ptr<rai::block>);
bool validate_block_by_previous (rai::transaction const &, std::shared_ptr<rai::block>);
rai::uint128_t delta ();
boost::asio::io_service & service;
rai::node_config config;
rai::alarm & alarm;
rai::work_pool & work;
boost::log::sources::logger_mt log;
std::unique_ptr<rai::block_store> store_impl;
rai::block_store & store;
rai::gap_cache gap_cache;
rai::ledger ledger;
rai::active_transactions active;
rai::network network;
rai::bootstrap_initiator bootstrap_initiator;
rai::bootstrap_listener bootstrap;
rai::peer_container peers;
boost::filesystem::path application_path;
rai::node_observers observers;
rai::wallets wallets;
rai::port_mapping port_mapping;
rai::vote_processor vote_processor;
rai::rep_crawler rep_crawler;
unsigned warmed_up;
rai::block_processor block_processor;
boost::thread block_processor_thread;
rai::block_arrival block_arrival;
rai::online_reps online_reps;
rai::stat stats;
rai::keypair node_id;
rai::block_uniquer block_uniquer;
rai::vote_uniquer vote_uniquer;
static double constexpr price_max = 16.0;
static double constexpr free_cutoff = 1024.0;
static std::chrono::seconds constexpr period = std::chrono::seconds (60);
static std::chrono::seconds constexpr cutoff = period * 5;
static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);
static std::chrono::minutes constexpr backup_interval = std::chrono::minutes (5);
static std::chrono::seconds constexpr search_pending_interval = (rai::rai_network == rai::rai_networks::rai_test_network) ? std::chrono::seconds (1) : std::chrono::seconds (5 * 60);
};
class thread_runner
{
public:
thread_runner (boost::asio::io_service &, unsigned);
~thread_runner ();
void join ();
std::vector<boost::thread> threads;
};
class inactive_node
{
public:
inactive_node (boost::filesystem::path const & path = rai::working_path ());
~inactive_node ();
boost::filesystem::path path;
std::shared_ptr<boost::asio::io_service> service;
rai::alarm alarm;
rai::logging logging;
rai::node_init init;
rai::work_pool work;
std::shared_ptr<rai::node> node;
};
}