Skip to content

Commit

Permalink
Add set_luigi_config() context manager to use temporary luigi config
Browse files Browse the repository at this point in the history
Change-Id: If1370836b5af088368a97da31a889be9cd74c612
  • Loading branch information
adrien-berchet committed Mar 15, 2021
1 parent a1c01fa commit dbdcfc8
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 75 deletions.
61 changes: 61 additions & 0 deletions luigi_tools/util.py
@@ -1,6 +1,8 @@
"""This module provides some fonctions to work with luigi tasks."""
import logging
import os
import re
from configparser import ConfigParser
from pathlib import Path

import luigi
Expand Down Expand Up @@ -286,6 +288,65 @@ def register_templates(directory=None, name=None, hierarchy_end=True):
template = (template_path / template_name).with_suffix(".cfg")
if not template.exists():
raise ValueError(f"The template '{template}' could not be found.")

luigi.configuration.add_config_path(template)
if hierarchy_end:
luigi.configuration.add_config_path("luigi.cfg")


class set_luigi_config:
"""Context manager to set current luigi config.
Args:
params (dict): The parameters to load into the luigi configuration.
configfile (str): Path to the temporary config file (luigi.cfg by default).
"""

def __init__(self, params=None, configfile=None):
self.params = params
self.luigi_config = luigi.configuration.get_config()
if configfile is not None:
self.configfile = configfile
else:
self.configfile = "luigi.cfg"

def __enter__(self):
"""Load the given luigi configuration."""
# Reset luigi config
self.luigi_config.clear()

# Remove config file
if os.path.exists(self.configfile):
os.remove(self.configfile)

# Export config
if self.params is not None:
self.export_config(self.configfile)

# Set current config in luigi
self.luigi_config.read(self.configfile)
else:
self.configfile = None

def get_config(self):
"""Convert the parameter dict to a :class:`configparser.ConfigParser` object."""
config = ConfigParser()
config.read_dict(self.params)
return config

def export_config(self, filepath):
"""Export the configuration to the configuration file."""
params = self.get_config()

# Export params
with open(filepath, "w") as configfile:
params.write(configfile)

def __exit__(self, *args):
"""Reset the configuration when exiting the context manager."""
# Remove config file
if self.configfile is not None and os.path.exists(self.configfile):
os.remove(self.configfile)

# Reset luigi config
self.luigi_config.clear()
2 changes: 1 addition & 1 deletion luigi_tools/version.py
@@ -1,2 +1,2 @@
"""Package version."""
VERSION = "0.0.7" # pragma: no cover
VERSION = "0.0.8.dev0" # pragma: no cover
10 changes: 3 additions & 7 deletions tests/conftest.py
Expand Up @@ -4,10 +4,9 @@
import pytest

import luigi_tools.task
from luigi_tools.util import set_luigi_config

from .tools import create_not_empty_file
from .tools import dict_to_config
from .tools import set_luigi_config


@pytest.fixture(scope="function")
Expand All @@ -21,16 +20,13 @@ def tmp_working_dir(tmp_path):

@pytest.fixture
def luigi_tools_params():
return dict_to_config({"TaskA": {"a_cfg": "default_value_in_cfg"}})
return {"TaskA": {"a_cfg": "default_value_in_cfg"}}


@pytest.fixture
def luigi_tools_working_directory(tmp_working_dir, luigi_tools_params):
# Setup config
params = luigi_tools_params

# Set config
with set_luigi_config(params):
with set_luigi_config(luigi_tools_params):
yield tmp_working_dir


Expand Down
2 changes: 1 addition & 1 deletion tests/test_parameter.py
Expand Up @@ -9,9 +9,9 @@
import luigi_tools.task
import luigi_tools.target
import luigi_tools.util
from luigi_tools.util import set_luigi_config

from .tools import create_empty_file
from .tools import set_luigi_config


def test_ext_parameter(luigi_tools_working_directory):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_task.py
Expand Up @@ -11,13 +11,13 @@
import luigi_tools.util
from luigi_tools.task import DuplicatedParameterError
from luigi_tools.task import GlobalParameterNoValueError
from luigi_tools.util import set_luigi_config

from .tools import check_empty_file
from .tools import check_existing_file
from .tools import check_not_empty_file
from .tools import create_empty_file
from .tools import create_not_empty_file
from .tools import set_luigi_config


@pytest.mark.filterwarnings("ignore::UserWarning:luigi.parameter")
Expand Down
98 changes: 88 additions & 10 deletions tests/test_util.py
@@ -1,4 +1,5 @@
"""Tests for luigi tools."""
import copy
from configparser import ConfigParser
from pathlib import Path

Expand All @@ -9,9 +10,9 @@
import luigi_tools.task
import luigi_tools.target
import luigi_tools.util
from luigi_tools.util import set_luigi_config

from .tools import create_not_empty_file
from .tools import set_luigi_config


DATA = Path(__file__).parent / "data"
Expand Down Expand Up @@ -197,6 +198,15 @@ def test_param_repr():


class TestRegisterTemplates:
@pytest.fixture
def config_reseter(self):
cfg_cls = luigi.configuration.cfg_parser.LuigiConfigParser
current_config = copy.deepcopy(cfg_cls._config_paths)
yield
cfg_cls._config_paths = current_config
cfg_cls.reload()
luigi.configuration.get_config().clear()

@pytest.fixture
def Task(self, tmpdir):
class Task(luigi.Task):
Expand All @@ -221,7 +231,7 @@ def template_dir(self, tmpdir):
config.write(f)
return str(template_dir)

def test_cfg_only(self, Task, template_dir):
def test_cfg_only(self, Task, template_dir, config_reseter):
with set_luigi_config(
{
"Task": {"a": "a_from_cfg"},
Expand All @@ -230,7 +240,7 @@ def test_cfg_only(self, Task, template_dir):
luigi_tools.util.register_templates(template_dir, "template_1")
assert luigi.build([Task(expected_a="a_from_cfg")], local_scheduler=True)

def test_template_only(self, Task, template_dir):
def test_template_only(self, Task, template_dir, config_reseter):
with set_luigi_config(
{
"Template": {"name": "template_1"},
Expand All @@ -239,7 +249,7 @@ def test_template_only(self, Task, template_dir):
luigi_tools.util.register_templates(template_dir)
assert luigi.build([Task(expected_a="a_from_template")], local_scheduler=True)

def test_template_and_cfg(self, Task, template_dir):
def test_template_and_cfg(self, Task, template_dir, config_reseter):
with set_luigi_config(
{
"Template": {"name": "template_1"},
Expand All @@ -249,7 +259,7 @@ def test_template_and_cfg(self, Task, template_dir):
luigi_tools.util.register_templates(template_dir)
assert luigi.build([Task(expected_a="a_from_cfg")], local_scheduler=True)

def test_missing_template(self, Task, template_dir):
def test_missing_template(self, Task, template_dir, config_reseter):
with set_luigi_config(
{
"Template": {"name": "missing_template"},
Expand All @@ -259,7 +269,7 @@ def test_missing_template(self, Task, template_dir):
with pytest.raises(ValueError, match=r"The template .* could not be found\."):
luigi_tools.util.register_templates(template_dir)

def test_template_directory_in_cfg(self, Task, template_dir):
def test_template_directory_in_cfg(self, Task, template_dir, config_reseter):
with set_luigi_config(
{
"Template": {
Expand All @@ -272,7 +282,7 @@ def test_template_directory_in_cfg(self, Task, template_dir):
luigi_tools.util.register_templates()
assert luigi.build([Task(expected_a="a_from_cfg")], local_scheduler=True)

def test_template_directory_override(self, Task, template_dir):
def test_template_directory_override(self, Task, template_dir, config_reseter):
directory = Path(template_dir)
new_directory = directory.with_name("new_templates")
directory.rename(new_directory)
Expand All @@ -287,7 +297,7 @@ def test_template_directory_override(self, Task, template_dir):
luigi_tools.util.register_templates(template_dir)
assert luigi.build([Task(expected_a="a_from_template")], local_scheduler=True)

def test_no_directory(self, Task):
def test_no_directory(self, Task, config_reseter):
with set_luigi_config(
{
"Template": {"name": "template_name"},
Expand All @@ -300,7 +310,7 @@ def test_no_directory(self, Task):
with pytest.raises(ValueError, match=msg):
luigi_tools.util.register_templates()

def test_no_name(self, Task):
def test_no_name(self, Task, config_reseter):
with set_luigi_config(
{
"Template": {"directory": "any directory"},
Expand All @@ -313,7 +323,7 @@ def test_no_name(self, Task):
with pytest.raises(ValueError, match=msg):
luigi_tools.util.register_templates()

def test_no_hierarchy_end(self, Task, template_dir):
def test_no_hierarchy_end(self, Task, template_dir, config_reseter):
# Register the template only
luigi_tools.util.register_templates(template_dir, "template_1", hierarchy_end=False)

Expand All @@ -329,3 +339,71 @@ def test_no_hierarchy_end(self, Task, template_dir):
# Register the new template and the luigi.cfg file
luigi_tools.util.register_templates(new_directory, "template_2")
assert luigi.build([Task(expected_a="a_from_2nd_template")], local_scheduler=True)


class TestSetLuigiConfig:
@pytest.fixture
def Task(self, tmpdir):
class Task(luigi.Task):
a = luigi.Parameter(default="a")
expected_a = luigi.Parameter(default="a")

def run(self):
assert self.a == self.expected_a

def output(self):
return tmpdir / "not_existing_file"

return Task

def test_defaults(self, Task):
assert luigi.build([Task()], local_scheduler=True)

def test_new_config(self, Task):
with set_luigi_config(
{
"Task": {"a": "a_from_cfg"},
}
):
assert luigi.build([Task(expected_a="a_from_cfg")], local_scheduler=True)

failed_task = []
exceptions = []

@Task.event_handler(luigi.Event.FAILURE)
def check_exception(task, exception):
failed_task.append(str(task))
exceptions.append(str(exception))

with set_luigi_config(
{
"Task": {"a": "a_from_cfg"},
}
):
assert not luigi.build([Task(expected_a="different_value")], local_scheduler=True)

assert failed_task == [str(Task(a="a_from_cfg", expected_a="different_value"))]
assert exceptions == [
"assert 'a_from_cfg' == 'different_value'\n - different_value\n + a_from_cfg"
]

def test_config_file(self, Task):
filename = "test_config_file.cfg"
with set_luigi_config(
{
"Task": {"a": "a_from_cfg"},
},
filename,
):
assert luigi.build([Task(expected_a="a_from_cfg")], local_scheduler=True)
assert Path(filename).exists()
assert not Path(filename).exists()

def test_params_ConfigParser(self, Task):
params = {
"Task": {"a": "a_from_cfg"},
}
config = ConfigParser()
config.read_dict(params)
with set_luigi_config(config):
assert luigi.build([Task(expected_a="a_from_cfg")], local_scheduler=True)
55 changes: 0 additions & 55 deletions tests/tools.py
@@ -1,11 +1,6 @@
"""A few helpers for tests"""
import os
from pathlib import Path

from configparser import ConfigParser

import luigi.configuration


def check_existing_file(filename):
"""Check if a file exists"""
Expand Down Expand Up @@ -34,53 +29,3 @@ def check_not_empty_file(filename):
"""Checck that a file is not empty"""
with open(filename) as f:
return f.read() == "NOT EMPTY"


def dict_to_config(params):
config = ConfigParser()
config.read_dict(params)
return config


def export_config(params, filepath):
if isinstance(params, dict):
params = dict_to_config(params)

# Export params
with open(filepath, "w") as configfile:
params.write(configfile)


class set_luigi_config:
"""Context manager to set current luigi config."""

def __init__(self, params=None):
self.params = params
self.luigi_config = luigi.configuration.get_config()

def __enter__(self):
self.configfile = "luigi.cfg"

# Reset luigi config
self.luigi_config.clear()

# Remove config file
if os.path.exists(self.configfile):
os.remove(self.configfile)

# Export config
if self.params is not None:
export_config(self.params, self.configfile)

# Set current config in luigi
self.luigi_config.read(self.configfile)
else:
self.configfile = None

def __exit__(self, *args):
# Remove config file
if self.configfile is not None and os.path.exists(self.configfile):
os.remove(self.configfile)

# Reset luigi config
self.luigi_config.clear()

0 comments on commit dbdcfc8

Please sign in to comment.