# Bootstrax

First draft: Jelle, November 2018

Watches for runs to appear from the DAQ, then starts a strax process to process each one. If a run fails, bootstrax retries it with exponential backoff.

Multiple bootstrax instances can be run, but only one per machine. If you start a second one on the same machine, it will try to kill the first one.

For now this is a notebook for development convenience, but of course we should make it into a script eventually.

### Philosophy

Bootstrax has a crash-only / recovery-first philosophy. Any error in the core code causes a crash; there is no nice exit or mandatory cleanup. Bootstrax focuses on recovery after restarts: before starting work, we look for and fix any mess left by crashes.

This ensures that hangs and hard crashes do not require expert tinkering to repair databases. Plus, you can just stop the program with ctrl-c (or pulling the machine's power plug) at any time.

Errors during run processing are assumed to be retry-able. We track the number of failures per run to decide how long to wait until retries; only if a user marks the run as 'abandoned' (using an external system, e.g. the website) do we stop retrying.
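The retry schedule from the `retry_run` comment in the `timeouts` dict (1x, 5x, 25x, 125x, 125x, ... times a base wait) can be written as a small pure function. This is a minimal sketch: `retry_delay` is a hypothetical helper, and its defaults (`base=60`, `factor=5`, `max_exponent=3`) are assumptions taken from the current `retry_run` value and the 125x cap.

```python
def retry_delay(n_previous_failures, base=60, factor=5, max_exponent=3):
    """Seconds to wait before retrying a run.

    Hypothetical helper mirroring the 1x, 5x, 25x, 125x, 125x, ... schedule.
    n_previous_failures counts the failures recorded *before* the one that
    just happened, so the first failure waits exactly `base` seconds.
    """
    if n_previous_failures < 0:
        raise ValueError("n_previous_failures must be >= 0")
    # Cap the exponent so the wait stops growing after a few failures
    return base * factor ** min(n_previous_failures, max_exponent)


# Waits before the first six retries, in seconds
print([retry_delay(n) for n in range(6)])  # [60, 300, 1500, 7500, 7500, 7500]
```

Capping the exponent keeps a permanently broken run from drifting to absurd wait times: it is simply retried every 125 x base seconds (about two hours with `base=60`) until someone marks it abandoned.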

### Mongo documents

Bootstrax records its status in a document in the 'bootstrax' collection in (currently) the runs db. These documents contain:
  - **host**: socket.getfqdn()
  - **pid**: process ID of this bootstrax
  - **started**: time this bootstrax was started
  - **time**: last time this bootstrax showed signs of life
  - **state**: one of the following:
    - **starting**: very rare, only set briefly during startup
    - **busy**: doing something
    - **idle**: NOT doing something but available for processing new runs
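Bootstrax uses two different tests to decide that a registered instance is gone: on its own host it checks whether the recorded PID still exists (the notebook uses `psutil.pid_exists`), while for other hosts it can only look at how old the heartbeat **time** is, compared with the `bootstrax_presumed_dead` timeout. The sketch below expresses both on plain dicts. It is a hypothetical helper, assuming POSIX (`os.kill(pid, 0)` as the existence test) and status documents with the `host`, `pid`, `started` and `time` fields that the registration code in the Initialization section writes.

```python
import os
import time


def pid_alive(pid):
    """POSIX existence check: signal 0 only performs error checking."""
    if pid <= 0:
        # 0 and negative values address process groups, not single processes
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user
        return True
    return True


def looks_alive(status_doc, my_host, now=None, presumed_dead_after=300):
    """Hypothetical helper: should this bootstrax status document be kept?

    Same host: trust the PID. Other host: trust the heartbeat time,
    falling back to the start time if no heartbeat was recorded yet.
    """
    if now is None:
        now = time.time()
    if status_doc['host'] == my_host:
        return pid_alive(status_doc['pid'])
    last_seen = status_doc.get('time', status_doc['started'])
    return now - last_seen < presumed_dead_after
```

The fallback to `started` is a deliberate deviation from the notebook: its cleanup query `{'time': {'$lt': ...}}` never matches a document that lacks a `time` field, so such a document is never removed there.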

Additionally, bootstrax tracks information with each run in the 'bootstrax' field of the run doc. We could also put this elsewhere, but it seemed convenient. This field contains the following subfields:
  - **state**: one of the following:
    - **considering**: a bootstrax is deciding what to do with it
    - **busy**: a strax process is working on it
    - **done**: the strax process for this run exited successfully
    - **failed**: something is wrong, but we will retry after some amount of time
    - **abandoned**: bootstrax will ignore this run
  - **host**: host of the bootstrax that last updated this field
  - **time**: last time a bootstrax updated this field
  - **reason**: reason for the last failure, if there ever was one (otherwise this field does not exist). Tracking failure history is really the DAQ log's responsibility; this is only provided for convenience.
  - **n_failures**: number of failures on this run, if there ever was one (otherwise this field does not exist).
  - **next_retry**: time after which bootstrax may retry processing this run.
  - **started_processing**: time at which strax was last started on this run.
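Putting these fields together, the decisions made by the run loop and by `cleanup_db` can be summarised as a pure function of the **bootstrax** subfield. This is a hypothetical sketch on plain dicts, assuming the current `max_considering_time` and `max_busy_time` values from the `timeouts` dict; it leaves out the database-side check for 'done' runs without registered data.

```python
import time

# Assumed to match timeouts['max_considering_time'] and timeouts['max_busy_time']
STALE_AFTER = {'considering': 60, 'busy': 180}


def next_action(run_doc, now=None):
    """Hypothetical helper: what bootstrax would do with this run doc.

    Returns one of 'process', 'retry', 'wait', 'mark_failed', 'ignore', 'leave'.
    """
    if now is None:
        now = time.time()
    bd = run_doc.get('bootstrax')
    if bd is None:
        # Never touched by any bootstrax: a new run
        return 'process'
    state = bd['state']
    if state == 'abandoned':
        return 'ignore'
    if state == 'failed':
        return 'retry' if bd['next_retry'] < now else 'wait'
    if state in STALE_AFTER and bd['time'] < now - STALE_AFTER[state]:
        # Claimed by a bootstrax that then went quiet: presumably it crashed
        return 'mark_failed'
    # Recently claimed runs (still being worked on) and 'done' runs
    return 'leave'
```

In the notebook these checks run in a fixed order: new runs first, then (at most every `cleanup_spacing` seconds) the database cleanup, then retries of failed runs.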

## Initialization

In [None]:
import logging
import multiprocessing
import os
import os.path as osp
from psutil import pid_exists
import signal
import socket
import time

import pymongo
import straxen

In [None]:
mongo_url = 'mongodb://{username}:{password}@ds263172.mlab.com:63172/run'
run_collname = 'run'
dbname = 'run'

# TODO: Check ssh mount?
output_folder = '/home/aalbers/strax_data'

# Timeouts in seconds
timeouts = {
    # Waiting between escalating SIGTERM -> SIGKILL -> crashing bootstrax
    # when trying to kill another process (usually child strax)
    'signal_escalate': 3,
    # Minimum waiting time to retry a failed run
    # Escalates exponentially on repeated failures: 1x, 5x, 25x, 125x, 125x, 125x, ...
    'retry_run': 60, 
    # Maximum time for strax to complete a processing
    # if exceeded, strax will be killed by bootstrax
    'max_processing_time': 120,
    # Sleep between checking whether a strax process is alive
    'check_on_strax': 10,
    # Maximum time a run is in the 'busy' state
    # if exceeded, will be labeled as an untracked failure
    # Must be larger than max_processing_time!
    'max_busy_time': 180,
    # Maximum time a run is in the 'considering' state
    # if exceeded, will be labeled as an untracked failure
    'max_considering_time': 60, 
    # Minimum time to wait between database cleanup operations
    'cleanup_spacing': 60,
    # Sleep time when there is nothing to do
    'idle_nap': 10,
    # If we don't hear from a bootstrax on another host for this long,
    # remove its entry from the bootstrax status collection
    # Must be much longer than idle_nap and check_on_strax!
    'bootstrax_presumed_dead': 300
}

In [None]:
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
hostname = socket.getfqdn()

# We use exactly the logic of straxen to access the runs DB;
# this avoids duplication, and ensures strax can access the runs DB iff we can
def new_context():
    st = straxen.XENONContext(
        storage = straxen.RunDB(
            mongo_url=mongo_url,
            mongo_collname=run_collname,
            runid_field='number',
            new_data_path=output_folder,
            register_new=True,
            mongo_dbname=dbname),
        register_all=straxen.plugins.plugins)
    return st

st = new_context()

run_db = st.storage[0].client[dbname]
run_coll = run_db[run_collname]
bs_coll = run_db['bootstrax']

# Unfortunately the logs are in the 'dax' db rather than the 'run' db
# and we must specify the DB when making the client, or we get auth errors.
# So we have to make another client, and there is duplication after all...
dax_db = pymongo.MongoClient(
    mongo_url.replace('/run', '/dax'),
    username=straxen.get_secret('rundb_username'),
    password=straxen.get_secret('rundb_password'))['dax']
log_coll = dax_db['log']

# Ping the databases to ensure the mongo connections are working
run_db.command('ping')
dax_db.command('ping')

In [None]:
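def kill_process(pid, wait_time=None):
    """Kill process pid, or raise RuntimeError if we cannot

    Escalates SIGTERM -> SIGKILL -> RuntimeError (crashing bootstrax).
    Note an unreaped child process lingers as a zombie, which still counts
    as existing; reap children (e.g. with multiprocessing's join) instead.

    :param wait_time: time to wait before escalating signal strength
    """
    if wait_time is None:
        wait_time = timeouts['signal_escalate']
    if not pid_exists(pid):
        return

    # Use 'sig' as the loop variable: naming it 'signal' would shadow the module
    for sig in [signal.SIGTERM, signal.SIGKILL]:
        try:
            os.kill(pid, sig)
        except ProcessLookupError:
            # Process exited by itself in the meantime
            return
        time.sleep(wait_time)
        if not pid_exists(pid):
            return
    raise RuntimeError(f"Could not kill process {pid}")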
        
# Ensure we're the only bootstrax on this host
any_running = list(bs_coll.find({'host': hostname}))
for x in any_running:
    if pid_exists(x['pid']):
        log.warning(f'Bootstrax already running with PID {x["pid"]}, trying to kill it')
        # Disabled while running in notebook
        # kill_process(x['pid'])
    bs_coll.delete_one({'_id': x['_id']})
                    
# Register ourselves
state_doc_id = bs_coll.insert_one(dict(
    host=hostname,
    started=time.time(),
    # last_seen=time.time(),
    pid=os.getpid())).inserted_id
                    
def set_state(state):
    """Inform the bootstrax collection we're in a different state
    
    if state is None, leave state unchanged, just update heartbeat time
    """
    update = {'time': time.time()}
    if state is not None:
        update['state'] = state
    bs_coll.find_one_and_update(
        {'_id': state_doc_id},
        {'$set': update})
                    
def send_heartbeat():
    """Inform the bootstrax collection we're still here
    Use during long-running tasks where state doesn't change
    """
    # Same as set_state, just don't change state
    set_state(None)
                    
set_state('starting')

In [None]:
#st.storage[0].client.database_names()

# Task functions

## Helpers

In [None]:
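def set_run_state(rd, state, return_new_doc=True, **kwargs):
    """Set the bootstrax state of run doc rd, storing any kwargs as extra fields

    Also records our host and the current time, and increments n_failures
    if state is 'failed'. Returns the updated run doc, or the run doc as it
    was before the update if return_new_doc is False.
    """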
    bd = rd['bootstrax']
    bd.update({
        'state': state,
        'host': hostname,
        'time': time.time(),
        **kwargs})
    
    if state == 'failed':
        bd['n_failures'] = bd.get('n_failures', 0) + 1
        
    return run_coll.find_one_and_update(
        {'_id': rd['_id']},
        {'$set': {'bootstrax': bd}},
        return_document=return_new_doc)

def get_run(query, return_new_doc=True):
    # We must first do an atomic find-and-update to set the run's state 
    # to "considering", to ensure the run doesn't get picked up by a 
    # bootstrax on another host.
    rd = run_coll.find_one_and_update(
        query,
        {"$set": {'bootstrax.state': 'considering'}},
        projection=['bootstrax'],
        return_document=True,
        sort=[('start', pymongo.DESCENDING)])
    
    # Next, we can update the bootstrax entry properly with set_run_state
    # (preserving things like n_failures)
    if rd is None:
        return None
    return set_run_state(rd, 'considering', return_new_doc=return_new_doc)

def fail_run(rd, reason):
    if 'number' not in rd:
        long_run_id = f"run <no run number!!?>:{rd['_id']}"
    else:
        long_run_id = f"run {rd['number']}:{rd['_id']}"
        
    if 'n_failures' in rd['bootstrax']:
        fail_name = 'Repeated failure'
        failure_message_level = 'info'
    else:
        fail_name = 'New failure'
        failure_message_level = 'warning'
                
    # Report to run db
    set_run_state(rd, 'failed', 
                  reason=reason,
                  next_retry=(
                      time.time() 
                      + timeouts['retry_run'] * 5**min(rd.get('n_failures', 0), 3)))
        
    # Report to DAQ log and screen
    log_warning(f"{fail_name} on run {long_run_id}: {reason}", 
                priority=failure_message_level)
    
def log_warning(message, priority='warning'):
    getattr(log, priority)(message)
    log_coll.insert_one({
        'message': message,
        'user': f'bootstrax_{hostname}',
        'priority': dict(warning=2, info=1).get(priority, 3)})

## Run processing

In [None]:
def run_strax(run_id, input_dir):
    log.info(f"Starting strax to make {run_id} with input dir {input_dir}")
    st = new_context()
    st.make(run_id, 'raw_records', 
            config=dict(input_dir=input_dir))
    
def process_run(rd):
    if rd is None:
        raise RuntimeError("Pass a valid rundoc, not None!")
    log.info(f"Starting processing of run {rd.get('number')}")
             
    # Shortcuts for failing
    class RunFailed(Exception):
        pass
    def fail(reason):
        fail_run(rd, reason)
        raise RunFailed

    try:

        try:
            run_id = '%06d' % rd['number']
        except Exception as e:
            fail(f"Could not format run number: {str(e)}")
    
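        # Find the live data entry. Any other data entry means this run was
        # (partially) processed before without bootstrax tracking it.
        live_entries = [dd for dd in rd.get('data', []) if dd['type'] == 'live']
        if not live_entries:
            fail("No live data entry in rundoc")
        if len(live_entries) != len(rd['data']):
            fail("Non-live data already registered; untracked failure?")
        dd = live_entries[0]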

        if not osp.exists(dd['location']):
            fail(f"No access to live data folder {dd['location']}")

        loc = osp.join(dd['location'], run_id)
        if not osp.exists(loc):
            fail(f"No live data at claimed location {loc}")
                 
        strax_proc = multiprocessing.Process(
            target=run_strax, 
            args=(run_id, loc))

        t0 = time.time()
        info = dict(started_processing=t0)
        strax_proc.start()
                 
        while True:
            send_heartbeat()

            ec = strax_proc.exitcode
            if ec is None:
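                if time.time() > t0 + timeouts['max_processing_time']:
                    # Escalate SIGTERM -> SIGKILL. Use the Process methods rather
                    # than kill_process: join() also reaps the child, which would
                    # otherwise linger as a zombie that still looks alive.
                    for stop in (strax_proc.terminate, strax_proc.kill):
                        stop()
                        strax_proc.join(timeouts['signal_escalate'])
                        if strax_proc.exitcode is not None:
                            break
                    else:
                        # Crash-only: cleanup_db will mark this run failed later
                        raise RuntimeError(
                            f"Could not kill strax process {strax_proc.pid}")
                    fail(f"Processing took longer than "
                         f"{timeouts['max_processing_time']} seconds")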
                
                # Still working, check in later
                # TODO: check for hangs by looking for activity in output dir
                log.info(f"Still processing run {run_id}")
                set_run_state(rd, 'busy', **info)
                time.sleep(timeouts['check_on_strax'])
                continue
                 
            elif ec == 0:
                log.info(f"Run {run_id} processed successfully")
                set_run_state(rd, 'done', **info)
                break
                 
            else:
                log.info(f"Failure while processing run {run_id}")   # No warning, that comes later
                try:
                    exc_info = st.get_meta(run_id, 'raw_records')['exception']
                except Exception as e:
                    fail(f"Strax exited with exit code {ec}. "
                         "Could not get exception info from raw_records metadata: "
                         + str(e))

                fail(f"Strax exited with exit code {ec}. "
                     f"Exception info from raw_records: {exc_info}")

            # TODO: Strax should update run db with metadata
            # currently it doesn't, even if processing succeeds...

    except RunFailed:
        return

## Cleanup

In [None]:
def cleanup_db():
    """Find various pathological runs and clean them from the db"""
    log.info("Checking for bad stuff in database")
    
    # Bootstrax instances that say they are active, but haven't reported in for a while
    # (these are on other hosts, or we would have killed them already)
    while True:
        send_heartbeat()
        bd = bs_coll.find_one_and_delete(
            {'time': 
             {'$lt': time.time() - timeouts['bootstrax_presumed_dead']}})
        if bd is None:
            break
        log_warning(f"Bootstrax on host {bd['host']} presumed dead. Rest in peace")
    
    # Runs that say they are 'considering' or 'busy' but nothing happened for a while
    for state, timeout in [
            ('considering', timeouts['max_considering_time']),
            # TODO: This should be two hours in production
            ('busy', timeouts['max_busy_time'])]:
        while True:
            send_heartbeat()
            rd = get_run({'bootstrax.state': state,
                          'bootstrax.time': {'$lt': time.time() - timeout}},
                         return_new_doc=False)
            if rd is None:
                break
            fail_run(rd, 
                     f"Host {rd['bootstrax']['host']} said it was {state} "
                     f"at {rd['bootstrax']['time']}, but then didn't get further; "
                     f"perhaps it crashed on this run or is still stuck?")
    
    # Runs that say they are done, but no data (beyond, perhaps, the live data)
    # is registered
    while True:
        send_heartbeat()
        rd = get_run(
            {'bootstrax.state': 'done',
             'data': {
                 '$not': {
                     '$elemMatch': {
                         "type": {
                             '$ne': 'live'}}}}})
        if rd is None:
            break
        fail_run(rd, "Bootstrax state was done, but no non-live data registered.")

In [None]:
# cleanup_db()

# Temp hacks you probably don't need

In [None]:
# Erase all bootstrax info for newish runs
# for rund in run_coll.find({'number': {'$gt': 1733}}):
#     data = [d for d in rund['data'] if d['type'] == 'live']
#     run_coll.find_one_and_update(
#         {'_id': rund['_id']},
#         {'$set': {'data': data},
#          '$unset': {'bootstrax': ""}})

# Run loop

In [None]:
next_cleanup_time = 0

while True:
    log.info("Starting loop")
    set_state('busy')
    # Check resources are still OK, otherwise crash / reboot program
    
    # Process new runs
    rd = get_run({"bootstrax": {"$exists": False}})
    if rd is not None:
        process_run(rd)
        continue
 
    # Is another bootstrax instance idle? Then start riskier stuff.
    # TODO: implement check, make optional
    
    # Scan DB for runs with unusual problems
    if time.time() > next_cleanup_time:
        cleanup_db()
        next_cleanup_time = time.time() + timeouts['cleanup_spacing']

    # Any failed runs to retry?
    rd = get_run({"bootstrax.state": 'failed',
                  "bootstrax.next_retry": {'$lt': time.time()}})
    if rd is not None:
        process_run(rd)
        continue
        
    log.info("Nothing left to do, sleeping")
    set_state('idle')
    time.sleep(timeouts['idle_nap'])