From 00f6c0a98d264255d52b3ec8d8d3715c3ba82f2e Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Sun, 25 Oct 2020 22:27:53 +0100 Subject: [PATCH] Initialize shell as standalone worker and redis server --- galileo/cli/shell.py | 60 ++++++++++++++++++++++++++++++++++++++-- galileo/worker/daemon.py | 2 ++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/galileo/cli/shell.py b/galileo/cli/shell.py index ff8de12..2494097 100644 --- a/galileo/cli/shell.py +++ b/galileo/cli/shell.py @@ -1,9 +1,11 @@ -import redis +import argparse from galileo.shell.shell import * -def main(): +def _init(): + import redis + rds = redis.Redis( host=os.getenv('galileo_redis_host', 'localhost'), port=int(os.getenv('galileo_redis_port', 6379)), @@ -11,6 +13,58 @@ def main(): ) init_module(rds, __name__) + +def _init_standalone(): + from tempfile import mktemp + from threading import Thread + from shutil import rmtree + from galileo.worker.context import Context + from galileo.worker.daemon import WorkerDaemon + + ctx = Context() + + # prepare embedded redis instance + filename = mktemp(prefix='galileo_shell_', suffix='.db') + os.environ['galileo_redis_host'] = 'file://' + filename # Context resolves this correctly using redislite + rds = ctx.create_redis() + rds.randomkey() + + # init this module with global variables + init_module(rds, __name__) + + # start worker daemon + global eventbus + daemon = WorkerDaemon(ctx=ctx, eventbus=eventbus) + t_daemon = Thread(target=daemon.run, daemon=True) + t_daemon.start() + + # register cleanup method + def cleanup(): + daemon.close() + eventbus.close() + + # remove redislite db + rds.shutdown() + os.remove(filename) + os.remove(rds.redis_configuration_filename) + os.remove(rds.settingregistryfile) + rmtree(rds.redis_dir) + + atexit.register(cleanup) + + +def _main(): + parser = argparse.ArgumentParser() + parser.add_argument('--standalone', required=False, action="store_true", + help='also start an embedded redis server and a worker instance') + args = parser.parse_args() + + if args.standalone: + print('running in standalone mode') + _init_standalone() + else: + _init() + if is_interactive: sys.ps1 = prompt @@ -25,4 +79,4 @@ def main(): if __name__ == '__main__': # python -i shell.py will execute to here and then drop into the interactive shell - main() + _main() diff --git a/galileo/worker/daemon.py b/galileo/worker/daemon.py index b217b5c..dcd919b 100644 --- a/galileo/worker/daemon.py +++ b/galileo/worker/daemon.py @@ -53,6 +53,8 @@ def _create_trace_queue(self): return multiprocessing.Queue() def run(self): + self.rds.ping() + self.eventbus.subscribe(self._on_create_client_command) self.eventbus.subscribe(self._on_close_client_command) self.eventbus.subscribe(self._on_register_command)