Skip to content

Commit

Permalink
Refresh version information in keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeowch committed Jan 25, 2022
1 parent 5fa4ac3 commit acb1bf6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 36 deletions.
5 changes: 4 additions & 1 deletion conf/ping.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ ipv6_prefix = 64
nodes_per_ipv6_prefix = 1

# Redis TTL for cached RTT
ttl = 10800
rtt_ttl = 10800

# Check version every given interval
version_delay = 86400

# Attempt to establish connection with .onion nodes
onion = True
Expand Down
14 changes: 12 additions & 2 deletions crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,24 @@ def connect(redis_conn, key):
addr_msgs = msgs
break

version = version_msg.get('version', "")
user_agent = version_msg.get('user_agent', "")
from_services = version_msg.get('services', 0)
height = version_msg.get('height', 0)

if from_services != services:
logging.debug("%s Expected %d, got %d for services", conn.to_addr,
services, from_services)
key = "node:{}-{}-{}".format(address, port, from_services)

height_key = "height:{}-{}-{}".format(address, port, from_services)
redis_pipe.setex(height_key, CONF['max_age'],
version_msg.get('height', 0))
redis_pipe.setex(height_key, CONF['max_age'], height)

version_key = "version:{}-{}".format(address, port)
redis_pipe.setex(version_key,
CONF['max_age'],
(version, user_agent, from_services))

now = int(time.time())
(peers, excluded) = enumerate_node(redis_pipe, addr_msgs, now)
logging.debug("%s Peers: %d (Excluded: %d)",
Expand Down
122 changes: 89 additions & 33 deletions ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,65 +62,120 @@ class Keepalive(object):
def __init__(self, conn, version_msg):
self.conn = conn
self.node = conn.to_addr
self.version_msg = version_msg
self.last_ping = int(time.time())
self.keepalive_time = 60

self.start_time = int(time.time())
self.last_ping = self.start_time
self.last_version = self.start_time

self.ping_delay = 60
self.version_delay = CONF['version_delay']

version = version_msg.get('version', "")
user_agent = version_msg.get('user_agent', "")
services = version_msg.get('services', "")

# Open connections are tracked in open set with the associated data
# stored in opendata set in Redis.
self.data = self.node + (
version,
user_agent,
self.start_time,
services)
REDIS_CONN.sadd('opendata', self.data)

def keepalive(self):
"""
Periodically sends ping message.
Open connections are tracked in open set with the associated data
stored in opendata set in Redis.
Periodically sends ping message and refreshes version information.
"""
version = self.version_msg.get('version', "")
user_agent = self.version_msg.get('user_agent', "")
services = self.version_msg.get('services', "")
data = self.node + (version, user_agent, self.last_ping, services)
while True:
now = time.time()

REDIS_CONN.sadd('opendata', data)
if now > self.last_ping + self.ping_delay:
if not self.ping(now):
break

while True:
if time.time() > self.last_ping + self.keepalive_time:
try:
self.ping()
except socket.error as err:
logging.info("ping: Closing %s (%s)", self.node, err)
if now > self.last_version + self.version_delay:
if not self.version(now):
break

# Sink received messages to flush them off socket buffer
try:
self.conn.get_messages()
except socket.timeout:
pass
except (ProtocolError, ConnectionError, socket.error) as err:
logging.info("get_messages: Closing %s (%s)", self.node, err)
if not self.sink():
break

gevent.sleep(0.1)

REDIS_CONN.srem('opendata', data)
self.close()

def close(self):
REDIS_CONN.srem('opendata', self.data)
self.conn.close()

def ping(self):
def ping(self, now):
"""
Sends a ping message. Ping time is stored in Redis for round-trip time
(RTT) calculation.
"""
self.last_ping = now

nonce = random.getrandbits(64)
try:
self.conn.ping(nonce=nonce)
except socket.error:
raise
except socket.error as err:
logging.info("Closing %s (%s)", self.node, err)
return False
logging.debug("%s (%s)", self.node, nonce)

self.last_ping = time.time()
key = "ping:{}-{}:{}".format(self.node[0], self.node[1], nonce)
REDIS_CONN.lpush(key, int(self.last_ping * 1000)) # in ms
REDIS_CONN.expire(key, CONF['ttl'])
REDIS_CONN.expire(key, CONF['rtt_ttl'])

try:
self.keepalive_time = int(REDIS_CONN.get('elapsed'))
self.ping_delay = int(REDIS_CONN.get('elapsed'))
except TypeError:
pass

return True

def version(self, now):
"""
Refreshes version information using response from latest handshake.
"""
self.last_version = now

version_key = 'version:{}-{}'.format(self.node[0], self.node[1])
version_data = REDIS_CONN.get(version_key)

if version_data is None:
return True

version, user_agent, services = eval(version_data)
if all([version, user_agent, services]):
data = self.node + (
version,
user_agent,
self.start_time,
services)

if self.data != data:
REDIS_CONN.srem('opendata', self.data)
REDIS_CONN.sadd('opendata', data)
self.data = data

return True

def sink(self):
"""
Sinks received messages to flush them off socket buffer.
"""
try:
self.conn.get_messages()
except socket.timeout:
pass
except (ProtocolError, ConnectionError, socket.error) as err:
logging.info("Closing %s (%s)", self.node, err)
return False

return True


def task():
"""
Expand Down Expand Up @@ -191,7 +246,7 @@ def task():
REDIS_CONN.set('onion:{}'.format(local_port), conn.to_addr)

Keepalive(conn=conn, version_msg=version_msg).keepalive()
conn.close()

if cidr_key:
nodes = REDIS_CONN.decr(cidr_key)
logging.info("-CIDR %s: %d", cidr, nodes)
Expand Down Expand Up @@ -307,7 +362,8 @@ def init_conf(argv):
CONF['relay'] = conf.getint('ping', 'relay')
CONF['socket_timeout'] = conf.getint('ping', 'socket_timeout')
CONF['cron_delay'] = conf.getint('ping', 'cron_delay')
CONF['ttl'] = conf.getint('ping', 'ttl')
CONF['rtt_ttl'] = conf.getint('ping', 'rtt_ttl')
CONF['version_delay'] = conf.getint('ping', 'version_delay')
CONF['ipv6_prefix'] = conf.getint('ping', 'ipv6_prefix')
CONF['nodes_per_ipv6_prefix'] = conf.getint('ping',
'nodes_per_ipv6_prefix')
Expand Down

0 comments on commit acb1bf6

Please sign in to comment.