Skip to content

Commit

Permalink
Convert can.Logger and can.LogReader into functions, improve docs (
Browse files Browse the repository at this point in the history
…#1703)

* turn can.Logger and can.LogReader into functions

* improve file io documentation

* fix doctest

* don't check if class is None, check length of suffixes

* fix typo

* fix issues after rebase

* fix test issues, use context manager to fix random PyPy failures

* align can.LogReader docstring to can.Logger

* replace deprecated `context` with `config_context`
  • Loading branch information
zariiii9003 committed Jan 14, 2024
1 parent e173bf1 commit ec105ac
Show file tree
Hide file tree
Showing 12 changed files with 448 additions and 419 deletions.
6 changes: 4 additions & 2 deletions can/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"CSVWriter",
"Logger",
"LogReader",
"MESSAGE_READERS",
"MESSAGE_WRITERS",
"MessageSync",
"MF4Reader",
"MF4Writer",
Expand All @@ -39,8 +41,8 @@
]

# Generic
from .logger import BaseRotatingLogger, Logger, SizedRotatingLogger
from .player import LogReader, MessageSync
from .logger import MESSAGE_WRITERS, BaseRotatingLogger, Logger, SizedRotatingLogger
from .player import MESSAGE_READERS, LogReader, MessageSync

# isort: split

Expand Down
197 changes: 98 additions & 99 deletions can/io/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Callable,
ClassVar,
Dict,
Final,
Literal,
Optional,
Set,
Expand All @@ -24,15 +25,13 @@
from typing_extensions import Self

from .._entry_points import read_entry_points
from ..listener import Listener
from ..message import Message
from ..typechecking import AcceptedIOType, FileLike, StringPathLike
from .asc import ASCWriter
from .blf import BLFWriter
from .canutils import CanutilsLogWriter
from .csv import CSVWriter
from .generic import (
BaseIOHandler,
BinaryIOMessageWriter,
FileIOMessageWriter,
MessageWriter,
Expand All @@ -42,20 +41,85 @@
from .sqlite import SqliteWriter
from .trc import TRCWriter

#: A map of file suffixes to their corresponding
#: :class:`can.io.generic.MessageWriter` class
MESSAGE_WRITERS: Final[Dict[str, Type[MessageWriter]]] = {
".asc": ASCWriter,
".blf": BLFWriter,
".csv": CSVWriter,
".db": SqliteWriter,
".log": CanutilsLogWriter,
".mf4": MF4Writer,
".trc": TRCWriter,
".txt": Printer,
}


def _update_writer_plugins() -> None:
"""Update available message writer plugins from entry points."""
for entry_point in read_entry_points("can.io.message_writer"):
if entry_point.key in MESSAGE_WRITERS:
continue

writer_class = entry_point.load()
if issubclass(writer_class, MessageWriter):
MESSAGE_WRITERS[entry_point.key] = writer_class


def _get_logger_for_suffix(suffix: str) -> Type[MessageWriter]:
try:
return MESSAGE_WRITERS[suffix]
except KeyError:
raise ValueError(
f'No write support for unknown log format "{suffix}"'
) from None


class Logger(MessageWriter):
def _compress(
filename: StringPathLike, **kwargs: Any
) -> Tuple[Type[MessageWriter], FileLike]:
"""
Logs CAN messages to a file.
Return the suffix and io object of the decompressed file.
File will automatically recompress upon close.
"""
suffixes = pathlib.Path(filename).suffixes
if len(suffixes) != 2:
raise ValueError(
f"No write support for unknown log format \"{''.join(suffixes)}\""
) from None

real_suffix = suffixes[-2].lower()
if real_suffix in (".blf", ".db"):
raise ValueError(
f"The file type {real_suffix} is currently incompatible with gzip."
)
logger_type = _get_logger_for_suffix(real_suffix)
append = kwargs.get("append", False)

if issubclass(logger_type, BinaryIOMessageWriter):
mode = "ab" if append else "wb"
else:
mode = "at" if append else "wt"

return logger_type, gzip.open(filename, mode)


def Logger( # noqa: N802
filename: Optional[StringPathLike], **kwargs: Any
) -> MessageWriter:
"""Find and return the appropriate :class:`~can.io.generic.MessageWriter` instance
for a given file suffix.
The format is determined from the file suffix which can be one of:
* .asc: :class:`can.ASCWriter`
* .asc :class:`can.ASCWriter`
* .blf :class:`can.BLFWriter`
* .csv: :class:`can.CSVWriter`
* .db: :class:`can.SqliteWriter`
* .db :class:`can.SqliteWriter`
* .log :class:`can.CanutilsLogWriter`
* .mf4 :class:`can.MF4Writer`
(optional, depends on `asammdf <https://github.com/danielhrisca/asammdf>`_)
* .trc :class:`can.TRCWriter`
* .txt :class:`can.Printer`
* .mf4 :class:`can.MF4Writer` (optional, depends on asammdf)
Any of these formats can be used with gzip compression by appending
the suffix .gz (e.g. filename.asc.gz). However, third-party tools might not
Expand All @@ -65,97 +129,33 @@ class Logger(MessageWriter):
The log files may be incomplete until `stop()` is called due to buffering.
:param filename:
the filename/path of the file to write to,
may be a path-like object or None to
instantiate a :class:`~can.Printer`
:raises ValueError:
if the filename's suffix is of an unknown file type
.. note::
This class itself is just a dispatcher, and any positional and keyword
This function itself is just a dispatcher, and any positional and keyword
arguments are passed on to the returned instance.
"""

fetched_plugins = False
message_writers: ClassVar[Dict[str, Type[MessageWriter]]] = {
".asc": ASCWriter,
".blf": BLFWriter,
".csv": CSVWriter,
".db": SqliteWriter,
".log": CanutilsLogWriter,
".mf4": MF4Writer,
".trc": TRCWriter,
".txt": Printer,
}

@staticmethod
def __new__( # type: ignore[misc]
cls: Any, filename: Optional[StringPathLike], **kwargs: Any
) -> MessageWriter:
"""
:param filename:
the filename/path of the file to write to,
may be a path-like object or None to
instantiate a :class:`~can.Printer`
:raises ValueError:
if the filename's suffix is of an unknown file type
"""
if filename is None:
return Printer(**kwargs)

if not Logger.fetched_plugins:
Logger.message_writers.update(
{
writer.key: cast(Type[MessageWriter], writer.load())
for writer in read_entry_points("can.io.message_writer")
}
)
Logger.fetched_plugins = True

suffix = pathlib.PurePath(filename).suffix.lower()

file_or_filename: AcceptedIOType = filename
if suffix == ".gz":
logger_type, file_or_filename = Logger.compress(filename, **kwargs)
else:
logger_type = cls._get_logger_for_suffix(suffix)

return logger_type(file=file_or_filename, **kwargs)

@classmethod
def _get_logger_for_suffix(cls, suffix: str) -> Type[MessageWriter]:
try:
logger_type = Logger.message_writers[suffix]
if logger_type is None:
raise ValueError(f'failed to import logger for extension "{suffix}"')
return logger_type
except KeyError:
raise ValueError(
f'No write support for this unknown log format "{suffix}"'
) from None

@classmethod
def compress(
cls, filename: StringPathLike, **kwargs: Any
) -> Tuple[Type[MessageWriter], FileLike]:
"""
Return the suffix and io object of the decompressed file.
File will automatically recompress upon close.
"""
real_suffix = pathlib.Path(filename).suffixes[-2].lower()
if real_suffix in (".blf", ".db"):
raise ValueError(
f"The file type {real_suffix} is currently incompatible with gzip."
)
logger_type = cls._get_logger_for_suffix(real_suffix)
append = kwargs.get("append", False)

if issubclass(logger_type, BinaryIOMessageWriter):
mode = "ab" if append else "wb"
else:
mode = "at" if append else "wt"
if filename is None:
return Printer(**kwargs)

return logger_type, gzip.open(filename, mode)
_update_writer_plugins()

def on_message_received(self, msg: Message) -> None:
pass
suffix = pathlib.PurePath(filename).suffix.lower()
file_or_filename: AcceptedIOType = filename
if suffix == ".gz":
logger_type, file_or_filename = _compress(filename, **kwargs)
else:
logger_type = _get_logger_for_suffix(suffix)
return logger_type(file=file_or_filename, **kwargs)


class BaseRotatingLogger(Listener, BaseIOHandler, ABC):
class BaseRotatingLogger(MessageWriter, ABC):
"""
Base class for rotating CAN loggers. This class is not meant to be
instantiated directly. Subclasses must implement the :meth:`should_rollover`
Expand Down Expand Up @@ -187,20 +187,15 @@ class BaseRotatingLogger(Listener, BaseIOHandler, ABC):
rollover_count: int = 0

def __init__(self, **kwargs: Any) -> None:
Listener.__init__(self)
BaseIOHandler.__init__(self, file=None)
super().__init__(**{**kwargs, "file": None})

self.writer_kwargs = kwargs

# Expected to be set by the subclass
self._writer: Optional[FileIOMessageWriter] = None

@property
@abstractmethod
def writer(self) -> FileIOMessageWriter:
"""This attribute holds an instance of a writer class which manages the actual file IO."""
if self._writer is not None:
return self._writer
raise ValueError(f"{self.__class__.__name__}.writer is None.")
raise NotImplementedError

def rotation_filename(self, default_name: StringPathLike) -> StringPathLike:
"""Modify the filename of a log file when rotating.
Expand Down Expand Up @@ -270,7 +265,7 @@ def _get_new_writer(self, filename: StringPathLike) -> FileIOMessageWriter:
logger = Logger(filename=filename, **self.writer_kwargs)
if isinstance(logger, FileIOMessageWriter):
return logger
if isinstance(logger, Printer) and logger.file is not None:
elif isinstance(logger, Printer) and logger.file is not None:
return cast(FileIOMessageWriter, logger)

raise ValueError(
Expand Down Expand Up @@ -373,6 +368,10 @@ def __init__(

self._writer = self._get_new_writer(self.base_filename)

@property
def writer(self) -> FileIOMessageWriter:
return self._writer

def should_rollover(self, msg: Message) -> bool:
if self.max_bytes <= 0:
return False
Expand Down
Loading

0 comments on commit ec105ac

Please sign in to comment.