This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Forward user's logging calls in submitted tasks to client session in Dask to capture logs in Prefect backend #40

Closed
anna-geller opened this issue Oct 15, 2022 · 3 comments

Comments

@anna-geller

Currently, logs from Dask workers are not captured by Prefect because of logger serialization issues - the logger object gets serialized for submission to Dask, and then when deserialized, it reinstantiates itself and loses its configuration, preventing it from capturing these logs in the Prefect backend.

There is a PoC on this Dask issue showing how this could be accomplished: dask/distributed#2033 (comment)

Copy-pasted code snippet

import distributed
import logging

TOPIC = "forwarded-log-record"

# Handler class that gets installed inside workers. Each worker adds an instance
# of this handler to one or more loggers (possibly the root logger). Tasks
# running on that worker may then use the affected logger as normal, with the
# side effect that any LogRecords handled by the logger (or by a logger below it
# in the hierarchy) will be published to the dask client as a
# "forwarded-log-record" event. 
class ForwardingLogHandler(logging.Handler):
    def prepare_record_attributes(self, record):
        # Adapted from the CPython standard library's logging.handlers.SocketHandler.makePickle; see
        # its source at: https://github.com/python/cpython/blob/main/Lib/logging/handlers.py
        ei = record.exc_info
        if ei:
            # just to get traceback text into record.exc_text ...
            dummy = self.format(record)
        # If msg or args are objects, they may not be available on the receiving
        # end. So we convert the msg % args to a string, save it as msg and zap
        # the args.
        d = dict(record.__dict__)
        d['msg'] = record.getMessage()
        d['args'] = None
        d['exc_info'] = None
        # delete 'message' if present: redundant with 'msg'
        d.pop('message', None)
        return d

    def emit(self, record):
        try:
            worker = distributed.get_worker()
        except ValueError:
            return
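        attributes = self.prepare_record_attributes(record)
        # Tag the record with its origin so that client-side formatters can use
        # %(worker)s (the example format string further down relies on it).
        attributes['worker'] = worker.address
        worker.log_event(TOPIC, attributes)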

# The client-side handler function for "forwarded-log-record" events. Sends the
# forwarded LogRecord to the client-side logger with the same name as that which
# originally handled the record on the worker-side.
def client_handle_forwarded_log_record(event):
    stamp, record_attrs = event
    # print(record_attrs, flush=True)
    record = logging.makeLogRecord(record_attrs)
    dest_logger = logging.getLogger(record.name)
    dest_logger.handle(record)

# User-facing interface. After creating a dask Client, call this on it to begin
# forwarding the given logger (by default the root) and all loggers under it
# from workers to the client process.
def forward_logging(client, logger_name=None, level=logging.NOTSET):
    client.subscribe_topic(TOPIC, client_handle_forwarded_log_record)

    def worker_start_log_forwarding():
        root = logging.getLogger(logger_name)
        root.addHandler(ForwardingLogHandler(level=level))

    client.register_worker_callbacks(worker_start_log_forwarding)

################
## Example usage

client = distributed.Client()
forward_logging(client)  # forward the root logger at any handled level

# Now let's configure client side logging as a user might
TYPICAL_LOGGING_CONFIG = """
version: 1
handlers:
  console:
    class : logging.StreamHandler
    formatter: default
    level   : INFO
formatters:
  default:
    format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s'
    datefmt: '%Y-%m-%d %H:%M:%S'
root:
  handlers:
    - console
"""
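import io
import logging.config  # dictConfig lives in a submodule; "import logging" alone does not load it

import yaml  # third-party: PyYAML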
config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG))
logging.config.dictConfig(config)

# Submit a task that does some error logging. We should see output from the
# client-side StreamHandler in the output cell below.
def do_error():
    logging.getLogger("user.module").error("Hello error")

client.submit(do_error).result()

# Here's a nuance worth highlighting... even though our client-side root logger
# is configured with a level of INFO, the worker-side root loggers still have
# their default/initial level of ERROR (we haven't done any explicit logging
# configuration on the workers other than installing the ForwardingLogHandler).
# Therefore worker-side INFO logs will NOT be forwarded because they never even
# get handled on the worker-side, never giving the ForwardingLogHandler a chance
# to forward them.
def do_info_1():
    # no output on the client side
    logging.getLogger("user.module").info("Hello info the first time")

client.submit(do_info_1).result()

# It's necessary to set the worker-side logger's level to INFO (here, from
# inside the task, which runs on the worker) before the info message will be
# handled and forwarded to the client. In other words, the effective level of
# the client-side forwarded logging is the maximum of each logger's client-side
# and worker-side levels (...and the optional ``level`` kwarg of
# forward_logging, which sets a level on the ForwardingLogHandler itself!).
def do_info_2():
    logger = logging.getLogger("user.module")
    logger.setLevel(logging.INFO)
    # now produces output on the client side
    logger.info("Hello info the second time")

client.submit(do_info_2).result()
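
The record round-trip at the heart of the snippet above can be tried without a Dask cluster. The following is only a sketch under stated assumptions: `ListHandler` and `to_attributes` are hypothetical helpers written for this illustration, and a `pickle` round-trip stands in for the worker-to-client hop that `worker.log_event` and `client.subscribe_topic` would perform in the real PoC.

```python
import logging
import pickle


class ListHandler(logging.Handler):
    """Hypothetical helper: collects formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def to_attributes(record):
    # Same idea as prepare_record_attributes above: flatten msg % args into a
    # plain string and drop anything that may not survive serialization.
    d = dict(record.__dict__)
    d["msg"] = record.getMessage()
    d["args"] = None
    d["exc_info"] = None
    d.pop("message", None)
    return d


# "Worker side": a record as a task's logger would create it.
record = logging.LogRecord(
    name="user.module",
    level=logging.ERROR,
    pathname="task.py",  # placeholder path, for illustration only
    lineno=1,
    msg="Hello %s",
    args=("error",),
    exc_info=None,
)
attrs = to_attributes(record)

# Stand-in for the network hop (an assumption of this sketch, not Dask's API).
received = pickle.loads(pickle.dumps(attrs))

# "Client side": rebuild the record and hand it to the same-named logger.
rebuilt = logging.makeLogRecord(received)
capture = ListHandler()
capture.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
dest_logger = logging.getLogger(rebuilt.name)
dest_logger.addHandler(capture)
dest_logger.propagate = False  # keep the demo output out of the root logger
dest_logger.handle(rebuilt)

print(capture.messages)  # ['ERROR user.module Hello error']
```

Note that `Logger.handle` passes the record straight to the logger's filters and handlers; it does not re-check the logger's own level, so on the receiving side it is the handler levels that decide what is shown.
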
@zanieb
Contributor

zanieb commented Oct 15, 2022

> Currently, logs from Dask workers are not captured by Prefect because of logger serialization issues - the logger object gets serialized for submission to Dask, and then when deserialized, it reinstantiates itself and loses its configuration, preventing it from capturing these logs in the Prefect backend.

I don't think this is actually the case? We do not pass the logger object to Dask as described. I think this is caused by a separate issue with how we configure logging for a given process. I think forwarding logs back to the flow run may be interesting to pursue, but that will require a specific implementation per task runner type and has additional implications. We should just fix the actual bug which is affecting other distributed runners as well.
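
For context on the serialization question, here is a small sketch (standard library only, not Prefect's or Dask's code) of how `logging.Logger` objects pickle on Python 3.7 and later: only the logger's name is serialized, and unpickling looks that name up again with `logging.getLogger`. Handlers and levels never travel with the object, so whatever configuration a logger has on the receiving side comes from how logging was set up in that process.

```python
import logging
import pickle

logger = logging.getLogger("user.module")
logger.setLevel(logging.INFO)
logger.addHandler(logging.NullHandler())

payload = pickle.dumps(logger)

# The payload references the logger by name; the handler is not part of it.
print(b"user.module" in payload)
print(b"NullHandler" in payload)

# Unpickling in the same process returns the very same logger object, because
# it is resolved through logging.getLogger(name). In another process the same
# lookup would return that process's logger, with that process's configuration.
restored = pickle.loads(payload)
print(restored is logger)
```
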

@anna-geller
Author

Thanks a lot, Michael. I have to admit I didn't research this at all, but I wanted to open an issue to track that retrieving logs from distributed Dask workers still doesn't seem to work. Someone from Dask just added this workaround PoC, so it's worth keeping this issue open to explore the root cause and whether the proposed option is useful.

@ahuang11
Contributor

I think this is fixed now:

import time

from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    logger = get_run_logger()
    logger.warning(f"#{number}")

@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

Please re-open if my minimal example doesn't cover the issue.
