
make the ping queue use a more compact data structure #16

Merged
merged 2 commits into from Aug 14, 2016
main.cpp (251 changes: 130 additions & 121 deletions)
@@ -390,19 +390,6 @@ struct bound_socket
 	char packet[1500];
 };
 
-// this is the type of each node queued up
-// to be pinged at some point in the future
-struct queued_node_t
-{
-	udp::endpoint ep;
-	node_id_type node_id;
-	// the socket this node was seen on
-	bound_socket* incoming_socket;
-
-	// the time when this node should be pinged
-	time_point expire;
-};
-
 template <typename T, typename K>
 void erase_one(T& container, K const& key)
 {
Contributor Author:

I moved this down a bit and stripped out fields that aren't used externally
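
For a sense of scale, a rough per-entry footprint comparison (approximate figures, assuming a 64-bit platform, Boost.Asio's usual endpoint layout, and a 20-byte node id):

```cpp
// old queued_node_t, one slot per queued node:
//   udp::endpoint ep;               // ~28 bytes of sockaddr storage
//   node_id_type node_id;           //  20 bytes
//   bound_socket* incoming_socket;  //   8 bytes
//   time_point expire;              //   8 bytes   -> ~64 bytes with padding
//
// new queue_entry_t (v4 instantiation, defined further down in this diff):
//   addr (4) + expire (4) + sock_idx (2) + port (2) -> 12 bytes
```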

@@ -442,97 +429,110 @@ struct ip_set
 	std::unordered_set<decltype(extract_key(std::declval<Address>()))> m_ips;
 };
 
+struct queued_node_t
+{
+	udp::endpoint ep;
+	int sock_idx;
+};
+
+template <typename Address>
 struct ping_queue_t
Contributor Author:

while working on this, more and more fields started having an IPv4 and an IPv6 version, so making the queue a template made that a lot cleaner. I also moved the queue entry type in as an inner class
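
Condensed, the shape it ends up with (a sketch only; the full definitions are in the diff below):

```cpp
template <typename Address>  // address_v4 or address_v6
struct ping_queue_t
{
	bool need_ping(queued_node_t* out);
	void insert_node(Address const& addr, std::uint16_t port, int sock_idx);

private:
	struct queue_entry_t
	{
		typename Address::bytes_type addr;  // 4 bytes for v4, 16 for v6
		std::uint32_t expire;
		std::uint16_t sock_idx;
		std::uint16_t port;
	};

	ip_set<Address> m_ips;              // one set instead of m_ip4s/m_ip6s
	std::deque<queue_entry_t> m_queue;  // entries sized for this family only
};

// one instance per address family replaces the protocol branching:
ping_queue_t<address_v4> ping_queue4;
ping_queue_t<address_v6> ping_queue6;
```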

 {
-	ping_queue_t()
-		: m_round_robin(0)
-		, m_queue_time(0) {}
+	ping_queue_t() {}
 
 	// asks the ping-queue if there is another node that
 	// should be pinged right now. Returns false if not.
 	bool need_ping(queued_node_t* out)
 	{
 		if (m_queue.empty()) return false;
 
-		time_point now = steady_clock::now();
-		if (m_queue.front().expire > now)
-			return false;
+		time_point const now = steady_clock::now();
 
-		*out = m_queue.front();
-		m_queue.pop_front();
-		if (out->ep.protocol() == udp::v4())
-			m_ip4s.erase(out->ep.address().to_v4());
-		else
-			m_ip6s.erase(out->ep.address().to_v6());
+		// this is the number of seconds since the queue was constructed. This is
+		// compared against the expiration times on the queue entries
+		int const time_out = duration_cast<std::chrono::seconds>(
+			now - m_created).count();
 
-		assert(out->ep.address() != address_v4::any());
+		if (m_queue.front().expire > time_out) return false;
+		auto ent = m_queue.front();
+		m_queue.pop_front();
+		m_ips.erase(Address(ent.addr));
 
-		time_point time_added = out->expire - minutes(15);
-		m_queue_time = duration_cast<std::chrono::minutes>(now - time_added).count();
+		out->ep = udp::endpoint(Address(ent.addr), ent.port);
+		out->sock_idx = ent.sock_idx;
 
 		return true;
 	}
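
The comparison above works in whole seconds relative to the queue's construction time, which is what lets each entry carry a 32-bit integer instead of an 8-byte time_point. A worked example with made-up timestamps:

```cpp
// queue constructed at t = 0
// node inserted at  t = 120 s  ->  ent.expire = 120 + 15 * 60 = 1020
//
// need_ping() called at t = 900 s:
//   time_out = 900,  and 1020 > 900   -> keep waiting, return false
// need_ping() called at t = 1025 s:
//   time_out = 1025, and 1020 <= 1025 -> pop the entry and ping it
//
// a uint32_t of seconds only wraps after ~136 years of process uptime
```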

-	void insert_node(udp::endpoint const& ep, char const* node_id
-		, bound_socket& incoming_socket)
+	void insert_node(Address const& addr, std::uint16_t const port, int const sock_idx)
 	{
-		assert(ep.address() != address_v4::any());
-
-		if (ep.protocol() == udp::v4()
-			&& m_ip4s.count(ep.address().to_v4()))
-			return;
-		else if (ep.protocol() == udp::v6()
-			&& m_ip6s.count(ep.address().to_v6()))
-			return;
-
-		// don't let the queue get too big
-		if (m_queue.size() > ping_queue_size) return;
-
-		// as the size approaches the limit, increasingly reject
-		// new nodes, to distribute nodes we ping more evenly
-		// over time
-		++m_round_robin;
-		m_round_robin &= 0xff;
-		if (m_round_robin < m_queue.size() * 256 / ping_queue_size)
-			return;
+		assert(addr != Address::any());
+
+		// prevent duplicate entries
+		if (m_ips.count(addr)) return;
+
+		// the number of nodes we allow in the queue before we start dropping
+		// nodes (to stay below the limit)
+		size_t const low_watermark = ping_queue_size / 2;
+
+		if (m_queue.size() > low_watermark) {
+			// as the size approaches the limit, increasingly reject
+			// new nodes, to distribute nodes we ping more evenly
+			// over time.
+			// we don't start dropping nodes until we exceed the low watermark, but
+			// once we do, we increase the drop rate the closer we get to the limit
+			++m_round_robin;
+			if (m_round_robin < (m_queue.size() - low_watermark) * 256 / (ping_queue_size - low_watermark))
+				return;
+		}
 
-		queued_node_t e;
-		e.ep = ep;
-		memcpy(e.node_id.data(), node_id, e.node_id.size());
-		e.incoming_socket = &incoming_socket;
 		// we primarily want to keep quality nodes in our list.
 		// in 10 minutes, any pin-hole the node may have had open to
 		// us is likely to have been closed. If the node responds
-		// in 10 minutes from now, it's likely to either have a full-cone
+		// in 15 minutes from now, it's likely to either have a full-cone
 		// NAT or not be NATed at all (which is the way we like our nodes).
 		// also, it still being up is a good predictor for it staying up
 		// longer as well.
-		e.expire = steady_clock::now() + minutes(15);
+		std::uint32_t const expire = duration_cast<seconds>(
+			steady_clock::now() + minutes(15) - m_created).count();
 
-		m_queue.push_back(e);
-		if (ep.protocol() == udp::v4())
-			m_ip4s.insert(ep.address().to_v4());
-		else
-			m_ip6s.insert(ep.address().to_v6());
+		m_queue.push_back({addr.to_bytes(), expire, std::uint16_t(sock_idx), port});
+		m_ips.insert(addr);
 	}
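
The watermark logic is a linear ramp on the drop probability. Plugging in numbers (assuming, for illustration only, ping_queue_size were 10000; the real constant is defined elsewhere in main.cpp):

```cpp
// low_watermark = 10000 / 2 = 5000
//
//   queue size  5000  ->  threshold    0/256  ->  accept every node
//   queue size  7500  ->  threshold  128/256  ->  accept ~50%
//   queue size  9900  ->  threshold  250/256  ->  accept ~2%
//
// m_round_robin is a uint8_t, so ++m_round_robin wraps at 256 on its
// own (the old code needed the explicit `&= 0xff` for that), giving an
// approximately uniform draw against the threshold
```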

 private:
 
+	// this is the type of each node queued up to be pinged at some point in the
+	// future
+	struct queue_entry_t
+	{
+		typename Address::bytes_type addr;
+
+		// expiration in seconds (relative to the ping_queue creation time)
+		std::uint32_t expire:30;
@arvidn (Contributor Author), Aug 14, 2016:

I suppose technically there will always be 2 bytes padding here, so the bitfield may be a bit redundant

Contributor:

Yeah, as-is the bitfield is just generating slower code for no gain.
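
The layout claim is easy to check. A minimal sketch (v4 field sizes, typical ABI; the struct names are local to this example):

```cpp
#include <cstdint>

struct entry_bitfield {
	std::uint8_t  addr[4];
	std::uint32_t expire : 30;  // still occupies a full 32-bit unit
	std::uint16_t sock_idx;
	std::uint16_t port;
};

struct entry_plain {
	std::uint8_t  addr[4];
	std::uint32_t expire;       // plain load/store, no masking
	std::uint16_t sock_idx;
	std::uint16_t port;
};

// both come out at 12 bytes on common ABIs: the :30 saves no space,
// while every access to the bitfield pays for mask-and-shift code
static_assert(sizeof(entry_bitfield) == sizeof(entry_plain),
	"the bitfield buys no space here");
```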


+		// the index of the socket this node was seen on, and should be pinged
+		// back via
+		std::uint16_t sock_idx;
+
+		std::uint16_t port;
+	};
+
 	// the set of IPs in the queue. An IP is only allowed to appear once
-	ip_set<address_v4> m_ip4s;
-	ip_set<address_v6> m_ip6s;
+	ip_set<Address> m_ips;
 
 	// the queue of nodes we should ping, ordered by the
 	// time they were added
-	std::deque<queued_node_t> m_queue;
+	// TODO: it would be nice to keep the ping queue in a memory mapped file as
+	// well
+	std::deque<queue_entry_t> m_queue;
 
-	int m_round_robin;
+	steady_clock::time_point const m_created = steady_clock::now();
 
-	// the number of seconds nodes stay in the queue
-	// before being pinged
-	int m_queue_time;
+	// this is a wrapping counter used to determine the probability of dropping
+	// this node when the queue is under pressure. It's deliberately meant to
+	// wrap at 256.
+	std::uint8_t m_round_robin = 0;
 };
 
 // this is the type of each node entry
@@ -602,7 +602,7 @@ struct node_buffer_t
 		return ret;
 	}
 
-	void insert_node(address_type const& addr, uint16_t port, char const* node_id)
+	void insert_node(address_type const& addr, uint16_t const port, char const* node_id)
 	{
 		node_entry_type e;
 		e.ip = addr.to_bytes();
@@ -838,6 +838,54 @@ struct router_thread
 		ios.stop();
 	}
 
+	void send_ping(queued_node_t const& n)
+	{
+		//fprintf(stderr, "pinging node\n");
+		char remote_ip[18];
+		size_t remote_ip_len = build_ip_field(n.ep, remote_ip);
+		std::string const transaction_id = compute_tid(secret1, remote_ip, remote_ip_len);
+		bound_socket& sock = socks[n.sock_idx];
+
+		// send the ping to this node
+		bencoder b(response, sizeof(response));
+		b.open_dict();
+
+		b.add_string("ip"); b.add_string(remote_ip, remote_ip_len);
+
+		// args dict
+		b.add_string("a");
+		b.open_dict();
+		b.add_string("id"); b.add_string(sock.node_id.data(), sock.node_id.size());
+		b.close_dict();
+
+		b.add_string("t"); b.add_string(transaction_id);
+		b.add_string("q"); b.add_string("ping");
+
+		if (version[0] != 0)
+		{
+			b.add_string("v");
+			b.add_string(version.data(), 4);
+		}
+
+		b.add_string("y"); b.add_string("q");
+
+		b.close_dict();
+
+		error_code ec;
+		int const len = sock.sock.send_to(buffer(response, b.end() - response), n.ep, 0, ec);
+		if (ec) {
+			fprintf(stderr, "PING send_to failed: (%d) %s (%s:%d)\n"
+				, ec.value(), ec.message().c_str()
+				, n.ep.address().to_string(ec).c_str(), n.ep.port());
+		}
+		else if (len <= 0) {
+			fprintf(stderr, "PING send_to failed: return=%d\n", len);
+		}
+		else {
+			++outgoing_pings;
+		}
+	}
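
For reference, the message built above is a standard BEP 5 ping query, plus the peer's externally observed address in the top-level "ip" key. For an IPv4 destination, and assuming compute_tid produces a 4-byte transaction id, the wire format comes out roughly as:

```
d2:ip6:AAAAPP1:ad2:id20:<20-byte node id>e1:t4:TTTT1:q4:ping1:v4:VVVV1:y1:qe
```

where AAAAPP is the 4-byte address plus 2-byte port written by build_ip_field, TTTT is the transaction id, and the "v" entry is skipped entirely when version[0] == 0.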

 	void start()
 	{
 		printf("starting thread %d\n", threadid);
@@ -850,63 +898,20 @@

 		for (;;)
 		{
-			error_code ec;
-
 #ifdef DEBUG_STATS
 			nodebuf_size[threadid] = node_buffer.size();
 #endif
 
 			// if we need to ping nodes, do that now
 			queued_node_t n;
-			while (ping_queue.need_ping(&n))
-			{
-				// fprintf(stderr, "pinging node\n");
-				char remote_ip[18];
-				size_t const remote_ip_len = build_ip_field(n.ep, remote_ip);
-				std::string transaction_id = compute_tid(secret1, remote_ip, remote_ip_len);
-
-				// send the ping to this node
-				bencoder b(response, sizeof(response));
-				b.open_dict();
+			while (ping_queue4.need_ping(&n))
+				send_ping(n);
 
-				b.add_string("ip"); b.add_string(remote_ip, remote_ip_len);
-
-				// args dict
-				b.add_string("a");
-				b.open_dict();
-				b.add_string("id"); b.add_string(n.incoming_socket->node_id.data()
-					, n.incoming_socket->node_id.size());
-				b.close_dict();
-
-				b.add_string("t"); b.add_string(transaction_id);
-				b.add_string("q"); b.add_string("ping");
-
-				if (version[0] != 0)
-				{
-					b.add_string("v");
-					b.add_string(version.data(), 4);
-				}
-
-				b.add_string("y"); b.add_string("q");
-
-				b.close_dict();
-
-				int len = n.incoming_socket->sock.send_to(buffer(response, b.end() - response), n.ep, 0, ec);
-				if (ec) {
-					fprintf(stderr, "PING send_to failed: (%d) %s (%s:%d)\n"
-						, ec.value(), ec.message().c_str()
-						, n.ep.address().to_string(ec).c_str(), n.ep.port());
-				}
-				else if (len <= 0) {
-					fprintf(stderr, "PING send_to failed: return=%d\n", len);
-				}
-				else {
-					++outgoing_pings;
-				}
-			}
-
-			size_t executed = ios.run_one(ec);
+			while (ping_queue6.need_ping(&n))
+				send_ping(n);
+
+			error_code ec;
+			size_t const executed = ios.run_one(ec);
 			if (ec)
 			{
 				fprintf(stderr, "error in io service: (%d) %s\n", ec.value(), ec.message().c_str());
@@ -1190,6 +1195,9 @@ struct router_thread
 				e.port = htons(ep.port());
 				std::memcpy(e.node_id.data(), node_id.string_ptr(), e.node_id.size());
 				last_nodes4.push_back(e);
+
+				// ping this node later, we may want to add it to our node buffer
+				ping_queue4.insert_node(ep.address().to_v4(), ep.port(), &sock - &socks[0]);
 			}
 			else if (!is_v4 &&
 				(last_nodes6.empty() || last_nodes6.back().ip != ep.address().to_v6().to_bytes()))
@@ -1199,17 +1207,18 @@
 				e.port = htons(ep.port());
 				std::memcpy(e.node_id.data(), node_id.string_ptr(), e.node_id.size());
 				last_nodes6.push_back(e);
-			}
 
-			// ping this node later, we may want to add it to our node buffer
-			ping_queue.insert_node(ep, node_id.string_ptr(), sock);
+				// ping this node later, we may want to add it to our node buffer
+				ping_queue6.insert_node(ep.address().to_v6(), ep.port(), &sock - &socks[0]);
+			}
 		}
 	}
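
One detail in the two insert_node() calls: `&sock - &socks[0]` converts the reference to the receiving socket into its index in the socks vector, which is the 16-bit sock_idx the queue entry stores in place of the old bound_socket pointer. A hypothetical equivalent:

```cpp
// given:  std::vector<bound_socket> socks;
//         bound_socket& sock = socks[i];
int const sock_idx = static_cast<int>(&sock - socks.data());  // == i
```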

 	io_service ios;
 	std::vector<bound_socket> socks;
 
-	ping_queue_t ping_queue;
+	ping_queue_t<address_v4> ping_queue4;
+	ping_queue_t<address_v6> ping_queue6;
 	node_buffer_v4 node_buffer4;
 	node_buffer_v6 node_buffer6;
 