# Rich Logging and printing

In [1]:
# | default_exp logger

In [2]:
# | export
# | hide
from rich.console import Console
from rich.theme import Theme
from loguru import logger
from datetime import datetime
from fastcore.basics import patch_to, ifnone
from rich.logging import RichHandler
from pathlib import Path
from contextlib import contextmanager
from rich._spinners import SPINNERS
import random

# from torch_snippets.ipython import is_in_notebook

from functools import wraps
import time

In [3]:
# | export
# | hide
def get_console(width=None):
    return Console(
        width=width,
        theme=Theme(
            {
                "repr.number": "bold cyan",
                "repr.string": "bold green",
                "logging.level.info": "dim yellow",
                "logging.level.warning": "dim red",
                "logging.level.exception": "bold red",
            }
        ),
    )


console = get_console()
# print = console.print

In [4]:
print("The Number is 128")
print(
    {
        "a": 1,
        "b": [
            {1, 2, 3},
            "lskjdf",
        ],
        "c": 1,
        "d": [
            {1, 2, 3},
            "lskjdf",
        ],
        "ae": 1,
        "bf": [
            {1, 2, 3},
            "lskjdf",
        ],
        "ag": 1,
        "ba": [
            {1, 2, 3},
            "lskjdf",
        ],
    }
)

In [5]:
print(r"\x86")

In [6]:
# | export
# | hide
@patch_to(RichHandler)
def render(
    self,
    *,
    record,
    traceback,
    message_renderable: "ConsoleRenderable",
) -> "ConsoleRenderable":
    """patched the renderer to print function name as well"""
    path = Path(record.pathname).name
    level = self.get_level_text(record)
    time_format = None if self.formatter is None else self.formatter.datefmt
    log_time = datetime.fromtimestamp(record.created)

    log_renderable = self._log_render(
        self.console,
        [message_renderable] if not traceback else [message_renderable, traceback],
        log_time=log_time,
        time_format=time_format,
        level=level,
        path=path,
        line_no=f"{record.funcName}:{record.lineno}",
        link_path=(
            f"{record.pathname}:{record.lineno}" if self.enable_link_path else None
        ),
    )
    return log_renderable


def reset_logger(level="INFO", width=172, silent=True, disable_stdout=False):
    if level is not None:
        [logger.remove() for _ in range(100)]
        if not disable_stdout:
            logger.configure(
                handlers=[
                    {
                        "sink": RichHandler(
                            rich_tracebacks=True,
                            console=console,
                            tracebacks_show_locals=False,
                        ),
                        "format": "<level>{message}</level>",
                        "backtrace": True,
                        "level": level.upper(),
                    }
                ],
            )
    if width is not None:
        for handler_id in logger._core.handlers:
            try:
                handler = logger._core.handlers[handler_id]
                handler._sink._handler.console = get_console(width=width)
            except:
                ...
    if not silent:
        logger.info(f"reset logger's console width to {width} and level to {level}!")


reset_logger_width = lambda width: reset_logger(width=width)

reset_logger()

logger = logger

Debug = lambda *x, depth=0: logger.opt(depth=depth + 1).log(
    "DEBUG", x[0] if len(x) == 1 else "; ".join([str(i) for i in x])
)
Info = lambda *x, depth=0: logger.opt(depth=depth + 1).log(
    "INFO", x[0] if len(x) == 1 else "; ".join([str(i) for i in x])
)
Warn = lambda *x, depth=0: logger.opt(depth=depth + 1).log(
    "WARNING", x[0] if len(x) == 1 else "; ".join([str(i) for i in x])
)
Excep = lambda *x, depth=0: logger.opt(depth=depth + 1).log(
    "ERROR", x[0] if len(x) == 1 else "; ".join([str(i) for i in x])
)

In [7]:
a = {1: 1221, 2: 2342}
del a[1]
a

{2: 2342}

In [8]:
Debug("TESTING {1,2,3}")
Info("TESTING {1,2,3}")
Warn("TESTING {1,2,3}")
Excep("TESTING {1,2,3}")

In [9]:
# | export
def enter_exit(func):
    """
    Logs the time taken to execute a function along with entry & exit time stamps
    """
    logger_ = logger.opt(depth=1)

    @wraps(func)
    def function_timer(*args, **kwargs):
        tic = time.time()
        logger_.log("DEBUG", f"Entered function `{func.__name__}`")
        o = func(*args, **kwargs)
        toc = time.time()
        logger_.log(
            "DEBUG", f"Exiting function `{func.__name__}` after {toc-tic:.3f} seconds"
        )
        return o

    return function_timer

In [10]:
@enter_exit
def add(x, y):
    print("sleeping...")
    time.sleep(2)
    return x + y


add(1, 23)

24

In [11]:
def do():
    try:
        1 / 0
    except Exception as e:
        # console.print_exception(max_frames=20)
        logger.exception(e)


def do2():
    do()


do2()

In [12]:
# | export
def get_logger_level():
    lv = [
        l
        for l, v in logger._core.levels_lookup.items()
        if v[2] == logger._core.min_level
    ][0]
    return lv.lower()


@contextmanager
def _logger_mode_context(level):
    lv = get_logger_level()
    try:
        reset_logger(level.upper())
        yield
    finally:
        reset_logger(lv.upper())


def logger_mode(level):
    if callable(level):
        # Used as a decorator without arguments
        func = level

        @wraps(func)
        def wrapper(*args, **kwargs):
            with _logger_mode_context("DEBUG"):
                return func(*args, **kwargs)

        return wrapper
    else:
        # Used as a decorator with arguments or as a context manager
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                with _logger_mode_context(level):
                    return func(*args, **kwargs)

            return wrapper

        return _logger_mode_context(level) if not callable(level) else decorator


def in_logger_mode(level):
    return get_logger_level() == level


warn_mode = lambda: logger_mode("warning")
info_mode = lambda: logger_mode("info")
debug_mode = lambda: logger_mode("debug")
excep_mode = lambda: logger_mode("error")

in_warn_mode = lambda: in_logger_mode("warning")
in_info_mode = lambda: in_logger_mode("info")
in_debug_mode = lambda: in_logger_mode("debug")
in_excep_mode = lambda: in_logger_mode("error")

In [None]:
def do():
    Debug(1)
    Info(2)
    Warn(3)
    Excep(4)


def line(x):
    sep = "=" * 20
    print(f"{sep}{x}{sep}")
    print(f"{in_excep_mode()=}")
    print(f"{in_warn_mode()=}")
    print(f"{in_info_mode()=}")
    print(f"{in_debug_mode()=}")


reset_logger()

with excep_mode():
    line("Excep mode")
    do()

with warn_mode():
    line("Warn mode")
    do()

with info_mode():
    line("Info mode")
    do()

with debug_mode():
    line("Debug mode")
    do()

In [14]:
reset_logger("debug")
print(in_debug_mode())
reset_logger()
print(in_debug_mode())
with debug_mode():
    print(in_debug_mode())
print(in_debug_mode())

In [15]:
# | export

frames = [
    "Guess the anime (while you wait): 🍥🍃🔥🥋🦊🍜🌆🌄    ",  # Naruto
    "Guess the anime (while you wait): 🐉💥🥋🚀🔥🐲🌌🍃    ",  # Dragon Ball Z
    "Guess the anime (while you wait): 👒☠️⛵🏴‍☠️🍖🍗🌊🏝️🐒  ",  # One Piece
    "Guess the anime (while you wait): 🪶🏰🧑‍🔬🦿🪓🌅🐦👹    ",  # Attack on Titan
    "Guess the anime (while you wait): 📓✒️🧑‍⚖️☠️🕊️🔍🌃      ",  # Death Note
    "Guess the anime (while you wait): 🔧🔮⚗️🤖👁️‍🗨️💥🔥      ",  # Fullmetal Alchemist
    "Guess the anime (while you wait): 🦸‍♂️🦸‍♀️💥🏫🌟🌆🔥      ",  # My Hero Academia
    "Guess the anime (while you wait): 🤖🌃🌌🛰️🪐👁️‍🗨️💥      ",  # Neon Genesis Evangelion
    "Guess the anime (while you wait): 🤠🚀🌌🎷🌟🔫💰      ",  # Cowboy Bebop
    "Guess the anime (while you wait): 👧🐉🏯🌟🍜🛁🌌      ",  # Spirited Away
    "Guess the movie (while you wait): 🌌🚀🌠🪐🤖👽🔱✨🚁🌑",  # Star Wars
    "Guess the movie (while you wait): 🧙🏰🌋🗡️👑🐉🪶🍃🦅💍",  # The Lord of the Rings
    "Guess the movie (while you wait): 🧙‍♂️⚡🏰🪄🦉🔮🚂🐍🧹📚",  # Harry Potter
    "Guess the movie (while you wait): 🌿🌏🚁💎💫🌞🦋🐉🍃🔶",  # Avatar
    "Guess the movie (while you wait): 🏴‍☠️⚓🦜🌊💰🚢🔫🌴🗺️🔪",  # Pirates of the Caribbean
    "Guess the movie (while you wait): 🕶️🌐💊🏙️🧑‍💻🔫🪑🕊️🦹‍♂️🔵",  # The Matrix
    "Guess the movie (while you wait): 🦸‍♂️🦸‍♀️🌟🦹‍♂️🦹‍♀️💥🌆🤖🔨🛡️",  # The Avengers
    "Guess the movie (while you wait): 🦖🦕🌴🪶🪶🪶🚁🧑‍🔬🪶🧑‍🔬",  # Jurassic Park
    "Guess the movie (while you wait): 🦁👑🌅🌴🐗🐾🎶👑🦏🐘",  # The Lion King
    "Guess the movie (while you wait): 🧸🤠🚀🧒🐍🪁🎈🎉🤖👾",  # Toy Story
    "Guess the movie (while you wait): ❄️👑⛄❄️🧝‍♀️🦌🏰🎶🏔️🌬️",  # Frozen
    "Guess the movie (while you wait): 🚢🌊❄️💔🎻🔱⚓🚢🌟🌅",  # Titanic
    "Guess the movie (while you wait): 🚲👽🌕🌱🌌🌠🌟🪐👨‍🔬🛸",  # E.T. the Extra-Terrestrial
    "Guess the movie (while you wait): 📜🗝️🏢🔓🧱🧔💰🚶‍♂️⛏️🌅",  # The Shawshank Redemption
    "Guess the movie (while you wait): 🏃🌳🎓🏈🍫🏅🍤🚌🍤🦐",  # Forrest Gump
    "Guess the movie (while you wait): 🍊🎩👨‍👩‍👦🔫🍷🇮🇹🕶️📝🚬💔",  # The Godfather
]


@contextmanager
def notify_waiting(message):
    random.shuffle(frames)
    SPINNERS["guess"] = {"interval": 3000, "frames": frames}

    status = console.status(f"[red]\n{message:10}", spinner="clock")
    with status as _:
        s = time.perf_counter()
        yield
        time_taken = time.perf_counter() - s
        time.sleep(0.1)
    Info(f"{message} - Completed in {time_taken:.2f} s", depth=1)