Separate async item processor per session
Improve daemon wait logic

Fixes kyuupichan#100
Neil Booth committed Feb 19, 2017
1 parent 23b7ec3 commit 86f6a14
Showing 6 changed files with 106 additions and 136 deletions.
6 changes: 1 addition & 5 deletions electrumx_rpc.py
@@ -26,13 +26,9 @@ def __init__(self):
super().__init__(version=JSONRPCv2)
self.max_send = 0
self.max_buffer_size = 5*10**6
self.event = asyncio.Event()

def have_pending_items(self):
self.event.set()

async def wait_for_response(self):
await self.event.wait()
await self.items_event.wait()
await self.process_pending_items()

def send_rpc_request(self, method, params):
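Taken together, the electrumx_rpc.py change replaces the client's private asyncio.Event with the session's own items_event. A minimal, self-contained sketch of the resulting wait-then-process shape, in the 2017-era asyncio style this codebase uses (MiniSession and its methods are hypothetical stand-ins, not the real classes):

```python
import asyncio
import collections


class MiniSession:
    '''Toy stand-in for the wait-then-process shape in this hunk.'''

    def __init__(self):
        self.items = collections.deque()
        self.items_event = asyncio.Event()

    def receive(self, item):
        # Mirrors decode_message() in lib/jsonrpc.py: queue the item
        # and signal whoever is waiting.
        self.items.append(item)
        self.items_event.set()

    async def process_pending_items(self):
        while self.items:
            print('processed', self.items.popleft())
        self.items_event.clear()

    async def wait_for_response(self):
        # Same shape as the patched electrumx_rpc.py above: block
        # until something arrives, then process it here.
        await self.items_event.wait()
        await self.process_pending_items()


async def main():
    session = MiniSession()
    asyncio.get_event_loop().call_later(0.1, session.receive, 'pong')
    await session.wait_for_response()

asyncio.get_event_loop().run_until_complete(main())
```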
27 changes: 12 additions & 15 deletions lib/jsonrpc.py
@@ -258,7 +258,7 @@ class JSONSessionBase(util.LoggedClass):
from empty
'''
_next_session_id = 0
_pending_reqs = {}
_pending_reqs = {} # Outgoing requests waiting for a response

@classmethod
def next_session_id(cls):
@@ -320,6 +320,7 @@ def __init__(self, version=JSONRPCCompat):
self.pause = False
# Handling of incoming items
self.items = collections.deque()
self.items_event = asyncio.Event()
self.batch_results = []
# Handling of outgoing requests
self.next_request_id = 0
@@ -461,10 +462,8 @@ def decode_message(self, payload):
self.send_error('empty batch', JSONRPC.INVALID_REQUEST)
return

# Incoming items get queued for later asynchronous processing.
if not self.items:
self.have_pending_items()
self.items.append(payload)
self.items_event.set()

async def process_batch(self, batch, count):
'''Processes count items from the batch according to the JSON 2.0
@@ -626,6 +625,9 @@ async def process_pending_items(self, limit=8):
if binary:
self.send_binary(binary)

if not self.items:
self.items_event.clear()

def count_pending_items(self):
'''Counts the number of pending items.'''
return sum(len(item) if isinstance(item, list) else 1
@@ -716,15 +718,6 @@ def send_bytes(self, binary):

# App layer

def have_pending_items(self):
'''Called to indicate there are items pending to be processed
asynchronously by calling process_pending_items.
This is *not* called every time an item is added, just when
there were previously none and now there is at least one.
'''
raise NotImplementedError

def using_bandwidth(self, amount):
'''Called as bandwidth is consumed.
@@ -749,8 +742,12 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
'''A JSONSessionBase instance specialized for use with
asyncio.protocol to implement the transport layer.
Derived classes must provide have_pending_items() and may want to
override the request and notification handlers.
The app should await on items_event, which is set when unprocessed
incoming items remain and cleared when the queue is empty, and
then arrange to call process_pending_items asynchronously.
Derived classes may want to override the request and notification
handlers.
'''

def __init__(self, version=JSONRPCCompat):
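The jsonrpc.py side of the commit is essentially a switch from an edge-triggered callback to a level-triggered event: the old have_pending_items() hook fired only on the empty-to-non-empty transition, whereas items_event is set on every append and cleared once the queue drains. A rough standalone sketch of that invariant (all names here are illustrative, not the real API):

```python
import asyncio
import collections

queue = collections.deque()
items_event = asyncio.Event()


def push(item):
    # set() is idempotent, so unlike the old have_pending_items()
    # hook there is no need to special-case the first item.
    queue.append(item)
    items_event.set()


async def consumer():
    while True:
        await items_event.wait()
        while queue:
            print('handled', queue.popleft())
        # Drained: clear so the next wait() blocks until push()
        # signals again -- the invariant process_pending_items() keeps.
        items_event.clear()


async def main():
    task = asyncio.ensure_future(consumer())
    for n in range(5):
        push(n)
        await asyncio.sleep(0.01)
    task.cancel()
    await asyncio.sleep(0)      # let the cancellation land

asyncio.get_event_loop().run_until_complete(main())
```

The level-triggered form also removes a subtle race: a waiter that starts after items were queued still wakes immediately, because the event is already set.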
120 changes: 45 additions & 75 deletions server/controller.py
@@ -69,9 +69,6 @@ def __init__(self, env):
self.next_stale_check = 0
self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.queue = asyncio.PriorityQueue()
self.delayed_sessions = []
self.next_queue_id = 0
self.cache_height = 0
env.max_send = max(350000, env.max_send)
self.setup_bands()
@@ -136,67 +133,6 @@ def session_priority(self, session):
def is_deprioritized(self, session):
return self.session_priority(session) > self.BANDS

async def enqueue_delayed_sessions(self):
while True:
now = time.time()
keep = []
for pair in self.delayed_sessions:
timeout, item = pair
priority, queue_id, session = item
if not session.pause and timeout <= now:
self.queue.put_nowait(item)
else:
keep.append(pair)
self.delayed_sessions = keep

# If paused and session count has fallen, start listening again
if (len(self.sessions) <= self.low_watermark
and self.state == self.PAUSED):
await self.start_external_servers()

# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions

await asyncio.sleep(1)

def enqueue_session(self, session):
# Might have disconnected whilst waiting
if session not in self.sessions:
return
priority = self.session_priority(session)
item = (priority, self.next_queue_id, session)
self.next_queue_id += 1

excess = max(0, priority - self.BANDS)
if excess != session.last_delay:
session.last_delay = excess
if excess:
session.log_info('high bandwidth use, deprioritizing by '
'delaying responses {:d}s'.format(excess))
else:
session.log_info('stopped delaying responses')
delay = max(int(session.pause), excess)
if delay:
self.delayed_sessions.append((time.time() + delay, item))
else:
self.queue.put_nowait(item)

async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
priority_, id_, session = await self.queue.get()
if session in self.sessions:
await session.process_pending_items()
# Re-enqueue the session if stuff is left
if session.items:
self.enqueue_session(session)

async def run_in_executor(self, func, *args):
'''Wait whilst running func in the executor.'''
return await self.loop.run_in_executor(None, func, *args)
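For contrast, the machinery deleted above amounted to one shared asyncio.PriorityQueue drained by a fixed pool of worker coroutines, with deprioritized sessions re-queued after a delay. A toy reconstruction of that shared-queue pattern, under the assumption that it captures the removed design (serve() is a placeholder for session processing):

```python
import asyncio


async def serve(name):
    print('served', name)


async def worker(queue):
    # Pull the lowest (priority, seq) entry; seq breaks ties so
    # equal-priority sessions are served FIFO.
    while True:
        priority, seq, name = await queue.get()
        await serve(name)


async def main():
    queue = asyncio.PriorityQueue()
    for seq, name in enumerate(('alice', 'bob', 'carol')):
        queue.put_nowait((1, seq, name))
    workers = [asyncio.ensure_future(worker(queue)) for _ in range(4)]
    await asyncio.sleep(0.1)        # let the pool drain the queue
    for w in workers:
        w.cancel()
    await asyncio.sleep(0)          # let the cancellations land

asyncio.get_event_loop().run_until_complete(main())
```

The weakness the commit addresses is visible even in the toy: every session competes for the same four workers, so queue bookkeeping and re-enqueueing logic grow with load.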
@@ -225,24 +161,40 @@ def on_future_done(self, future):
except Exception:
self.log_error(traceback.format_exc())

async def check_request_timeouts(self):
'''Regularly check pending JSON requests for timeouts.'''
async def housekeeping(self):
'''Regular housekeeping checks.'''
n = 0
while True:
await asyncio.sleep(30)
n += 1
await asyncio.sleep(15)
JSONSessionBase.timeout_check()
if n % 10 == 0:
self.clear_stale_sessions()

# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self.start_external_servers()

# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions

async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers())
self.ensure_future(self.check_request_timeouts())
self.ensure_future(self.housekeeping())
self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions())
self.ensure_future(self.notify())
for n in range(4):
self.ensure_future(self.serve_requests())

async def main_loop(self):
'''Controller main loop.'''
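The new housekeeping() coroutine folds several periodic jobs into one loop: request-timeout checks every 15-second tick, with a modular counter gating slower work such as clearing stale sessions. A minimal sketch of that tick-plus-counter idiom (intervals scaled down, job bodies replaced by prints):

```python
import asyncio


async def housekeeping():
    '''One timer, several cadences: cheap checks every tick,
    expensive ones every tenth tick.'''
    n = 0
    while True:
        await asyncio.sleep(0.15)   # stands in for the 15s tick
        n += 1
        print('fast check: request timeouts')
        if n % 10 == 0:
            print('slow check: clear stale sessions')


async def main():
    task = asyncio.ensure_future(housekeeping())
    await asyncio.sleep(2)          # observe a few cycles
    task.cancel()
    await asyncio.sleep(0)          # let the cancellation land

asyncio.get_event_loop().run_until_complete(main())
```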
@@ -379,11 +331,28 @@ def electrum_header(self, height):
self.header_cache[height] = header
return header

def session_delay(self, session):
priority = self.session_priority(session)
excess = max(0, priority - self.BANDS)
if excess != session.last_delay:
session.last_delay = excess
if excess:
session.log_info('high bandwidth use, deprioritizing by '
'delaying responses {:d}s'.format(excess))
else:
session.log_info('stopped delaying responses')
return max(int(session.pause), excess)

async def process_items(self, session):
'''Waits for incoming session items and processes them.'''
while True:
await session.items_event.wait()
await asyncio.sleep(self.session_delay(session))
if not session.pause:
await session.process_pending_items()

def add_session(self, session):
now = time.time()
if now > self.next_stale_check:
self.next_stale_check = now + 300
self.clear_stale_sessions()
session.items_future = self.ensure_future(self.process_items(session))
gid = int(session.start_time - self.start_time) // 900
self.groups[gid].append(session)
self.sessions[session] = gid
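This hunk is the heart of the commit: each session gets its own long-lived coroutine that waits on the session's items_event, sleeps out any throttling delay, and only then processes pending items, so a slow or deprioritized session delays nobody but itself. A standalone sketch of that per-session loop (MiniSession and its fixed delay attribute are hypothetical simplifications of session_delay()):

```python
import asyncio
import collections


class MiniSession:
    def __init__(self, name, delay=0):
        self.name = name
        self.delay = delay          # seconds of deprioritization
        self.pause = False
        self.items = collections.deque()
        self.items_event = asyncio.Event()

    async def process_pending_items(self):
        while self.items:
            print(self.name, 'processed', self.items.popleft())
        self.items_event.clear()


async def process_items(session):
    # One of these runs per session; a slow session only delays itself.
    while True:
        await session.items_event.wait()
        await asyncio.sleep(session.delay)
        if not session.pause:
            await session.process_pending_items()


async def main():
    fast = MiniSession('fast')
    slow = MiniSession('slow', delay=0.2)
    tasks = [asyncio.ensure_future(process_items(s)) for s in (fast, slow)]
    for s in (fast, slow):
        s.items.append('request')
        s.items_event.set()
    await asyncio.sleep(0.5)        # fast prints at once, slow after 0.2s
    for t in tasks:
        t.cancel()
    await asyncio.sleep(0)          # let the cancellations land

asyncio.get_event_loop().run_until_complete(main())
```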
@@ -400,6 +369,7 @@ def add_session(self, session):

def remove_session(self, session):
'''Remove a session from our sessions list if there.'''
session.items_future.cancel()
if session in self.sessions:
gid = self.sessions.pop(session)
assert gid in self.groups
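The per-session task also brings a lifecycle to manage: the future is created in add_session() and must be cancelled in remove_session(), otherwise the wait loop would outlive its connection. A sketch of that pairing (the Controller stub below is hypothetical):

```python
import asyncio


class Controller:
    def __init__(self):
        self.sessions = {}

    async def process_items(self, session):
        try:
            while True:
                await asyncio.sleep(3600)   # stands in for items_event.wait()
        except asyncio.CancelledError:
            print('processor for', session, 'cancelled')
            raise

    def add_session(self, session):
        self.sessions[session] = asyncio.ensure_future(
            self.process_items(session))

    def remove_session(self, session):
        # Without this cancel() the task would wait forever on a
        # session that no longer exists.
        self.sessions.pop(session).cancel()


async def main():
    ctrl = Controller()
    ctrl.add_session('client-1')
    await asyncio.sleep(0.1)
    ctrl.remove_session('client-1')
    await asyncio.sleep(0.1)        # give the cancellation a chance to print

asyncio.get_event_loop().run_until_complete(main())
```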
72 changes: 40 additions & 32 deletions server/daemon.py
@@ -10,6 +10,7 @@

import asyncio
import json
import time
import traceback

import aiohttp
@@ -38,6 +39,8 @@ def __init__(self, urls):
# Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
self.workqueue_semaphore = asyncio.Semaphore(value=10)
self.down = False
self.last_error_time = 0

def set_urls(self, urls):
'''Set the URLS to the given list, and switch to the first one.'''
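The workqueue_semaphore noted above caps in-flight daemon calls below bitcoind's DEFAULT_HTTP_WORKQUEUE, so ElectrumX cannot exhaust the daemon's request slots. Reduced to a sketch (fake_rpc stands in for the aiohttp POST in _send_data):

```python
import asyncio

semaphore = asyncio.Semaphore(value=10)
in_flight = 0


async def fake_rpc(n):
    global in_flight
    async with semaphore:           # at most 10 coroutines inside
        in_flight += 1
        assert in_flight <= 10
        await asyncio.sleep(0.01)   # stands in for the HTTP round trip
        in_flight -= 1

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*(fake_rpc(n) for n in range(50))))
print('all 50 calls completed, never more than 10 at once')
```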
@@ -65,48 +68,56 @@ def failover(self):
return True
return False

async def _send_data(self, data):
async with self.workqueue_semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(self.url(), data=data) as resp:
# If bitcoind can't find a tx, for some reason
# it returns 500 but fills out the JSON.
# Should still return 200 IMO.
if resp.status in (200, 500):
return await resp.json()
return (resp.status, resp.reason)

async def _send(self, payload, processor):
'''Send a payload to be converted to JSON.
Handles temporary connection issues. Daemon response errors
are raised through DaemonError.
'''
self.prior_msg = None
self.skip_count = None

def log_error(msg, skip_once=False):
if skip_once and self.skip_count is None:
self.skip_count = 1
if msg != self.prior_msg or self.skip_count == 0:
self.skip_count = 10
self.prior_msg = msg
self.logger.error('{} Retrying between sleeps...'
.format(msg))
self.skip_count -= 1
def log_error(error):
self.down = True
now = time.time()
prior_time = self.last_error_time
if now - prior_time > 60:
self.last_error_time = now
if prior_time and self.failover():
secs = 0
else:
self.logger.error('{} Retrying occasionally...'
.format(error))

data = json.dumps(payload)
secs = 1
max_secs = 16
max_secs = 4
while True:
try:
async with self.workqueue_semaphore:
async with aiohttp.post(self.url(), data=data) as resp:
# If bitcoind can't find a tx, for some reason
# it returns 500 but fills out the JSON.
# Should still return 200 IMO.
if resp.status in (200, 500):
if self.prior_msg:
self.logger.info('connection restored')
result = processor(await resp.json())
return result
result = await self._send_data(data)
if not isinstance(result, tuple):
result = processor(result)
if self.down:
self.down = False
self.last_error_time = 0
self.logger.info('connection restored')
return result
log_error('HTTP error code {:d}: {}'
.format(resp.status, resp.reason))
.format(result[0], result[1]))
except asyncio.TimeoutError:
log_error('timeout error.', skip_once=True)
log_error('timeout error.')
except aiohttp.ClientHttpProcessingError:
log_error('HTTP error.', skip_once=True)
log_error('HTTP error.')
except aiohttp.ServerDisconnectedError:
log_error('disconnected.', skip_once=True)
log_error('disconnected.')
except aiohttp.ClientConnectionError:
log_error('connection problem - is your daemon running?')
except self.DaemonWarmingUpError:
Expand All @@ -116,11 +127,8 @@ def log_error(msg, skip_once=False):
except Exception:
self.log_error(traceback.format_exc())

if secs >= max_secs and self.failover():
secs = 1
else:
await asyncio.sleep(secs)
secs = min(max_secs, secs * 2)
await asyncio.sleep(secs)
secs = min(max_secs, secs * 2)

def logged_url(self, url=None):
'''The host and port part, for logging.'''
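Putting the daemon.py changes together: errors mark the daemon down, log_error reports at most about once a minute and fails over to the next URL, and the send loop retries on a capped exponential backoff of 1s doubling to 4s (down from the old 16s ceiling). A condensed sketch of that retry shape (flaky_send is a stand-in for _send_data; timings scaled down for the demo):

```python
import asyncio
import random


async def flaky_send():
    # Stand-in for _send_data(): fails a few times, then succeeds.
    if random.random() < 0.7:
        raise ConnectionError('daemon unreachable')
    return {'result': 'ok'}


async def send_with_retry():
    secs, max_secs = 1, 4
    down = False
    while True:
        try:
            result = await flaky_send()
            if down:
                print('connection restored')
            return result
        except ConnectionError as e:
            if not down:
                print('error:', e, '- retrying occasionally...')
            down = True
        await asyncio.sleep(secs / 100)     # scaled down for the demo
        secs = min(max_secs, secs * 2)      # 1 -> 2 -> 4 -> 4 ...

print(asyncio.get_event_loop().run_until_complete(send_with_retry()))
```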
