CT-1405: Refactor event logging code #6291

Merged · 13 commits · Dec 6, 2022
Changes from 10 commits
8 changes: 8 additions & 0 deletions .changes/unreleased/Under the Hood-20221118-145717.yaml
@@ -0,0 +1,8 @@
kind: Under the Hood
body: Functionality-neutral refactor of event logging system to improve encapsulation
  and modularity.
time: 2022-11-18T14:57:17.792622-05:00
custom:
  Author: peterallenwebb
  Issue: "6139"
  PR: "6291"
1 change: 0 additions & 1 deletion core/dbt/cli/main.py
@@ -31,7 +31,6 @@ def cli_runner():
 @p.cache_selected_only
 @p.debug
 @p.enable_legacy_logger
-@p.event_buffer_size
 @p.fail_fast
 @p.log_cache_events
 @p.log_format
8 changes: 0 additions & 8 deletions core/dbt/cli/params.py
@@ -80,14 +80,6 @@
     hidden=True,
 )

-event_buffer_size = click.option(
-    "--event-buffer-size",
-    envvar="DBT_EVENT_BUFFER_SIZE",
-    help="Sets the max number of events to buffer in EVENT_HISTORY.",
-    default=100000,
-    type=click.INT,
-)
-
 exclude = click.option("--exclude", envvar=None, help="Specify the nodes to exclude.")

 fail_fast = click.option(
1 change: 0 additions & 1 deletion core/dbt/contracts/project.py
@@ -251,7 +251,6 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
     static_parser: Optional[bool] = None
     indirect_selection: Optional[str] = None
     cache_selected_only: Optional[bool] = None
-    event_buffer_size: Optional[int] = None


 @dataclass
41 changes: 26 additions & 15 deletions core/dbt/events/base_types.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from enum import Enum
 import os
 import threading
 from datetime import datetime
@@ -43,6 +44,16 @@ def get_thread_name() -> str:
     return threading.current_thread().name


+# EventLevel is an Enum, but mixing in the 'str' type is suggested in the Python
+# documentation, and provides support for json conversion, which fails otherwise.
+class EventLevel(str, Enum):
+    DEBUG = "debug"
+    TEST = "test"
+    INFO = "info"
+    WARN = "warn"
+    ERROR = "error"
+
+
 @dataclass
 class BaseEvent:
     """BaseEvent for proto message generated python events"""
@@ -62,15 +73,15 @@ def __post_init__(self):
         self.info.code = self.code()
         self.info.name = type(self).__name__

-    def level_tag(self) -> str:
-        return "debug"
-
     # This is here because although we know that info should always
     # exist, mypy doesn't.
-    def log_level(self) -> str:
+    def log_level(self) -> EventLevel:
         return self.info.level  # type: ignore

-    def message(self):
+    def level_tag(self) -> EventLevel:
+        return EventLevel.DEBUG
+
+    def message(self) -> str:
         raise Exception("message() not implemented for event")


@@ -85,32 +96,32 @@ class DynamicLevel(BaseEvent):
 class TestLevel(BaseEvent):
     __test__ = False

-    def level_tag(self) -> str:
-        return "test"
+    def level_tag(self) -> EventLevel:
+        return EventLevel.TEST


 @dataclass  # type: ignore[misc]
 class DebugLevel(BaseEvent):
-    def level_tag(self) -> str:
-        return "debug"
+    def level_tag(self) -> EventLevel:
+        return EventLevel.DEBUG


 @dataclass  # type: ignore[misc]
 class InfoLevel(BaseEvent):
-    def level_tag(self) -> str:
-        return "info"
+    def level_tag(self) -> EventLevel:
+        return EventLevel.INFO


 @dataclass  # type: ignore[misc]
 class WarnLevel(BaseEvent):
-    def level_tag(self) -> str:
-        return "warn"
+    def level_tag(self) -> EventLevel:
+        return EventLevel.WARN


 @dataclass  # type: ignore[misc]
 class ErrorLevel(BaseEvent):
-    def level_tag(self) -> str:
-        return "error"
+    def level_tag(self) -> EventLevel:
+        return EventLevel.ERROR


 # Included to ensure classes with str-type message members are initialized correctly.
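As the new comment in base_types.py notes, mixing str into EventLevel is what keeps the levels JSON-serializable and string-comparable. A minimal illustration of that behavior (not part of this diff):

import json
from dbt.events.base_types import EventLevel

# Members of a str-mixin Enum compare equal to their string values...
assert EventLevel.WARN == "warn"

# ...and json.dumps can serialize them directly, which fails for a plain Enum.
print(json.dumps({"level": EventLevel.WARN}))  # {"level": "warn"}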
210 changes: 210 additions & 0 deletions core/dbt/events/eventmgr.py
@@ -0,0 +1,210 @@
import betterproto
from colorama import Style
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json
import logging
from logging.handlers import RotatingFileHandler
import threading
from typing import Any, Callable, List, Optional, TextIO
from uuid import uuid4

from dbt.events.base_types import BaseEvent, EventLevel


# A Filter is a function which takes a BaseEvent and returns True if the event
# should be logged, False otherwise.
Filter = Callable[[BaseEvent], bool]


# Default filter which logs every event
def NoFilter(_: BaseEvent) -> bool:
    return True


# A Scrubber removes secrets from an input string, returning a sanitized string.
Scrubber = Callable[[str], str]


# Provide a pass-through scrubber implementation, also used as a default
def NoScrubber(s: str) -> str:
    return s


class LineFormat(Enum):
    PlainText = 1
    DebugText = 2
    Json = 3


# Map from dbt event levels to python log levels
_log_level_map = {
    EventLevel.DEBUG: 10,
    EventLevel.TEST: 10,
    EventLevel.INFO: 20,
    EventLevel.WARN: 30,
    EventLevel.ERROR: 40,
}


@dataclass
class LoggerConfig:
    name: str
    filter: Filter = NoFilter
    scrubber: Scrubber = NoScrubber
    line_format: LineFormat = LineFormat.PlainText
    level: EventLevel = EventLevel.WARN
    use_colors: bool = False
    output_stream: Optional[TextIO] = None
    output_file_name: Optional[str] = None
    logger: Optional[Any] = None


class _Logger:
    def __init__(self):
        self.name: str
        self.filter: Filter
        self.scrubber: Scrubber
        self.level: EventLevel
        self.event_manager: EventManager
        self._python_logger: logging.Logger = None
        self._stream: TextIO

    def create_line(self, e: BaseEvent) -> str:
        ...

    def write_line(self, e: BaseEvent):
        line = self.create_line(e)
        python_level = _log_level_map[e.log_level()]
        if self._python_logger is not None:
            self._python_logger.log(python_level, line)
        elif self._stream is not None and _log_level_map[self.level] <= python_level:
            self._stream.write(line + "\n")

    def flush(self):
        if self._python_logger is not None:
            for handler in self._python_logger.handlers:
                handler.flush()
        elif self._stream is not None:
            self._stream.flush()


class _TextLogger(_Logger):
    def __init__(self):
        super().__init__()
        self.use_colors = True
        self.use_debug_format = False

    def create_line(self, e: BaseEvent) -> str:
        return self.create_debug_line(e) if self.use_debug_format else self.create_info_line(e)

    def create_info_line(self, e: BaseEvent) -> str:
        ts: str = datetime.utcnow().strftime("%H:%M:%S")
        scrubbed_msg: str = self.scrubber(e.message())  # type: ignore
        return f"{self._get_color_tag()}{ts} {scrubbed_msg}"

    def create_debug_line(self, e: BaseEvent) -> str:
        log_line: str = ""
        # Create a separator if this is the beginning of an invocation
        # TODO: This is an ugly hack, get rid of it if we can
        if type(e).__name__ == "MainReportVersion":
            separator = 30 * "="
            log_line = f"\n\n{separator} {datetime.utcnow()} | {self.event_manager.invocation_id} {separator}\n"
        ts: str = datetime.utcnow().strftime("%H:%M:%S.%f")
        scrubbed_msg: str = self.scrubber(e.message())  # type: ignore
        log_line += f"{self._get_color_tag()}{ts} [{e.log_level():<5}]{self._get_thread_name()} {scrubbed_msg}"
        return log_line

    def _get_color_tag(self) -> str:
        return "" if not self.use_colors else Style.RESET_ALL

    def _get_thread_name(self) -> str:
        thread_name = ""
        if threading.current_thread().name:
            thread_name = threading.current_thread().name
            thread_name = thread_name[:10]
            thread_name = thread_name.ljust(10, " ")
            thread_name = f" [{thread_name}]:"
        return thread_name


class _JsonLogger(_Logger):
    def create_line(self, e: BaseEvent) -> str:
        event_dict = self.event_to_dict(e)
        raw_log_line = json.dumps(event_dict, sort_keys=True)
        line = self.scrubber(raw_log_line)  # type: ignore
        return line

    def event_to_dict(self, event: BaseEvent) -> dict:
        event_dict = dict()
        try:
            # We could use to_json here, but it wouldn't sort the keys.
            # The 'to_json' method just does json.dumps on the dict anyway.
            event_dict = event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True)  # type: ignore
        except AttributeError as exc:
            event_type = type(event).__name__
            raise Exception(f"type {event_type} is not serializable. {str(exc)}")
        return event_dict


# Factory function which creates a logger from a config, hiding the gross details.
def _create_logger(config: LoggerConfig):
    logger: _Logger
    if config.line_format == LineFormat.Json:
        logger = _JsonLogger()
    else:
        logger = _TextLogger()
        logger.use_colors = config.use_colors
        logger.use_debug_format = config.line_format == LineFormat.DebugText

    logger.name = config.name
    logger.filter = config.filter
    logger.scrubber = config.scrubber
    logger.level = config.level

    if config.logger:
        logger._python_logger = config.logger
    elif config.output_stream:
        logger._stream = config.output_stream
    else:
        log = logging.getLogger(logger.name)
        log.setLevel(_log_level_map[config.level])
        handler = RotatingFileHandler(
            filename=str(config.output_file_name),
            encoding="utf8",
            maxBytes=10 * 1024 * 1024,  # 10 mb
            backupCount=5,
        )

        handler.setFormatter(logging.Formatter(fmt="%(message)s"))
        log.handlers.clear()
        log.addHandler(handler)

        logger._python_logger = log

    return logger


class EventManager:
    def __init__(self):
        self.loggers: List[_Logger] = []
        self.callbacks: List[Callable[[BaseEvent], None]] = []
        self.invocation_id: str = str(uuid4())

    def fire_event(self, e: BaseEvent) -> None:
        for logger in self.loggers:
            if logger.filter(e):  # type: ignore
                logger.write_line(e)

        for callback in self.callbacks:
            callback(e)

    def add_logger(self, config: LoggerConfig):
        logger = _create_logger(config)
        logger.event_manager = self
        self.loggers.append(logger)

    def flush(self):
        for logger in self.loggers:
            logger.flush()
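For orientation, here is a minimal sketch of how the new pieces compose; it is not part of this diff, and the info_and_above filter, the captured list, and the log file path are illustrative placeholders. It uses only names introduced in eventmgr.py and base_types.py.

import sys
from dbt.events.base_types import BaseEvent, EventLevel
from dbt.events.eventmgr import EventManager, LineFormat, LoggerConfig, NoScrubber

# A custom Filter: drop debug- and test-level events from this logger only.
def info_and_above(e: BaseEvent) -> bool:
    return e.log_level() not in (EventLevel.DEBUG, EventLevel.TEST)

mgr = EventManager()

# Plain-text console logger writing to stdout.
mgr.add_logger(
    LoggerConfig(
        name="stdout",
        filter=info_and_above,
        scrubber=NoScrubber,
        line_format=LineFormat.PlainText,
        level=EventLevel.INFO,
        use_colors=True,
        output_stream=sys.stdout,
    )
)

# Debug-formatted file logger; _create_logger routes this through a RotatingFileHandler.
mgr.add_logger(
    LoggerConfig(
        name="file",
        line_format=LineFormat.DebugText,
        level=EventLevel.DEBUG,
        output_file_name="logs/dbt.log",  # illustrative path
    )
)

# Callbacks receive every event, independent of any logger's filter.
captured: list = []
mgr.callbacks.append(captured.append)

# mgr.fire_event(some_event)  # some_event: any proto-generated BaseEvent subclass
mgr.flush()

Because filters and scrubbers are plain callables carried on LoggerConfig, each sink can apply its own policy without the event classes knowing anything about logging, which is the encapsulation the changelog entry describes.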