Merge pull request #139 from CounterpartyXCP/develop
1.3.1
Robby Dermody committed Jan 24, 2016
2 parents afadf6a + f70c929 commit daf9629
Showing 16 changed files with 127 additions and 85 deletions.
7 changes: 6 additions & 1 deletion ChangeLog.md
@@ -1,4 +1,9 @@
## Client Versions ##
## Changelog ##
* v1.3.1 (2016-01-24)
* Modify blockfeed logic to handle reorgs properly again, in line with the undolog fix introduced in `counterparty-lib` 9.53.0.
* Enhance blockfeed error recovery logic to make it more robust and minimize the chance of blockfeed hangs.
* `/_api` handler now includes an extra `counterparty-server_caught_up` field and improved reporting of error conditions.
* Code upgraded to work with pymongo >= 3.1 (the main call-site differences are sketched just after this list)
* v1.3.0 (2015-10-31)
* Fixes periodic `blockfeed` hanging issue (where `counterblock` would still run, but not process new blocks from `counterparty-server`)
* Block processing is much more robust now if an exception is encountered (e.g. counterparty-server goes down). Should prevent additional hanging-type issues
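
For context on the pymongo >= 3.1 item above, here is a minimal sketch of the 2.x-to-3.x call-site differences that the diffs below revolve around. The database and collection names are illustrative, not taken from the codebase, and a local mongod is assumed:

    import pymongo

    client = pymongo.MongoClient()          # assumes a local mongod
    db = client['counterblockd_test']       # hypothetical database name

    # pymongo 2.x find() accepted spec=/fields=; pymongo 3.x renamed these
    # keyword arguments to filter= and projection=.
    mempool_txs = db.mempool.find(filter={'category': 'sends'},
                                  projection={'tx_hash': True, '_id': False})

    # pymongo 2.x aggregate() returned {'ok': 1.0, 'result': [...]}; pymongo 3.x
    # returns a CommandCursor that is iterated directly.
    counts = db.mempool.aggregate([
        {'$group': {'_id': '$category', 'count': {'$sum': 1}}}
    ])
    for row in counts:
        print("%s: %s" % (row['_id'], row['count']))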
10 changes: 9 additions & 1 deletion counterblock/lib/blockchain.py
@@ -116,9 +116,17 @@ def getaddressinfo(address):
}

def gettransaction_batch(txhash_list):
raw_txes = util.call_jsonrpc_api("getrawtransaction_batch", {'txhash_list': txhash_list, 'verbose': True}, abort_on_error=True)['result']
raw_txes = util.call_jsonrpc_api("getrawtransaction_batch", {
'txhash_list': txhash_list,
'verbose': True,
'skip_missing': True}, abort_on_error=True)['result']
txes = {}

for tx_hash, tx in raw_txes.iteritems():
if tx is None:
txes[tx_hash] = None
continue

valueOut = 0
for vout in tx['vout']:
valueOut += vout['value']
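
A rough sketch of how a caller might consume the new skip_missing behaviour, where transaction hashes the backend cannot resolve come back mapped to None instead of aborting the whole batch. The hashes are placeholders, and this assumes gettransaction_batch() returns the dict it builds keyed by tx hash, as the loop above suggests:

    from counterblock.lib import blockchain

    txhash_list = ['aa' * 32, 'bb' * 32]   # placeholder hashes, not real transactions
    txes = blockchain.gettransaction_batch(txhash_list)

    # With skip_missing=True, hashes the backend cannot resolve map to None
    # rather than failing the entire batch, so callers filter them out.
    found = dict((h, tx) for h, tx in txes.iteritems() if tx is not None)
    print("%d of %d transactions resolved" % (len(found), len(txhash_list)))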
61 changes: 38 additions & 23 deletions counterblock/lib/blockfeed.py
@@ -52,7 +52,7 @@ def process_cp_blockfeed():
def publish_mempool_tx():
"""fetch new tx from mempool"""
tx_hashes = []
mempool_txs = config.mongo_db.mempool.find(fields={'tx_hash': True})
mempool_txs = config.mongo_db.mempool.find(projection={'tx_hash': True})
for mempool_tx in mempool_txs:
tx_hashes.append(str(mempool_tx['tx_hash']))

@@ -86,10 +86,14 @@ def publish_mempool_tx():
logger.debug('starting {} (mempool)'.format(function['function']))
# TODO: Better handling of double parsing
try:
cmd = function['function'](tx, json.loads(tx['bindings'])) or None
result = function['function'](tx, json.loads(tx['bindings'])) or None
except pymongo.errors.DuplicateKeyError, e:
logging.exception(e)
if cmd == 'continue': break
if result == 'ABORT_THIS_MESSAGE_PROCESSING' or result == 'continue':
break
elif result:
raise Exception("Message processor returned unknown code -- processor: '%s', result: '%s'" %
(function, result))

def clean_mempool_tx():
"""clean mempool transactions older than MAX_REORG_NUM_BLOCKS blocks"""
@@ -101,34 +105,40 @@ def parse_message(msg):
logger.debug("Received message %s: %s ..." % (msg['message_index'], msg))

#out of order messages should not happen (anymore), but just to be sure
assert msg['message_index'] == config.state['last_message_index'] + 1 or config.state['last_message_index'] == -1
if msg['message_index'] != config.state['last_message_index'] + 1 and config.state['last_message_index'] != -1:
raise Exception("Message index mismatch. Next message's message_index: %s, last_message_index: %s" % (
msg['message_index'], config.state['last_message_index']))

for function in MessageProcessor.active_functions():
logger.debug('starting {}'.format(function['function']))
logger.debug('MessageProcessor: starting {}'.format(function['function']))
# TODO: Better handling of double parsing
try:
cmd = function['function'](msg, msg_data) or None
result = function['function'](msg, msg_data) or None
except pymongo.errors.DuplicateKeyError, e:
logging.exception(e)
#break or *return* (?) depends on whether we want config.last_message_index to be updated
if cmd == 'continue': break
elif cmd == 'break': return 'break'


if result in ('ABORT_THIS_MESSAGE_PROCESSING', 'continue', #just abort further MessageProcessors for THIS message
'ABORT_BLOCK_PROCESSING'): #abort all further block processing, including that of all messages in the block
break
elif result not in (True, False, None):
raise Exception("Message processor returned unknown code -- processor: '%s', result: '%s'" %
(function, result))

config.state['last_message_index'] = msg['message_index']
return 'ABORT_BLOCK_PROCESSING' if result == 'ABORT_BLOCK_PROCESSING' else None

def parse_block(block_data):
config.state['cur_block'] = block_data
config.state['cur_block']['block_time_obj'] \
= datetime.datetime.utcfromtimestamp(config.state['cur_block']['block_time'])
config.state['cur_block']['block_time_str'] = config.state['cur_block']['block_time_obj'].isoformat()
cmd = None

for msg in config.state['cur_block']['_messages']:
cmd = parse_message(msg)
if cmd == 'break': break
#logger.debug("*config.state* {}".format(config.state))
result = parse_message(msg)
if result == 'ABORT_BLOCK_PROCESSING': #reorg
return False

#Run Block Processor Functions
#run block processor Functions
BlockProcessor.run_active_functions()
#block successfully processed, track this in our DB
new_block = {
Expand All @@ -145,9 +155,7 @@ def parse_block(block_data):
config.state['cp_backend_block_index'] \
if config.state['cp_backend_block_index'] else '???',
config.state['last_message_index'] if config.state['last_message_index'] != -1 else '???'))

if config.state['cp_latest_block_index'] - cur_block_index < config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
clean_mempool_tx()
return True

#grab our stored preferences, and rebuild the database if necessary
app_config = config.mongo_db.app_config.find()
@@ -188,7 +196,8 @@ def parse_block(block_data):
if iteration % 10 == 0:
logger.info("Heartbeat (%s, block: %s, caught up: %s)" % (
iteration, config.state['my_latest_block']['block_index'], fuzzy_is_caught_up()))
logger.info("iteration: ap %s/%s" % (autopilot, autopilot_runner))
logger.info("iteration: ap %s/%s, cp_latest_block_index: %s, my_latest_block: %s" % (autopilot, autopilot_runner,
config.state['cp_latest_block_index'], config.state['my_latest_block']['block_index']))

if not autopilot or autopilot_runner == 0:
try:
@@ -273,20 +282,21 @@ def parse_block(block_data):
cur_block_index = config.state['my_latest_block']['block_index'] + 1
try:
block_data = cache.get_block_info(cur_block_index,
min(100, (config.state['cp_latest_block_index'] - config.state['my_latest_block']['block_index'])))
prefetch=min(100, (config.state['cp_latest_block_index'] - config.state['my_latest_block']['block_index'])),
min_message_index=config.state['last_message_index'] + 1 if config.state['last_message_index'] != -1 else None)
except Exception, e:
logger.warn(str(e) + " Waiting 3 seconds before trying again...")
time.sleep(3)
continue

# clean api cache
# clean api block cache
if config.state['cp_latest_block_index'] - cur_block_index <= config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
cache.clean_block_cache(cur_block_index)

try:
parse_block(block_data)
result = parse_block(block_data)
except Exception as e: #if anything bubbles up
logger.exception("Unhandled exception while processing block. Rolling back, waiting 3 seconds and retrying...: %s" % e)
logger.exception("Unhandled exception while processing block. Rolling back, waiting 3 seconds and retrying. Error was: %s" % e)

#counterparty-server might have gone away...
my_latest_block = config.mongo_db.processed_blocks.find_one(sort=[("block_index", pymongo.DESCENDING)])
@@ -299,6 +309,11 @@ def parse_block(block_data):

time.sleep(3)
continue
if result is False: #reorg, or block processing otherwise not completed
autopilot = False

if config.state['cp_latest_block_index'] - cur_block_index < config.MAX_REORG_NUM_BLOCKS: #only when we are near the tip
clean_mempool_tx()
elif config.state['my_latest_block']['block_index'] > config.state['cp_latest_block_index']:
# should get a reorg message. Just to be on the safe side, prune back MAX_REORG_NUM_BLOCKS blocks
# before what counterpartyd is saying if we see this
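
The status strings introduced above ('ABORT_THIS_MESSAGE_PROCESSING', the legacy 'continue', and 'ABORT_BLOCK_PROCESSING') are how individual message processors now signal the blockfeed loop: the first two stop further processors for the current message, while the third makes parse_block() return False so the whole block is retried. A hedged sketch of a processor using them; the import path and priority value are assumptions, and the handler body is purely illustrative:

    from counterblock.lib.processor import MessageProcessor  # assumed import path

    @MessageProcessor.subscribe(priority=90)  # illustrative priority value
    def handle_example_category(msg, msg_data):
        if msg['category'] != 'example_category':
            return None  # not ours; let lower-priority processors run

        if msg['command'] == 'reorg':
            # abort all further processing of this block (and its messages)
            return 'ABORT_BLOCK_PROCESSING'

        if not msg_data.get('asset'):
            # skip the remaining processors for THIS message only
            return 'ABORT_THIS_MESSAGE_PROCESSING'

        # ...normal handling would go here...
        return True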
13 changes: 6 additions & 7 deletions counterblock/lib/cache.py
@@ -10,6 +10,7 @@
from counterblock.lib import config, util

logger = logging.getLogger(__name__)
blockinfo_cache = {}

##
## REDIS-RELATED
@@ -21,14 +22,15 @@ def get_redis_connection():
##
## NOT REDIS RELATED
##
blockinfo_cache = {}
def get_block_info(block_index, prefetch=0):
def get_block_info(block_index, prefetch=0, min_message_index=None):
global blockinfo_cache
if block_index in blockinfo_cache:
return blockinfo_cache[block_index]

blockinfo_cache.clear()
blocks = util.call_jsonrpc_api('get_blocks',
{'block_indexes': range(block_index, block_index + prefetch)},
{'block_indexes': range(block_index, block_index + prefetch),
'min_message_index': min_message_index},
abort_on_error=True)['result']
for block in blocks:
blockinfo_cache[block['block_index']] = block
@@ -37,11 +39,9 @@ def get_block_info(block_index, prefetch=0):
def block_cache(func):
"""decorator"""
def cached_function(*args, **kwargs):

function_signature = hashlib.sha256(func.__name__ + str(args) + str(kwargs)).hexdigest()

sql = "SELECT block_index FROM blocks ORDER BY block_index DESC LIMIT 1"
block_index = util.call_jsonrpc_api('sql', {'query': sql, 'bindings': []})['result'][0]['block_index']
function_signature = hashlib.sha256(func.__name__ + str(args) + str(kwargs)).hexdigest()

cached_result = config.mongo_db.counterblockd_cache.find_one({'block_index': block_index, 'function': function_signature})

@@ -64,7 +64,6 @@ def cached_function(*args, **kwargs):

return cached_function


def clean_block_cache(block_index):
#logger.info("clean block cache lower than {}".format(block_index))
config.mongo_db.counterblockd_cache.remove({'block_index': {'$lt': block_index}})
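
A short sketch of how the blockfeed loop drives this cache: the first get_block_info() call clears and refills the in-memory blockinfo_cache with up to prefetch blocks fetched in a single get_blocks RPC, and subsequent sequential lookups inside that window are served without another round trip. The heights are made up, and a reachable counterparty-server is assumed:

    from counterblock.lib import cache

    my_height = 400000        # illustrative: last block counterblock processed
    cp_height = 400250        # illustrative: counterparty-server's chain tip

    # Cache miss: fetches min(100, 250) blocks in one get_blocks call and caches them.
    block = cache.get_block_info(my_height + 1,
                                 prefetch=min(100, cp_height - my_height),
                                 min_message_index=None)

    # Cache hit: served straight out of blockinfo_cache, no RPC issued.
    next_block = cache.get_block_info(my_height + 2)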
2 changes: 1 addition & 1 deletion counterblock/lib/config.py
@@ -3,7 +3,7 @@
##
## CONSTANTS
##
VERSION = "1.3.0" #should keep up with counterblockd repo's release tag
VERSION = "1.3.1" #should keep up with counterblockd repo's release tag

DB_VERSION = 23 #a db version increment will cause counterblockd to rebuild its database off of counterpartyd

23 changes: 18 additions & 5 deletions counterblock/lib/modules/assets.py
@@ -94,7 +94,8 @@ def asset_fetch_complete_hook(urls_data):

#compose and fetch all info URLs in all assets with them
for asset in assets:
if not asset['info_url']: continue
if not asset['info_url']:
continue

if asset.get('disabled', False):
logger.info("ExtendedAssetInfo: Skipping disabled asset %s" % asset['asset'])
@@ -563,7 +564,7 @@ def parse_balance_change(msg, msg_data):
asset_info = config.mongo_db.tracked_assets.find_one({ 'asset': msg_data['asset'] })
if asset_info is None:
logger.warn("Credit/debit of %s where asset ('%s') does not exist. Ignoring..." % (msg_data['quantity'], msg_data['asset']))
return 'continue'
return 'ABORT_THIS_MESSAGE_PROCESSING'
quantity = msg_data['quantity'] if msg['category'] == 'credits' else -msg_data['quantity']
quantity_normalized = blockchain.normalize_quantity(quantity, asset_info['divisible'])

@@ -581,7 +582,10 @@ def parse_balance_change(msg, msg_data):
last_bal_change['new_balance'] += quantity
last_bal_change['new_balance_normalized'] += quantity_normalized
config.mongo_db.balance_changes.save(last_bal_change)
logger.info("Procesed %s bal change (UPDATED) from tx %s :: %s" % (actionName, msg['message_index'], last_bal_change))
logger.info("%s (UPDATED) %s %s %s %s (new bal: %s, msgID: %s)" % (
actionName.capitalize(), ('%f' % last_bal_change['quantity_normalized']).rstrip('0').rstrip('.'), last_bal_change['asset'],
'from' if actionName == 'debit' else 'to',
last_bal_change['address'], ('%f' % last_bal_change['new_balance_normalized']).rstrip('0').rstrip('.'), msg['message_index'],))
bal_change = last_bal_change
else: #new balance change record for this block
bal_change = {
@@ -595,7 +599,10 @@ def parse_balance_change(msg, msg_data):
'new_balance_normalized': last_bal_change['new_balance_normalized'] + quantity_normalized if last_bal_change else quantity_normalized,
}
config.mongo_db.balance_changes.insert(bal_change)
logger.info("Procesed %s bal change from tx %s :: %s" % (actionName, msg['message_index'], bal_change))
logger.info("%s %s %s %s %s (new bal: %s, msgID: %s)" % (
actionName.capitalize(), ('%f' % bal_change['quantity_normalized']).rstrip('0').rstrip('.'), bal_change['asset'],
'from' if actionName == 'debit' else 'to',
bal_change['address'], ('%f' % bal_change['new_balance_normalized']).rstrip('0').rstrip('.'), msg['message_index'],))


@StartUpProcessor.subscribe()
@@ -609,8 +616,14 @@ def init():
config.mongo_db.balance_changes.ensure_index([
("address", pymongo.ASCENDING),
("asset", pymongo.ASCENDING),
("block_time", pymongo.ASCENDING)
("block_index", pymongo.DESCENDING),
("_id", pymongo.DESCENDING)
])
try: #drop unnecessary indexes if they exist
config.mongo_db.balance_changes.drop_index('address_1_asset_1_block_time_1')
except:
pass

#tracked_assets
config.mongo_db.tracked_assets.ensure_index('asset', unique=True)
config.mongo_db.tracked_assets.ensure_index('_at_block') #for tracked asset pruning
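
The new credit/debit log lines render normalized quantities by stripping trailing zeros from a fixed-point string. A tiny standalone illustration of that idiom:

    def fmt_qty(quantity_normalized):
        # same trimming idiom as the new logger.info() calls above
        return ('%f' % quantity_normalized).rstrip('0').rstrip('.')

    print(fmt_qty(1.50000000))    # -> 1.5
    print(fmt_qty(42.0))          # -> 42
    print(fmt_qty(0.00012300))    # -> 0.000123

Note that '%f' defaults to six decimal places, so quantities smaller than 0.000001 render as 0 with this formatting.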
4 changes: 2 additions & 2 deletions counterblock/lib/modules/betting.py
@@ -48,7 +48,7 @@ def sanitize_json_data(data):

def get_feeds_by_source_addresses(addresses):
conditions = { 'source': { '$in': addresses }}
feeds = config.mongo_db.feeds.find(spec=conditions, fields={'_id': False})
feeds = config.mongo_db.feeds.find(spec=conditions, projection={'_id': False})
feeds_by_source = {}
for feed in feeds: feeds_by_source[feed['source']] = feed
return feeds_by_source
@@ -114,7 +114,7 @@ def get_feed(address_or_url = ''):
'info_status': 'valid'
}
result = {}
feeds = config.mongo_db.feeds.find(spec=conditions, fields={'_id': False}, limit=1)
feeds = config.mongo_db.feeds.find(spec=conditions, projection={'_id': False}, limit=1)
for feed in feeds:
if 'targets' not in feed['info_data'] or ('type' in feed['info_data'] and feed['info_data']['type'] in ['all', 'cfd']):
feed['info_data']['next_broadcast'] = util.next_interval_date(feed['info_data']['broadcast_date'])
3 changes: 0 additions & 3 deletions counterblock/lib/modules/counterwallet.py
@@ -331,7 +331,6 @@ def gen_stats_for_network(network):
"new_count": {"$sum": 1}
}}
])
new_wallets = [] if not new_wallets['ok'] else new_wallets['result']
for e in new_wallets:
ts = time.mktime(datetime.datetime(e['_id']['year'], e['_id']['month'], e['_id']['day']).timetuple())
new_entries[ts] = { #a future wallet_stats entry
@@ -354,7 +353,6 @@ def gen_stats_for_network(network):
"count": {"$sum": 1}
}}
])
referer_counts = [] if not referer_counts['ok'] else referer_counts['result']
for e in referer_counts:
ts = time.mktime(datetime.datetime(e['_id']['year'], e['_id']['month'], e['_id']['day']).timetuple())
assert ts in new_entries
@@ -381,7 +379,6 @@ def gen_stats_for_network(network):
"distinct_wallets": {"$addToSet": "$wallet_id"},
}}
])
logins = [] if not logins['ok'] else logins['result']
for e in logins:
ts = time.mktime(datetime.datetime(e['_id']['year'], e['_id']['month'], e['_id']['day']).timetuple())
if ts not in new_entries:
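
The three deletions above all drop the pymongo 2.x pattern of unwrapping aggregate() output from an {'ok': ..., 'result': [...]} dict; under pymongo >= 3.1 aggregate() returns a cursor that is iterated directly. A minimal before/after sketch with an illustrative database, collection, and pipeline:

    import pymongo

    db = pymongo.MongoClient()['counterblockd_test']   # hypothetical database name
    pipeline = [{'$group': {'_id': '$network', 'count': {'$sum': 1}}}]

    # pymongo 2.x style (what the deleted lines compensated for):
    #   res = db.login_history.aggregate(pipeline)
    #   logins = [] if not res['ok'] else res['result']

    # pymongo 3.x style (what the code now relies on):
    for row in db.login_history.aggregate(pipeline):    # CommandCursor
        print("%s: %s" % (row['_id'], row['count']))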
8 changes: 4 additions & 4 deletions counterblock/lib/modules/counterwallet_iofeeds.py
@@ -486,17 +486,17 @@ def handle_invalid(msg, msg_data):
event = messages.decorate_message_for_feed(msg, msg_data=msg_data)
zmq_publisher_eventfeed.send_json(event)
config.state['last_message_index'] = msg['message_index']
return 'continue'
return 'ABORT_THIS_MESSAGE_PROCESSING'

@MessageProcessor.subscribe(priority=CORE_FIRST_PRIORITY - 1.5)
@MessageProcessor.subscribe(priority=CORE_FIRST_PRIORITY - 0.9) #should run BEFORE processor.messages.handle_reorg()
def handle_reorg(msg, msg_data):
if msg['command'] == 'reorg':
#send out the message to listening clients (but don't forward along while we're catching up)
if config.state['cp_latest_block_index'] - config.state['my_latest_block']['block_index'] < config.MAX_REORG_NUM_BLOCKS:
msg_data['_last_message_index'] = config.state['last_message_index']
event = messages.decorate_message_for_feed(msg, msg_data=msg_data)
zmq_publisher_eventfeed.send_json(event)
return 'break' #break out of inner loop
zmq_publisher_eventfeed.send_json(event)
#processor.messages.handle_reorg() will run immediately after this and handle the rest

@MessageProcessor.subscribe(priority=CWIOFEEDS_PRIORITY_PARSE_FOR_SOCKETIO)
def parse_for_socketio(msg, msg_data):
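
The priority change from CORE_FIRST_PRIORITY - 1.5 to CORE_FIRST_PRIORITY - 0.9 reorders this handler relative to the core reorg processor, per the "should run BEFORE processor.messages.handle_reorg()" comment. Assuming processors are dispatched in descending priority order (an assumption inferred from that comment), a toy dispatcher makes the effect concrete; all names and numbers here are illustrative:

    CORE_FIRST_PRIORITY = 100        # illustrative constant

    handlers = []

    def subscribe(priority):
        def wrap(func):
            handlers.append({'function': func, 'priority': priority})
            return func
        return wrap

    @subscribe(priority=CORE_FIRST_PRIORITY - 1)     # stand-in for the core reorg handler
    def core_handle_reorg(msg, msg_data):
        print("core: roll back database state")

    @subscribe(priority=CORE_FIRST_PRIORITY - 0.9)   # the new counterwallet_iofeeds value
    def iofeeds_handle_reorg(msg, msg_data):
        print("iofeeds: publish reorg event to clients first")

    # Descending-priority dispatch: -0.9 now runs before -1; with the old -1.5
    # value the iofeeds handler would have run after the core one.
    for h in sorted(handlers, key=lambda x: x['priority'], reverse=True):
        h['function']({'command': 'reorg'}, {})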