3131import json
3232import logging
3333import os
34+ import requests
3435import sys
3536import time
3637from binascii import hexlify , unhexlify
38+ from collections import Counter
3739from ConfigParser import ConfigParser
3840
3941from utils import new_redis_conn
4244CONF = {}
4345
4446
class Export(object):
    """
    Exports nodes into timestamp-prefixed JSON file and sets consensus height
    using the most common height from these nodes.

    Uses the module-level CONF dict ('export_dir', 'block_heights_url') and a
    module-level REDIS_CONN connection; both must be set up before an
    instance is created because the constructor already queries them.
    """

    def __init__(self, timestamp, nodes):
        """
        Stores the snapshot parameters and immediately fetches the latest
        per-node heights (performs an HTTP request and Redis reads).

        timestamp -- snapshot time; compared against block times and
                     multiplied by 1000 for comparison with Redis scores
                     (presumably epoch seconds -- confirm against caller).
        nodes     -- iterable of string-encoded node tuples
                     (address, port, version, user_agent, timestamp, services).
        """
        self.start_t = time.time()
        self.timestamp = timestamp
        self.nodes = nodes
        self.heights = self.get_heights()

    def export_nodes(self):
        """
        Merges enumerated data for the nodes and exports them into
        timestamp-prefixed JSON file and then sets consensus height in Redis
        using the most common height from these nodes.
        """
        rows = [self.get_row(node) for node in self.nodes]

        # Consensus height is only updated when at least one node height was
        # derived from block inv messages; otherwise the Redis key is left
        # untouched.
        if self.heights:
            height = Counter(self.heights.values()).most_common(1)[0][0]
            logging.info("Consensus height: %s", height)
            REDIS_CONN.set('height', height)

        dump = os.path.join(
            CONF['export_dir'], "{}.json".format(self.timestamp))
        # Context manager guarantees the file is flushed and closed even if
        # serialization or the write fails part-way.
        with open(dump, 'w') as dump_file:
            dump_file.write(json.dumps(rows, encoding="latin-1"))
        logging.info("Wrote %s", dump)

        logging.info("Elapsed: %d", time.time() - self.start_t)

    def get_row(self, node):
        """
        Returns enumerated row data from Redis for the specified node.

        The returned tuple is the decoded node tuple followed by
        (height,), (hostname,) and the 7-field geoip tuple.
        """
        # address, port, version, user_agent, timestamp, services
        # NOTE(review): eval() executes whatever string is stored in Redis;
        # this is only safe while every writer of these values is trusted.
        node = eval(node)
        address = node[0]
        port = node[1]
        services = node[-1]

        n = '{}-{}'.format(address, port)
        if n in self.heights:
            # Height from received block inv message in ping.py.
            height = (self.heights[n],)
        else:
            # Height from handshake in crawl.py.
            height = REDIS_CONN.get(
                'height:{}-{}-{}'.format(address, port, services))
            if height is None:
                height = (0,)
            else:
                height = (int(height),)
            logging.debug("Using handshake height %s: %s", node, height)

        hostname = REDIS_CONN.hget('resolve:{}'.format(address), 'hostname')
        hostname = (hostname,)

        geoip = REDIS_CONN.hget('resolve:{}'.format(address), 'geoip')
        if geoip is None:
            # city, country, latitude, longitude, timezone, asn, org
            geoip = (None, None, 0.0, 0.0, None, None, None)
        else:
            # NOTE(review): same eval() caveat as for the node tuple above.
            geoip = eval(geoip)

        return node + height + hostname + geoip

    def get_heights(self):
        """
        Returns the latest heights based on received block inv messages.

        The result maps 'ADDRESS-PORT' to a block height. Any failure to
        obtain the recent blocks list (request error, non-200 status or an
        unexpected payload) yields an empty dict instead of raising, so the
        export can fall back to handshake heights.
        """
        heights = {}
        recent_blocks = []
        timestamp_ms = self.timestamp * 1000

        try:
            response = requests.get(CONF['block_heights_url'], timeout=15)
        except requests.exceptions.RequestException as err:
            logging.warning(err)
        else:
            if response.status_code == 200:
                try:
                    recent_blocks = response.json()['blocks']
                except (ValueError, KeyError, TypeError) as err:
                    # Body was not JSON or lacked the 'blocks' list; treat it
                    # like a failed request rather than aborting the export.
                    logging.warning("Invalid block heights response: %s", err)

        # The first block that lists a node wins, so the resulting heights
        # depend on the order of recent_blocks (presumably newest first --
        # confirm against the block heights endpoint).
        for block in recent_blocks:
            block_height, block_time, block_hash = block
            if block_time > self.timestamp:
                # Block is newer than this snapshot.
                continue

            key = "binv:{}".format(block_hash)
            # [('ADDRESS-PORT', EPOCH_MS),..]
            nodes = REDIS_CONN.zrangebyscore(
                key, '-inf', '+inf', withscores=True, score_cast_func=int)
            for n, t in nodes:
                if n not in heights and t <= timestamp_ms:
                    heights[n] = block_height

        logging.info("Heights: %d", len(heights))
        return heights
91149
92150
93151def init_conf (argv ):
@@ -101,6 +159,7 @@ def init_conf(argv):
101159 CONF ['db' ] = conf .getint ('export' , 'db' )
102160 CONF ['debug' ] = conf .getboolean ('export' , 'debug' )
103161 CONF ['export_dir' ] = conf .get ('export' , 'export_dir' )
162+ CONF ['block_heights_url' ] = conf .get ('export' , 'block_heights_url' )
104163 if not os .path .exists (CONF ['export_dir' ]):
105164 os .makedirs (CONF ['export_dir' ])
106165
@@ -146,7 +205,8 @@ def main(argv):
146205 logging .info ("Timestamp: %d" , timestamp )
147206 nodes = REDIS_CONN .smembers ('opendata' )
148207 logging .info ("Nodes: %d" , len (nodes ))
149- export_nodes (nodes , timestamp )
208+ export = Export (timestamp , nodes )
209+ export .export_nodes ()
150210 REDIS_CONN .publish (publish_key , timestamp )
151211
152212 return 0
0 commit comments