diff --git a/parsl/config.py b/parsl/config.py
index 964c7d6e44..cc3a340c0a 100644
--- a/parsl/config.py
+++ b/parsl/config.py
@@ -30,6 +30,8 @@ class Config(RepresentationMixin):
     data_management_max_threads : int, optional
         Maximum number of threads to allocate for the data manager to use for managing input and output transfers.
         Default is 10.
+    db_logger_config : LoggerConfig, optional
+        The config to use for database monitoring. Default is None, which does not log to a database.
     lazy_errors : bool, optional
         If True, errors from task failures will not be raised until `future.result()` is called.
         Otherwise, they will be raised as soon as the task returns. Default is True.
diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index eaefe47f87..2ed5176f75 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -8,10 +8,11 @@
 import inspect
 import sys
 import multiprocessing
-import getpass
+
+from getpass import getuser
+from uuid import uuid4
 from datetime import datetime
 from socket import gethostname
-
 from concurrent.futures import Future
 from functools import partial
@@ -90,30 +91,30 @@ def __init__(self, config=Config()):
         # ES logging
         self.tasks_completed_count = 0
         self.tasks_failed_count = 0
-        self.db_logger_config_object = config.db_logger_config
-        if self.db_logger_config_object is not None and self.db_logger_config_object.logger_type == 'local_database'\
-                and self.db_logger_config_object.eng_link is None:
-            # uses the rundir as the default location. Should report to users?
+        self.db_logger_config = config.db_logger_config
+        if self.db_logger_config is not None and self.db_logger_config.logger_type == 'local_database'\
+                and self.db_logger_config.eng_link is None:
+            # uses the rundir as the default location.
             logger.info('Local monitoring database can be found inside the run_dir at: {}'.format(self.run_dir))
-            self.db_logger_config_object.eng_link = "sqlite:///{}".format(os.path.join(os.path.abspath(self.run_dir), 'parsl.db'))
-        if self.db_logger_config_object is None:
+            self.db_logger_config.eng_link = "sqlite:///{}".format(os.path.join(os.path.abspath(self.run_dir), 'monitoring.db'))
+        if self.db_logger_config is None:
             self.db_logger = get_db_logger()
         else:
-            self.db_logger = get_db_logger(db_logger_config_object=self.db_logger_config_object)
+            self.db_logger = get_db_logger(db_logger_config=self.db_logger_config)
         self.workflow_name = os.path.basename(str(inspect.stack()[1][1]))
-        if self.db_logger_config_object is not None and self.db_logger_config_object.workflow_name is not None:
-            self.workflow_name = self.db_logger_config_object.workflow_name
+        if self.db_logger_config is not None and self.db_logger_config.workflow_name is not None:
+            self.workflow_name = self.db_logger_config.workflow_name
         self.time_began = datetime.now()
         self.time_completed = None
-        self.run_id = self.workflow_name + "-" + str(self.time_began.minute)
-        self.dashboard = self.db_logger_config_object.dashboard_link if self.db_logger_config_object is not None else None
+        self.run_id = str(uuid4())
+        self.dashboard = self.db_logger_config.dashboard_link if self.db_logger_config is not None else None
         # TODO: make configurable
         logger.info("Run id is: " + self.run_id)
         if self.dashboard is not None:
             logger.info("Dashboard is found at " + self.dashboard)
         # start tornado logging server
-        if self.db_logger_config_object is not None and self.db_logger_config_object.logger_type == 'local_database':
-            self.logging_server = multiprocessing.Process(target=logging_server.run, kwargs={'db_logger_config_object': self.db_logger_config_object})
+        if self.db_logger_config is not None and self.db_logger_config.logger_type == 'local_database':
+            self.logging_server = multiprocessing.Process(target=logging_server.run, kwargs={'db_logger_config': self.db_logger_config})
             self.logging_server.start()
         else:
             self.logging_server = None
@@ -122,12 +123,12 @@ def __init__(self, config=Config()):
                          'parsl_version': get_version(),
                          'libsumbit_version': libsubmit.__version__,
                          "time_began": str(self.time_began.strftime('%Y-%m-%d %H:%M:%S')),
-                         'time_completed': str(self.time_completed),
-                         'task_run_id': self.run_id,
+                         'time_completed': str(None),
+                         'run_id': self.run_id,
                          'rundir': self.run_dir,
                          'tasks_completed_count': self.tasks_completed_count,
                          'tasks_failed_count': self.tasks_failed_count,
-                         'user': getpass.getuser(),
+                         'user': getuser(),
                          'host': gethostname(),
                          }
         self.db_logger.info("DFK start", extra=workflow_info)
@@ -174,6 +175,7 @@ def _create_task_log_info(self, task_id, fail_mode=None):
         Create the dictionary that will be included in the log.
         """
         task_log_info = {"task_" + k: v for k, v in self.tasks[task_id].items()}
+        task_log_info['run_id'] = self.run_id
         task_log_info['task_status_name'] = self.tasks[task_id]['status'].name
         task_log_info['tasks_failed_count'] = self.tasks_failed_count
         task_log_info['tasks_completed_count'] = self.tasks_completed_count
@@ -243,7 +245,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
             if not self._config.lazy_errors:
                 logger.debug("Eager fail, skipping retry logic")
                 self.tasks[task_id]['status'] = States.failed
-                if self.db_logger_config_object is not None:
+                if self.db_logger_config is not None:
                     task_log_info = self._create_task_log_info(task_id, 'eager')
                     self.db_logger.info("Task Fail", extra=task_log_info)
                 raise e
@@ -251,7 +253,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
             if self.tasks[task_id]['fail_count'] <= self._config.retries:
                 self.tasks[task_id]['status'] = States.pending
                 logger.debug("Task {} marked for retry".format(task_id))
-                if self.db_logger_config_object is not None:
+                if self.db_logger_config is not None:
                     task_log_info = self._create_task_log_info(task_id, 'lazy')
                     self.db_logger.info("Task Retry", extra=task_log_info)
 
@@ -263,7 +265,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
                 self.tasks_failed_count += 1
                 self.tasks[task_id]['time_completed'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 
-                if self.db_logger_config_object is not None:
+                if self.db_logger_config is not None:
                     task_log_info = self._create_task_log_info(task_id, 'lazy')
                     self.db_logger.info("Task Retry Failed", extra=task_log_info)
 
@@ -274,7 +276,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
             logger.info("Task {} completed".format(task_id))
             self.tasks[task_id]['time_completed'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 
-            if self.db_logger_config_object is not None:
+            if self.db_logger_config is not None:
                 task_log_info = self._create_task_log_info(task_id)
                 self.db_logger.info("Task Done", extra=task_log_info)
 
@@ -335,7 +337,7 @@ def handle_update(self, task_id, future, memo_cbk=False):
                         "Task {} deferred due to dependency failure".format(tid))
                     # Raise a dependency exception
                     self.tasks[tid]['status'] = States.dep_fail
-                    if self.db_logger_config_object is not None:
+                    if self.db_logger_config is not None:
                         task_log_info = self._create_task_log_info(task_id, 'lazy')
                         self.db_logger.info("Task Dep Fail", extra=task_log_info)
 
@@ -386,12 +388,12 @@ def launch_task(self, task_id, executable, *args, **kwargs):
             executor = self.executors[executor_label]
         except Exception as e:
             logger.exception("Task {} requested invalid executor {}: config is\n{}".format(task_id, executor_label, self._config))
-        if self.db_logger_config_object is not None:
-            executable = app_monitor.monitor_wrapper(executable, task_id, self.db_logger_config_object, self.run_id)
+        if self.db_logger_config is not None:
+            executable = app_monitor.monitor_wrapper(executable, task_id, self.db_logger_config, self.run_id)
         exec_fu = executor.submit(executable, *args, **kwargs)
         self.tasks[task_id]['status'] = States.running
         self.tasks[task_id]['time_started'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
-        if self.db_logger_config_object is not None:
+        if self.db_logger_config is not None:
             task_log_info = self._create_task_log_info(task_id)
             self.db_logger.info("Task Launch", extra=task_log_info)
         exec_fu.retries_left = self._config.retries - \
@@ -572,7 +574,6 @@ def submit(self, func, *args, executors='all', fn_hash=None, cache=False, **kwar
                     'id': task_id,
                     'time_started': None,
                     'time_completed': None,
-                    'run_id': self.run_id,
                     'app_fu': None}
 
         if task_id in self.tasks:
@@ -744,7 +745,7 @@ def cleanup(self):
         self.db_logger.info("DFK end", extra={'tasks_failed_count': self.tasks_failed_count, 'tasks_completed_count': self.tasks_completed_count,
                                               "time_began": str(self.time_began.strftime('%Y-%m-%d %H:%M:%S')),
                                               'time_completed': str(self.time_completed.strftime('%Y-%m-%d %H:%M:%S')),
-                                              'task_run_id': self.run_id, 'rundir': self.run_dir})
+                                              'run_id': self.run_id, 'rundir': self.run_dir})
         if self.logging_server is not None:
             self.logging_server.terminate()
             self.logging_server.join()
diff --git a/parsl/monitoring/app_monitor.py b/parsl/monitoring/app_monitor.py
index c0e414091a..1e5a4c26d1 100644
--- a/parsl/monitoring/app_monitor.py
+++ b/parsl/monitoring/app_monitor.py
@@ -5,17 +5,17 @@
 from parsl.monitoring.db_logger import get_db_logger
 
 
-def monitor(pid, task_id, db_logger_config_object, run_id):
+def monitor(pid, task_id, db_logger_config, run_id):
     """Internal
     Monitors the Parsl task's resources by pointing psutil to the task's pid
    and watching it and its children.
    """
-    if db_logger_config_object is None:
+    if db_logger_config is None:
         logger = get_db_logger()
     else:
-        logger = get_db_logger(logger_name=run_id + str(task_id), db_logger_config_object=db_logger_config_object)
+        logger = get_db_logger(logger_name=run_id + str(task_id), db_logger_config=db_logger_config)
 
-    sleep_duration = db_logger_config_object.resource_loop_sleep_duration
+    sleep_duration = db_logger_config.resource_loop_sleep_duration
     time.sleep(sleep_duration)
     # these values are simple to log. Other information is available in special formats such as memory below.
     simple = ["cpu_num", 'cpu_percent', 'create_time', 'cwd', 'exe', 'memory_percent', 'nice', 'name', 'num_threads', 'pid', 'ppid', 'status', 'username']
@@ -31,7 +31,7 @@ def to_mb(some_bytes):
     while True:
         try:
             d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
-            d["task_run_id"] = run_id
+            d["run_id"] = run_id
             d["task_id"] = task_id
             children = pm.children(recursive=True)
             d["psutil_cpu_count"] = psutil.cpu_count()
@@ -69,16 +69,16 @@ def to_mb(some_bytes):
         finally:
             logger.info("task resource update", extra=d)
-        sleep_duration = db_logger_config_object.resource_loop_sleep_duration
+        sleep_duration = db_logger_config.resource_loop_sleep_duration
         time.sleep(sleep_duration)
 
 
-def monitor_wrapper(f, task_id, db_logger_config_object, run_id):
+def monitor_wrapper(f, task_id, db_logger_config, run_id):
     """ Internal
     Wrap the Parsl app with a function that will call the monitor function
     and point it at the correct pid when the task begins.
     """
     def wrapped(*args, **kwargs):
-        p = Process(target=monitor, args=(os.getpid(), task_id, db_logger_config_object, run_id))
+        p = Process(target=monitor, args=(os.getpid(), task_id, db_logger_config, run_id))
         p.start()
         try:
             return f(*args, **kwargs)
diff --git a/parsl/monitoring/db_local.py b/parsl/monitoring/db_local.py
index 838e642c5e..9a07bd0198 100644
--- a/parsl/monitoring/db_local.py
+++ b/parsl/monitoring/db_local.py
@@ -2,6 +2,8 @@
 import time
 import json
 
+logger = logging.getLogger(__name__)
+
 try:
     from tornado import httpclient
     import sqlalchemy as sa
@@ -14,7 +16,7 @@
 def create_workflows_table(meta):
     return Table(
           'workflows', meta,
-          Column('task_run_id', Text, nullable=False, primary_key=True),
+          Column('run_id', Text, nullable=False, primary_key=True),
           Column('time_began', Text, nullable=False),
           Column('time_completed', Text),
          Column('host', Text, nullable=False),
@@ -32,19 +34,18 @@ def create_task_status_table(meta):
           Column('task_id', Text, sa.ForeignKey('task.task_id'), nullable=False),
           Column('task_status', Text, nullable=False),
           Column('task_status_name', Text, nullable=False),
-          # Column('timestamp', Text, nullable=False, primary_key=True),
           Column('timestamp', Text, nullable=False),
-          Column('task_run_id', Text, sa.ForeignKey('workflows.task_run_id'), nullable=False),
+          Column('run_id', Text, sa.ForeignKey('workflows.run_id'), nullable=False),
           Column('task_fail_count', Text, nullable=False),
           Column('task_fail_history', Text, nullable=True),
     )
 
 
-def create_workflow_table(meta):
+def create_task_table(meta):
     return Table(
           'task', meta,
           Column('task_id', Text, nullable=False),
-          Column('task_run_id', Text, sa.ForeignKey('workflows.task_run_id'), nullable=False),
+          Column('run_id', Text, sa.ForeignKey('workflows.run_id'), nullable=False),
           Column('task_executor', Text, nullable=False),
           Column('task_fn_hash', Text, nullable=False),
           Column('task_time_started', Text, nullable=False),
@@ -61,9 +62,8 @@ def create_task_resource_table(meta):
     return Table(
           'task_resources', meta,
           Column('task_id', Text, sa.ForeignKey('task.task_id'), nullable=False),
-          # Column('timestamp', Text, nullable=False, primary_key=True),
           Column('timestamp', Text, nullable=False),
-          Column('task_run_id', Text, sa.ForeignKey('workflows.task_run_id'), nullable=False),
+          Column('run_id', Text, sa.ForeignKey('workflows.run_id'), nullable=False),
           Column('psutil_process_pid', Text, nullable=True),
           Column('psutil_process_cpu_percent', Text, nullable=True),
           Column('psutil_process_memory_percent', Text, nullable=True),
@@ -102,18 +102,18 @@ def emit(self, record):
         # formating values to convert from python or parsl to db standards
         info['task_fail_history'] = str(info.get('task_fail_history', None))
         info['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(record.created))
-        run_id = info['task_run_id']
+        run_id = info['run_id']
         # if workflow or task has completed, update their entries with the time.
         if 'time_completed' in info.keys() and info['time_completed'] != 'None':
             workflows = self.meta.tables['workflows']
-            up = workflows.update().values(time_completed=info['time_completed']).where(workflows.c.task_run_id == run_id)
+            up = workflows.update().values(time_completed=info['time_completed']).where(workflows.c.run_id == run_id)
             con.execute(up)
             return
 
         if 'task_time_completed' in info.keys() and info['task_time_completed'] is not None:
             workflow = self.meta.tables['task']
             up = workflow.update().values(task_time_completed=info['task_time_completed']).where(workflow.c.task_id == info['task_id'])\
-                .where(workflow.c.task_run_id == run_id)
+                .where(workflow.c.run_id == run_id)
             con.execute(up)
 
         # create workflows table if this is a new database without one
@@ -121,7 +121,7 @@ def emit(self, record):
             workflows = create_workflows_table(self.meta)
             self.meta.create_all(con)
         # if this is the first sight of the workflow, add it to the workflows table
-        if len(con.execute(self.meta.tables['workflows'].select(self.meta.tables['workflows'].c.task_run_id == run_id)).fetchall()) == 0:
+        if len(con.execute(self.meta.tables['workflows'].select(self.meta.tables['workflows'].c.run_id == run_id)).fetchall()) == 0:
             workflows = self.meta.tables['workflows']
             ins = workflows.insert().values(**{k: v for k, v in info.items() if k in workflows.c})
             con.execute(ins)
@@ -129,23 +129,23 @@ def emit(self, record):
         # if log has task counts, update the workflow entry in the workflows table
         if 'tasks_completed_count' in info.keys():
             workflows = self.meta.tables['workflows']
-            up = workflows.update().values(tasks_completed_count=info['tasks_completed_count']).where(workflows.c.task_run_id == run_id)
+            up = workflows.update().values(tasks_completed_count=info['tasks_completed_count']).where(workflows.c.run_id == run_id)
             con.execute(up)
         if 'tasks_failed_count' in info.keys():
             workflows = self.meta.tables['workflows']
-            up = workflows.update().values(tasks_failed_count=info['tasks_failed_count']).where(workflows.c.task_run_id == run_id)
+            up = workflows.update().values(tasks_failed_count=info['tasks_failed_count']).where(workflows.c.run_id == run_id)
             con.execute(up)
 
-        # create workflow table if this is a new run without one
+        # create task table if this is a new run without one
         if 'task' not in self.meta.tables.keys():
-            workflow = create_workflow_table(self.meta)
+            workflow = create_task_table(self.meta)
             self.meta.create_all(con)
 
         # check to make sure it is a task log and not just a workflow overview log
         if info.get('task_id', None) is not None:
             # if this is the first sight of the task in the workflow, add it to the workflow table
             if len(con.execute(self.meta.tables['task'].select(self.meta.tables['task'].c.task_id == info['task_id'])
-                               .where(self.meta.tables['task'].c.task_run_id == run_id)).fetchall()) == 0:
+                               .where(self.meta.tables['task'].c.run_id == run_id)).fetchall()) == 0:
                 if 'psutil_process_pid' in info.keys():
                     # this is the race condition that a resource log is before a status log so ignore this resource update
                     return
@@ -179,7 +179,7 @@ def emit(self, record):
                     return
             except Exception as e:
-                print(str(e))
+                logger.error("Database logging failed, will retry since some known issues can occur. Number of failures: {} Error: {}".format(t, str(e)))
                 failed = True
                 time.sleep(5)
             if not failed:
@@ -210,7 +210,7 @@ def emit(self, record):
                 http_client.fetch(self.addr, method='POST', body=bod, request_timeout=self.request_timeout)
             except Exception as e:
                 # Other errors are possible, such as IOError.
-                print("Error: " + str(e))
+                logger.error(str(e))
                 time.sleep(self.on_fail_sleep_duration)
             else:
                 break
diff --git a/parsl/monitoring/db_logger.py b/parsl/monitoring/db_logger.py
index c7128b5f3d..6edac78f6c 100644
--- a/parsl/monitoring/db_logger.py
+++ b/parsl/monitoring/db_logger.py
@@ -35,8 +35,8 @@ def emit(self, record):
 class LoggerConfig():
     """ This is a config class for creating a logger. """
     def __init__(self,
-                 host='search-parsl-logging-test-2yjkk2wuoxukk2wdpiicl7mcrm.us-east-1.es.amazonaws.com',
-                 port=443,
+                 host=None,
+                 port=None,
                  enable_ssl=True,
                  logger_type='local_database',
                  index_name="my_python_index",
@@ -51,10 +51,10 @@ def __init__(self,
 
         Parameters
         ----------
-        host : str, optional
-            Used with Elasticsearch logging, the location of where to access Elasticsearch.
-        port : int, optional
-            Used with Elasticsearch logging, the port of where to access Elasticsearch.
+        host : str
+            Used with Elasticsearch logging, the location of where to access Elasticsearch. Required when using logger_type = 'elasticsearch'.
+        port : int
+            Used with Elasticsearch logging, the port of where to access Elasticsearch. Required when using logger_type = 'elasticsearch'.
         enable_ssl : Bool, optional
             Used with Elasticsearch logging, whether to use ssl when connecting to Elasticsearch.
         logger_type : str, optional
@@ -63,7 +63,7 @@ def __init__(self,
             Used with Elasticsearch logging, the name of the index to log to.
         logger_name : str, optional
             Used with both Elasticsearch and local db logging to define naming conventions for loggers.
-        eng_ling : str, optional
+        eng_link : str, optional
             Used with local database logging, SQLalchemy engine link to define where to connect to the database.
             If not set, DFK init will use a sqlite3 database inside the rundir.
         version : str, optional
@@ -77,14 +77,19 @@ def __init__(self,
         workflow_name : str, optional
             Name to record as the workflow base name, defaults to the name of the parsl script file if left as None.
         """
-        if host.startswith('http'):
-            raise ValueError('Do not include "http(s)://" in elasticsearch host string.')
-        self.host = host
-        self.port = port
-        self.enable_ssl = enable_ssl
         if logger_type not in ['local_database', 'elasticsearch']:
             raise ValueError('Value of logger type was invalid, choices arei ' + str(['local_database', 'elasticsearch']))
         self.logger_type = logger_type
+        if logger_type == 'elasticsearch':
+            if host is None:
+                raise ValueError('If using the elasticsearch logger, a host for the Elasticsearch instance must be specified.')
+            if port is None:
+                raise ValueError('If using the elasticsearch logger, a port for the Elasticsearch instance must be specified.')
+            if host.startswith('http'):
+                raise ValueError('Do not include "http(s)://" in elasticsearch host string.')
+        self.host = host
+        self.port = port
+        self.enable_ssl = enable_ssl
         self.index_name = index_name
         self.logger_name = logger_name
         self.eng_link = eng_link
@@ -100,7 +105,7 @@ def __init__(self,
 def get_db_logger(
         logger_name='parsl_db_logger',
         is_logging_server=False,
-        db_logger_config_object=None,
+        db_logger_config=None,
         **kwargs):
     """
     Parameters
@@ -109,8 +114,8 @@ def get_db_logger(
     logger_name : str, optional
         Name of the logger to use. Prevents adding repeat handlers or incorrect handlers
     is_logging_server : Bool, optional
        Used internally to determine which handler to return when using local db logging
-    db_logger_config_object : LoggerConfig, optional
-        Pass in a logger class object instead of a dict to use for generating loggers.
+    db_logger_config : LoggerConfig, optional
+        Pass in a LoggerConfig object to use for generating loggers.
 
     Returns
     -------
@@ -122,42 +127,42 @@ def get_db_logger(
     logger = logging.getLogger(logger_name)
 
-    if db_logger_config_object is None:
+    if db_logger_config is None:
         logger.addHandler(NullHandler())
         return logger
 
-    if db_logger_config_object.logger_type == 'elasticsearch':
+    if db_logger_config.logger_type == 'elasticsearch':
         if not _es_logging_enabled:
             raise OptionalModuleMissing(
                 ['CMRESHandler'], "Logging to ElasticSearch requires the cmreslogging module")
-        handler = CMRESHandler(hosts=[{'host': db_logger_config_object.host,
-                                       'port': db_logger_config_object.port}],
-                               use_ssl=db_logger_config_object.enable_ssl,
+        handler = CMRESHandler(hosts=[{'host': db_logger_config.host,
+                                       'port': db_logger_config.port}],
+                               use_ssl=db_logger_config.enable_ssl,
                                auth_type=CMRESHandler.AuthType.NO_AUTH,
-                               es_index_name=db_logger_config_object.index_name,
+                               es_index_name=db_logger_config.index_name,
                                es_additional_fields={
                                    'Campaign': "test",
-                                   'Version': db_logger_config_object.version,
+                                   'Version': db_logger_config.version,
                                    'Username': getpass.getuser()})
-        logger = logging.getLogger(db_logger_config_object.logger_name)
+        logger = logging.getLogger(db_logger_config.logger_name)
         logger.setLevel(logging.INFO)
         logger.addHandler(handler)
-    elif db_logger_config_object.logger_type == 'local_database' and not is_logging_server:
+    elif db_logger_config.logger_type == 'local_database' and not is_logging_server:
         # add a handler that will pass logs to the logging server
-        handler = RemoteHandler(db_logger_config_object.web_app_host, db_logger_config_object.web_app_port)
+        handler = RemoteHandler(db_logger_config.web_app_host, db_logger_config.web_app_port)
         # use the specific name generated by the server or the monitor wrapper
         logger = logging.getLogger(logger_name)
         logger.setLevel(logging.INFO)
         logger.addHandler(handler)
-    elif db_logger_config_object.logger_type == 'local_database' and is_logging_server:
+    elif db_logger_config.logger_type == 'local_database' and is_logging_server:
         # add a handler that will take logs being recieved on the server and log them to the database
-        handler = DatabaseHandler(db_logger_config_object.eng_link)
+        handler = DatabaseHandler(db_logger_config.eng_link)
         # use the specific name generated by the server or the monitor wrapper
         logger = logging.getLogger(logger_name)
         logger.setLevel(logging.INFO)
         logger.addHandler(handler)
     else:
-        logger.addHandler(NullHandler())
+        raise ValueError('logger_type must be one of ["local_database", "elasticsearch"]')
 
     return logger
diff --git a/parsl/monitoring/logging_server.py b/parsl/monitoring/logging_server.py
index 18c161fb8e..f36c9c7396 100644
--- a/parsl/monitoring/logging_server.py
+++ b/parsl/monitoring/logging_server.py
@@ -6,9 +6,9 @@
 class MainHandler(tornado.web.RequestHandler):
     """ A handler for all requests/logs sent to the logging server."""
 
-    def initialize(self, db_logger_config_object):
+    def initialize(self, db_logger_config):
         """This function is called on every request which is not ideal but __init__ does not appear to work."""
-        self.db_logger_config_object = db_logger_config_object
+        self.db_logger_config = db_logger_config
 
     def get(self):
         """Defines responses to get requests for the / ending. Not used by Parsl but could be."""
@@ -25,16 +25,16 @@ def post(self):
         try:
             self.application.logger.info('from tornado task ' + str(arg.get('task_id', 'NO TASK')), extra=arg)
         except AttributeError as e:
-            self.application.logger = get_db_logger(logger_name='loggingserver', is_logging_server=True, db_logger_config_object=self.db_logger_config_object)
+            self.application.logger = get_db_logger(logger_name='loggingserver', is_logging_server=True, db_logger_config=self.db_logger_config)
             self.application.logger.info('from tornado task ' + str(arg.get('task_id', 'NO TASK')), extra=arg)
 
 
-def run(db_logger_config_object):
+def run(db_logger_config):
     """ Set up the logging server according to configurations the user specified.
     This is the function launched as a separate process from the DFK in order to start logging.
     """
 
-    # Assumtion that db_logger_config_object is not none because if it were this server should not have been started
-    app = tornado.web.Application([(r"/", MainHandler, dict(db_logger_config_object=db_logger_config_object))])
-    app.listen(db_logger_config_object.web_app_port)
+    # Assumption that db_logger_config is not None because if it were, this server would not have been started
+    app = tornado.web.Application([(r"/", MainHandler, dict(db_logger_config=db_logger_config))])
+    app.listen(db_logger_config.web_app_port)
     tornado.ioloop.IOLoop.current().start()
diff --git a/requirements.txt b/requirements.txt
index e82b22abf8..79da2cecaa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,5 @@ ipykernel
 requests
 psutil
 CMRESHandler
+sqlalchemy
 tabulate
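
Usage note (not part of the patch): a minimal sketch of how the renamed db_logger_config option could be wired up from a user script. Config, LoggerConfig, logger_type, workflow_name and the sqlite fallback inside the run_dir come from the changes above; the executor choice and the parameter values are illustrative assumptions only.

    # Hypothetical example; executor setup and values are placeholders.
    from parsl.config import Config
    from parsl.dataflow.dflow import DataFlowKernel
    from parsl.executors.threads import ThreadPoolExecutor
    from parsl.monitoring.db_logger import LoggerConfig

    # Local-database monitoring: eng_link is left unset, so the DFK falls back to
    # sqlite:///<run_dir>/monitoring.db as implemented in dflow.py above.
    monitoring = LoggerConfig(logger_type='local_database',
                              workflow_name='example-workflow')

    config = Config(executors=[ThreadPoolExecutor()],
                    db_logger_config=monitoring)
    dfk = DataFlowKernel(config=config)

With logger_type='elasticsearch', host and port must now be supplied explicitly, otherwise LoggerConfig raises the new ValueError added in db_logger.py.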