The Rotterdam release
githuib committed Feb 18, 2024
1 parent 32134da commit 680e195
Showing 5 changed files with 130 additions and 117 deletions.
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "powerchord"
-version = "0.0.9"
+version = "0.1.0"
 description = "Concurrent CLI task runner"
 authors = ["Huib Piguillet <huib@proton.me>"]
 maintainers = ["Huib Piguillet"]
@@ -123,6 +123,8 @@ select = [
 ignore = [
     "RUF013", # PEP 484 prohibits implicit `Optional`
     "G004", # Logging statement uses f-string
+    "TRY003", # Avoid specifying long messages outside the exception class
+    "EM101", # Exception must not use a string literal, assign to variable first
 ]
 
 [tool.ruff.lint.per-file-ignores]
102 changes: 4 additions & 98 deletions src/powerchord/cli.py
@@ -1,107 +1,13 @@
-import argparse
 import asyncio
-import logging
 import sys
-import tomllib
-from collections.abc import Sequence
-from dataclasses import dataclass, field
-from pathlib import Path
-from typing import Any
 
-from chili import TypeDecoder, decode
-
-from .logging import LogLevel, LogLevels, logging_context
+from .config import load_config
+from .logging import logging_context
 from .runner import TaskRunner
-
-log = logging.getLogger(__name__)
-
-
-class LogLevelDecoder(TypeDecoder):
-    def decode(self, value: str) -> LogLevel:
-        return LogLevel.decode(value)
-
-
-@dataclass
-class Config:
-    tasks: dict[str, str] = field(default_factory=dict)
-    log_levels: LogLevels = field(default_factory=lambda: LogLevels())
-
-
-class ParseDict(argparse.Action):
-    def __call__(
-        self,
-        parser: argparse.ArgumentParser,
-        namespace: argparse.Namespace,
-        values: str | Sequence[Any] | None,
-        option_string: str = None,
-    ) -> None:
-        value_seq = [values] if isinstance(values, str) else [str(v) for v in values or []]
-        d = getattr(namespace, self.dest) or {}
-        try:
-            pairs = (item.split('=', 1) for item in value_seq)
-            d |= {key.strip(): value for key, value in pairs}
-        except ValueError:
-            parser.error(
-                f'argument {option_string}: not matching key1="some val" [key2="another val" ...]',
-            )
-        else:
-            setattr(namespace, self.dest, d)
-
-
-def config_from_args() -> Config | None:
-    arg_parser = argparse.ArgumentParser()
-    arg_parser.add_argument(
-        '-t',
-        '--tasks',
-        dest='tasks',
-        nargs='+',
-        metavar='NAME=COMMAND',
-        action=ParseDict,
-        default={},
-    )
-    args = arg_parser.parse_args()
-    return Config(args.tasks) if args.tasks else None
-
-
-class FatalError(SystemExit):
-    def __init__(self, *args):
-        log.critical(f'💀 {" ".join(str(arg) for arg in args)}')
-        super().__init__(1)
-
-
-class ConfigError(FatalError):
-    def __init__(self, config_source: str = None, cause: str = None):
-        message = 'Could not load config'
-        if config_source:
-            message += f' from {config_source}'
-        if cause:
-            message += f': {cause}'
-        super().__init__(message)
-
-
-def config_from_pyproject() -> Config | None:
-    pyproject_file = 'pyproject.toml'
-    try:
-        with Path(pyproject_file).open('rb') as f:
-            config_dict = tomllib.load(f).get('tool', {}).get('powerchord', {})
-    except OSError:
-        return None
-    try:
-        return decode(config_dict, Config, decoders={LogLevel: LogLevelDecoder()})
-    except ValueError as exc:
-        raise ConfigError(pyproject_file, ' '.join(exc.args)) from exc
-
-
-def load_config() -> Config:
-    for loader in [config_from_args, config_from_pyproject]:
-        config = loader()
-        if config:
-            return config
-    raise ConfigError
 
 
 def main() -> None:
     config = load_config()
-    task_runner = TaskRunner(config.tasks)
     with logging_context(config.log_levels):
-        sys.exit(not asyncio.run(task_runner.run_tasks()))
+        success = asyncio.run(TaskRunner(config.tasks).run_tasks())
+        sys.exit(not success)
105 changes: 105 additions & 0 deletions src/powerchord/config.py
@@ -0,0 +1,105 @@
+import argparse
+import tomllib
+from collections.abc import Sequence
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+from chili import decode
+
+from .logging import LogLevel, LogLevels
+
+
+@dataclass
+class Config:
+    tasks: dict[str, str] = field(default_factory=dict)
+    log_levels: LogLevels = field(default_factory=LogLevels)
+
+    @classmethod
+    def decode(cls, value: dict) -> 'Config':
+        return decode(value, Config, decoders={LogLevel: LogLevel})
+
+
+class FatalError(SystemExit):
+    def __init__(self, *args):
+        super().__init__(f'💀 {" ".join(str(arg) for arg in args)}')
+
+
+class ConfigError(FatalError):
+    def __init__(self, config_source: str = None, *args):
+        message = 'Could not load config'
+        if config_source:
+            message += f' from {config_source}'
+        if args:
+            message += ': ' + ' '.join(str(a) for a in args)
+        super().__init__(message)
+
+
+class ParseDict(argparse.Action):
+    def __call__(
+        self,
+        parser: argparse.ArgumentParser,
+        namespace: argparse.Namespace,
+        values: str | Sequence[Any] | None,
+        option_string: str = None,
+    ) -> None:
+        value_seq = [values] if isinstance(values, str) else [str(v) for v in values or []]
+        try:
+            pairs = (item.split('=', 1) for item in value_seq)
+            new_pairs = {key.strip(): value for key, value in pairs}
+        except ValueError:
+            parser.error(
+                f'argument {option_string}: not matching pattern key1=val1 [key2=val2 ...]',
+            )
+        else:
+            setattr(namespace, self.dest, (getattr(namespace, self.dest) or {}) | new_pairs)
+
+
+def config_from_args(_config_source: str) -> Config | None:
+    arg_parser = argparse.ArgumentParser()
+    arg_parser.add_argument(
+        '-t',
+        '--tasks',
+        dest='tasks',
+        nargs='+',
+        metavar='NAME=COMMAND',
+        action=ParseDict,
+        default={},
+    )
+    arg_parser.add_argument(
+        '-l',
+        '--log-levels',
+        dest='log_levels',
+        nargs='+',
+        metavar='NAME=COMMAND',
+        action=ParseDict,
+        default={},
+    )
+    config_dict = arg_parser.parse_args().__dict__
+    return Config.decode(config_dict) if any(config_dict.values()) else None
+
+
+def config_from_pyproject(config_source: str) -> Config | None:
+    try:
+        with Path(config_source).open('rb') as f:
+            config_dict = tomllib.load(f).get('tool', {}).get('powerchord', {})
+    except OSError:
+        return None
+    return Config.decode(config_dict)
+
+
+CONFIG_LOADERS = {
+    'command line': config_from_args,
+    'pyproject.toml': config_from_pyproject,
+}
+
+
+def load_config() -> Config:
+    for name, loader in CONFIG_LOADERS.items():
+        try:
+            config = loader(name)
+        except ValueError as exc:
+            raise ConfigError(name, *exc.args) from exc
+        if config:
+            return config
+    raise ConfigError
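
A minimal usage sketch of the new loaders (illustrative only: the task names, commands, log-level value, and the `powerchord` console-script name are assumptions, not part of this commit). On the command line, ParseDict turns NAME=COMMAND pairs into the tasks dict:

    powerchord -t lint='ruff check .' test=pytest -l all=info

The same configuration can live in pyproject.toml, which config_from_pyproject decodes into the Config dataclass via chili:

    [tool.powerchord]
    tasks = { lint = "ruff check .", test = "pytest" }
    log_levels = { all = "info" }
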
18 changes: 9 additions & 9 deletions src/powerchord/logging.py
@@ -28,10 +28,10 @@ class LogLevel(IntEnum):
     @classmethod
     def decode(cls, value: str) -> 'LogLevel':
         if not value:
-            return LogLevel.NEVER
+            return cls.NEVER
         try:
-            return LogLevel[value.upper()]
-        except KeyError as exc:
+            return cls[value.upper()]
+        except (AttributeError, KeyError) as exc:
             raise ValueError('Invalid log level:', value) from exc
 
 
@@ -42,9 +42,9 @@ class LogLevels:
     fail: LogLevel = LogLevel.INFO
 
 
-def queue_listener(levels: LogLevels) -> QueueListener | None:
+def setup_logging_queues(levels: LogLevels) -> Iterator[QueueListener]:
     if levels.all == LogLevel.NEVER:
-        return None
+        return
     console = logging.StreamHandler(sys.stdout)
     logging.basicConfig(handlers=[console], level=levels.all, format='%(message)s')
     queue: Queue[logging.LogRecord] = Queue()
@@ -53,16 +53,16 @@ def queue_listener(levels: LogLevels) -> QueueListener | None:
         logger.setLevel(max(level, levels.all))
         logger.addHandler(QueueHandler(queue))
         logger.propagate = False
-    return QueueListener(queue, console)
+    yield QueueListener(queue, console)
 
 
 @contextmanager
 def logging_context(levels: LogLevels) -> Iterator[None]:
-    listener = queue_listener(levels)
-    if listener:
+    queues_listeners = list(setup_logging_queues(levels))
+    for listener in queues_listeners:
         listener.start()
     try:
         yield
     finally:
-        if listener:
+        for listener in queues_listeners:
             listener.stop()
18 changes: 9 additions & 9 deletions src/powerchord/runner.py
@@ -12,14 +12,6 @@ def __init__(self, tasks: dict[str, str]) -> None:
         self.tasks = tasks
         self.max_name_length = max(len(n) for n in tasks) if tasks else 0
 
-    async def run_task(self, name: str, task: str) -> tuple[str, bool]:
-        (success, (out, err)), duration = await timed_awaitable(exec_command(task))
-        log.info(f'{status(success)} {name.ljust(self.max_name_length)} {dim(duration)}')
-        for level, stream in ((logging.INFO, out), (logging.ERROR, err)):
-            if stream:
-                task_log(success).log(level, stream.decode())
-        return name, success
-
     async def run_tasks(self) -> bool:
         if not self.tasks:
             log.warning('Nothing to do. Getting bored...\n')
@@ -28,9 +20,17 @@ async def run_tasks(self) -> bool:
         summary = [f'• {name.ljust(self.max_name_length)} {dim(task)}' for name, task in tasks]
         for line in (bright('To do:'), *summary, '', bright('Results:')):
             log.info(line)
-        results = await concurrent_call(self.run_task, tasks)
+        results = await concurrent_call(self._run_task, tasks)
         failed_tasks = [task for task, ok in results if not ok]
         if failed_tasks:
             log.error('')
             log.error(f'{bright("Failed tasks:")} {failed_tasks}')
         return not failed_tasks
+
+    async def _run_task(self, name: str, task: str) -> tuple[str, bool]:
+        (success, (out, err)), duration = await timed_awaitable(exec_command(task))
+        log.info(f'{status(success)} {name.ljust(self.max_name_length)} {dim(duration)}')
+        for level, stream in ((logging.INFO, out), (logging.ERROR, err)):
+            if stream:
+                task_log(success).log(level, stream.decode())
+        return name, success