Skip to content

Commit

Permalink
v1 of in-memory mempool (MemMempool)
Browse files Browse the repository at this point in the history
  • Loading branch information
jotapea committed Dec 16, 2023
1 parent 77e5c7f commit aa9a375
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 47 deletions.
205 changes: 158 additions & 47 deletions counterpartylib/lib/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import inspect
from xmltodict import unparse as serialize_to_xml

import itertools

from counterpartylib.lib import config
from counterpartylib.lib import exceptions
from counterpartylib.lib import util
Expand Down Expand Up @@ -84,6 +86,11 @@
current_api_status_code = None #is updated by the APIStatusPoller
current_api_status_response_json = None #is updated by the APIStatusPoller

# Shared state maintained by the MemMempool background thread and read by the
# API ('maxmempool' bitcoin conf directly affects this, keeping 300MB default works well).
memmempool_txids_non_cntrprty = set()  # mempool txids already inspected and found NOT to carry Counterparty data
memmempool_txids_cntrprty = set()  # mempool txids already inspected and found to carry Counterparty data
memmempool_cached_response = []  # ready-to-serve list of parsed Counterparty mempool transactions

class APIError(Exception):
pass

Expand Down Expand Up @@ -512,6 +519,150 @@ def run(self):
current_api_status_response_json = None
time.sleep(config.BACKEND_POLL_INTERVAL)

class MemMempool(threading.Thread):
    """In-memory mempool cache.

    Maintains a ready-to-serve response in the module-level ``memmempool_*``
    globals, as an alternative to the in-DB mempool that locks the database
    for too long during high mempool transaction count.
    """

    # Re-scan the backend mempool at most this often (seconds).
    CHECK_INTERVAL = 60
    # Maximum number of new transactions fetched and parsed per refresh pass.
    BATCH_SIZE = 500

    def __init__(self):
        self.last_mempool_check = 0  # timestamp of last completed refresh
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()

    def stop(self):
        """Signal the polling loop to exit."""
        self.stop_event.set()

    def run(self):
        logger.info('Starting MemMempool.')
        db = database.get_connection(read_only=True, integrity_check=False)

        while not self.stop_event.is_set():
            if time.time() - self.last_mempool_check > self.CHECK_INTERVAL:
                # Only refresh once the server has caught up with the chain.
                status = get_running_info_shared(db)
                if status["server_ready"]:
                    self._refresh_cache()
                    self.last_mempool_check = time.time()
            # Event.wait() (unlike time.sleep) returns immediately on stop(),
            # so shutdown is responsive.
            self.stop_event.wait(config.BACKEND_POLL_INTERVAL)

    def _refresh_cache(self):
        """Reconcile the cached response with the backend's current mempool."""
        global memmempool_txids_non_cntrprty
        global memmempool_txids_cntrprty
        global memmempool_cached_response

        raw_mempool = set(backend.getrawmempool())
        cached_txids = memmempool_txids_non_cntrprty | memmempool_txids_cntrprty

        # Txids that left the backend mempool (mined or evicted).
        to_remove = cached_txids - raw_mempool
        # Txids not examined yet; process at most BATCH_SIZE per pass.
        to_process = raw_mempool - cached_txids
        to_process_batch = set(itertools.islice(to_process, self.BATCH_SIZE))

        to_add = {}
        for tx_hash in to_process_batch:
            try:
                tx_hex = backend.getrawtransaction(tx_hash)
            except Exception as e:
                # TODO(review): logs only showing in the first iteration.
                logger.warning('Failed to fetch raw TX %s, continue; %s', tx_hash, e)
                continue
            if tx_hex is None:
                continue

            source, destination, btc_amount, fee, data, decoded_tx = blocks.get_tx_info(tx_hex)
            if data is None:
                continue  # not a Counterparty transaction
            to_add[tx_hash] = {
                "tx_hash": tx_hash,
                "source": source,
                "destination": destination,
                "btc_amount": btc_amount,
                "fee": fee,
                "data": util.hexlify(data),
                "decoded_tx": decoded_tx,
            }

        processed_txids_cntrprty = set(to_add.keys())
        processed_txids_non_cntrprty = to_process_batch - processed_txids_cntrprty

        # Drop entries for departed txids, then append the newly parsed ones.
        new_cached_response = [
            item for item in memmempool_cached_response
            if item["tx_hash"] not in to_remove
        ]
        new_cached_response.extend(to_add.values())

        memmempool_txids_non_cntrprty = (memmempool_txids_non_cntrprty - to_remove) | processed_txids_non_cntrprty
        memmempool_txids_cntrprty = (memmempool_txids_cntrprty - to_remove) | processed_txids_cntrprty
        memmempool_cached_response = new_cached_response

def get_running_info_shared(db):
    """Return server status information.

    Shared between the ``get_running_info`` API method and MemMempool.

    Args:
        db: open database connection.
    Returns:
        dict with readiness flags, block/message indexes and version info.
    """
    latestBlockIndex = backend.getblockcount()

    try:
        check_database_state(db, latestBlockIndex)
    except DatabaseError:
        caught_up = False
    else:
        caught_up = True

    # Fetch the row for the current block; None when it is not (yet) in the DB.
    # NOTE: narrowed from bare `except:` so KeyboardInterrupt/SystemExit are not
    # swallowed; the cursor is now closed even when the query or assert raises.
    try:
        cursor = db.cursor()
        try:
            rows = list(cursor.execute('''SELECT * FROM blocks WHERE block_index = ?''', (util.CURRENT_BLOCK_INDEX, )))
            assert len(rows) == 1
            last_block = rows[0]
        finally:
            cursor.close()
    except Exception:
        last_block = None

    try:
        last_message = util.last_message(db)
    except Exception:
        last_message = None

    try:
        indexd_blocks_behind = backend.getindexblocksbehind()
    except Exception:
        # Backend unreachable: report the whole chain as behind.
        indexd_blocks_behind = latestBlockIndex if latestBlockIndex > 0 else 999999
    indexd_caught_up = indexd_blocks_behind <= 1

    server_ready = caught_up and indexd_caught_up

    return {
        'server_ready': server_ready,
        'db_caught_up': caught_up,
        'bitcoin_block_count': latestBlockIndex,
        'last_block': last_block,
        'indexd_caught_up': indexd_caught_up,
        'indexd_blocks_behind': indexd_blocks_behind,
        'last_message_index': last_message['message_index'] if last_message else -1,
        'api_limit_rows': config.API_LIMIT_ROWS,
        'running_testnet': config.TESTNET,
        'running_regtest': config.REGTEST,
        'running_testcoin': config.TESTCOIN,
        'version_major': config.VERSION_MAJOR,
        'version_minor': config.VERSION_MINOR,
        'version_revision': config.VERSION_REVISION
    }

class APIServer(threading.Thread):
"""Handle JSON-RPC API calls."""
def __init__(self, db=None):
Expand Down Expand Up @@ -539,6 +690,12 @@ def get_pw(username):
######################
#READ API

@dispatcher.add_method
def get_memmempool():
    """Return the cached in-memory mempool response maintained by MemMempool."""
    response = {'cached_response': memmempool_cached_response}
    return response

# Generate dynamically get_{table} methods
def generate_get_method(table):
def get_method(**kwargs):
Expand Down Expand Up @@ -754,53 +911,7 @@ def get_blocks(block_indexes, min_message_index=None):

@dispatcher.add_method
def get_running_info():
    """Return server status information.

    Delegates to get_running_info_shared so the API method and the
    MemMempool thread use one implementation.
    """
    return get_running_info_shared(self.db)

@dispatcher.add_method
def get_element_counts():
Expand Down
5 changes: 5 additions & 0 deletions counterpartylib/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ def start_all(db):
api_status_poller.daemon = True
api_status_poller.start()

# In-memory mempool.
in_memory_mempool = api.MemMempool()
in_memory_mempool.daemon = True
in_memory_mempool.start()

# API Server.
api_server = api.APIServer()
api_server.daemon = True
Expand Down
1 change: 1 addition & 0 deletions counterpartylib/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def api_server(request, cp_server):
server.configure_rpc(config.RPC_PASSWORD)

# start RPC server and wait for server to be ready
# TODO MemMempool
api_server = api.APIServer()
api_server.daemon = True
api_server.start()
Expand Down

0 comments on commit aa9a375

Please sign in to comment.