Skip to content

Commit

Permalink
add commande line and allow list of Config in configs
Browse files Browse the repository at this point in the history
  • Loading branch information
BastienCagna committed Feb 13, 2024
1 parent 7d607d8 commit 169e210
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 33 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ Use it to call a python function.
## Configuration file


## Command line


## Example
Here is an example of simple configuration file:
```yaml
Expand Down
15 changes: 15 additions & 0 deletions micropype/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
Example
=======
```shell
$ python -m micropype mymodule.submodule.function1 --x 0.2 --y 12 --config myconfig_file.yaml --db.path /path/to/the/db.sqlite
```
"""
import sys
from micropype.commandline import run_function_from_commandline

r = run_function_from_commandline(*sys.argv[1:])
if r is not None:
print(r)
163 changes: 163 additions & 0 deletions micropype/commandline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from typing import Any, List, get_type_hints
import importlib
import inspect

from micropype.utils import cast_to_type
from . import Config


def parse_args(args: str) -> dict:
""" Convert
--arg1 val1 --arg2 --arg3 val3.1 val3.2 --arg4
in
{"arg1": val1, "arg2": True, "arg3", [val3.1, val3.2], "arg4": True}
"""
pos_args = []
kwargs = {}
a = 0
while a < len(args):
arg = args[a]
if arg.startswith("--"):
key = arg[2:]
# This keyword args value
for a2 in range(a+1, len(args)):
if args[a2].startswith("--"):
a2 -= 1
break
if a2 == a+1:
kwargs[key] = cast_to_type(args[a+1])
elif a2 > a+1:
kwargs[key] = list(cast_to_type(val) for val in args[a+1: a2+1])
else:
kwargs[key] = True
a = a2 + 1
else:
# This is still positional args
pos_args.append(cast_to_type(arg))
a += 1
return pos_args, kwargs


def get_all_type_hints(obj) -> dict:
""" Return all attribute as keys and annotation type as values. """
all_hints = {}
for base_cls in type(obj).__mro__:
all_hints.update(get_type_hints(base_cls))
return all_hints


def init_class_attribute(obj, attribut_name):
""" Create a new instance of an object by calling the contrustor without any args. """
if hasattr(obj, attribut_name):
obj.__setattr__(attribut_name, get_all_type_hints(obj)[attribut_name]())
return obj
raise KeyError(f'Object {type(obj)} has no attribute "{attribut_name}".')


def override_config_with_arg(config: Config, arg_path:List[str], value:Any):
""" Overwrite config property base on the splitted argument path. """
if len(arg_path) == 0:
raise ValueError(f'No path.')
attr = arg_path[0]
if not attr in get_all_type_hints(config):
raise ValueError(f'Invalid path "{".".join(arg_path)}" for config "{type(config).__name__}".')

# attr_type = type(config.__getattribute__(attr))
if len(arg_path) > 1:
val = config.__getattribute__(attr)
if val is None or type(val).__name__ == "type":
config = init_class_attribute(config, attr)
val = override_config_with_arg(config.__getattribute__(attr), arg_path[1:], value)
else:
val = value

config.__setattr__(arg_path[0], val)
return config


def override_config_with_args(config: Config, parsed_args: dict):
""" Overwrite all the request args. """
for path, val in parsed_args.items():
splt = path.split(".")
config = override_config_with_arg(config, splt, val)
return config


def import_function_by_path(function_path: str):
""" Dynamic python import of an object of a python module. """
module_path, function_name = function_path.rsplit(".", 1)
module = importlib.import_module(module_path)
return getattr(module, function_name)


def list_function_arguments(func) -> dict:
""" Return the list of function arguments. """
signature = inspect.signature(func)
parameters = signature.parameters
arguments_info = {}
for param_name, param in parameters.items():
param_type = param.annotation if param.annotation != inspect.Parameter.empty else None
arguments_info[param_name] = param_type
return arguments_info


def run_function_from_commandline(function_path: str, *args):
""" Run a Python function using arguments of a command line.
First, the function is imported and the arguments that match function definition are found,
they are use when running the function as kwargs.
If a "--config path/to/config/file.yaml" is in args and that the function definition
uses a config parameter of type micropype.Config (or subclass), the config file is loaded
to initialze the config.
If some arguments are not used in the function definition, they are used to override the
load config.
If the first argument points to a non-callable object, the object is returned.
Parameters
==========
function_path: str
The pythonic path to the function to be executed (ex: module.submodule.function_name)
*args: List[str]
A list of commandline inputs.
For example, with: $python script.py module.funcname --arg1 val1,
use run_function_from_commandline(sys.argv[1:])
Return
======
It return the function result or the value of the path if not callable.
"""
# Import the function and get the list of arguments
function = import_function_by_path(function_path)
if not callable(function):
return function
f_args = list_function_arguments(function)

# convert --arg1 val1 --arg2 --arg3 val3 in {"arg1": val1, "arg2": True, "arg3", val3}
pos_args, parsed_args = parse_args(args)

# search if commandline args are for function arguments
kwargs = {}
for arg in parsed_args.keys():
if arg != "config" and arg in f_args:
kwargs[arg] = parsed_args[arg]
del parsed_args[arg]

# If the function has a "config" argument of type (or subtype) micropype.Config
if "config" in f_args and issubclass(f_args["config"], Config):
# And if the second argument is a YAML file, load the config file
if "config" in parsed_args and parsed_args["config"].endswith(".yaml"):
conf = f_args["config"](parsed_args["config"])
del parsed_args["config"]
# Else init the Config from nothing
else:
conf = type(f_args["config"])()
# Then use command line arguments to override default or loaded config
conf = override_config_with_args(conf, parsed_args)

# Finally run the function
return function(*pos_args, config=conf, **kwargs)
else:
return function(*pos_args, **kwargs)
55 changes: 29 additions & 26 deletions micropype/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from warnings import warn
import yaml
from micropype.utils import MessageIntent, cprint, merge_dicts

Expand All @@ -12,8 +11,20 @@ def read_annotations(cls):
attributes = {}
for attr, attr_type in cls.__annotations__.items():
default = cls.__getattribute__(cls, attr) if hasattr(cls, attr) else None

# If the attribute is optional, get it's subtype
attr_type = attr_type.__args__[0] if attr_type.__name__ == "Optional" else attr_type

t = attr_type.__name__
attributes[attr] = (t, default)

sub_t = None
if hasattr(attr_type, "__name__") and attr_type.__name__ == "List":
if len(attr_type.__args__) > 0:
sub_t = attr_type.__args__[0].__name__
t = list


attributes[attr] = (t, default, sub_t)
return attributes


Expand All @@ -39,17 +50,30 @@ def __init__(self, yaml_f=None, priority="yaml", **kwargs) -> None:

attributes = read_annotations(self.__class__)

#
used_kwargs = 0
for attr_name, (attr_t, default_v) in attributes.items():
for attr_name, (attr_t, default_v, sub_type) in attributes.items():
useDefault = not attr_name in kwargs
if not useDefault:
used_kwargs += 1

if attr_t in self._children:
value = self._children[attr_t] if useDefault else self._children[attr_t](**kwargs[attr_name])
if attr_t in self._children or (attr_t in [list, tuple] and sub_type in self._children):
if useDefault:
value = self._children[attr_t if not attr_t in [list, tuple] else sub_type]
# If it is a sub Config
elif type(kwargs[attr_name]).__name__ == "dict":
value = self._children[attr_t](**kwargs[attr_name])
# If it is a list of Config
elif attr_t in [list, tuple] and len(kwargs[attr_name]) > 0 and type(kwargs[attr_name][0]).__name__ == "dict":
value = list(self._children[sub_type](**kwargs[attr_name][i]) for i in range(len(kwargs[attr_name])))
else:
value = kwargs[attr_name]
else:
value = default_v if useDefault else kwargs[attr_name]
self.__setattr__(attr_name, value)

# Print a message if a argument was passed but not used as it is no part
# of this config object
if used_kwargs < len(kwargs.keys()):
for k in kwargs.keys():
if not k in attributes.keys():
Expand Down Expand Up @@ -82,24 +106,3 @@ def to_yaml(self, filepath:str):
with open(filepath, 'w') as fp:
yaml.dump(self.to_dict(), fp)


# if __name__ == "__main__":
# class SubConfig(Config):
# name: str = "Albert"
# age: float
# class AConfig(Config):
# num: float = .3
# foo: dict
# subject: SubConfig

# conf = {
# "paul": "jeje",
# "foo": {"item1": 0.1, "item2": 0.2},
# "subject": {
# "name": "ciceron",
# "age": 12
# }
# }

# config = AConfig(**conf)
# pass
63 changes: 59 additions & 4 deletions micropype/pipelining.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,50 @@
import subprocess
import os.path as op
from time import time
from time import time, sleep
from datetime import datetime
from typing import Iterable, List, Union
from typing import Iterable, List, Union, Any
from .utils import MessageIntent, cprint
import sys
import traceback
# import signal
# from subprocess import signal, Popen

# PREKILL_SLEEPING_TIME_SEC = 1

# class RuntimeManager:
# stopped = False
# current_subprocess: Popen = None

# def __init__(self) -> None:
# pass

# def stop(self):
# if self.current_subprocess:
# print("runtimemanager: terminate not working")
# # self.current_subprocess.send_signal(signal.SIGINT)
# # # # Try to stop
# # self.current_subprocess.terminate()
# # self.current_subprocess.communicate()
# # # # Wait to see if the process can be terminated
# # # sleep(PREKILL_SLEEPING_TIME_SEC)

# # # # If the process is still alive, kill it
# # # print("runtimemanager: kill?")
# # # if self.current_subprocess.poll() is None:
# # # print("runtimemanager: yes, kill")
# # sleep(1)
# # print("killing", self.current_subprocess.pid)
# # self.current_subprocess.kill()
# # self.current_subprocess.send_signal(signal.SIGTERM)
# # # self.current_subprocess.join()
# self.stopped = True
# sys.exit()

# def __setattr__(self, __name: str, __value: Any) -> None:
# if __name == "current_subprocess":
# print("Updating current subprocess to:", __value)
# print(__value)
# return super().__setattr__(__name, __value)


def write_log(log_f, *text: Union[str, List[str]]):
Expand Down Expand Up @@ -48,15 +87,15 @@ def run_cmd(cmd, title=None, log=None, versions:dict=None, raise_errors=True):
f"\n##########\nStarted at: {datetime.isoformat(datetime.now())}"
)
tic = time()
output = subprocess.check_output(splitted_cmd, stderr=subprocess.STDOUT, shell=True)
output = _run_cmd(splitted_cmd)
toc = time()
write_log(log,
output.decode("utf-8"),
f"\nFinshed at: {datetime.isoformat(datetime.now())} - tooks {toc-tic:.03f}s",
"\n#####\n\n\n"
)
else:
subprocess.run(splitted_cmd, shell=True)
_run_cmd(splitted_cmd)
except Exception as e:
tb = traceback.format_exc()
cprint(f"An error occured while running: {cmd}", intent=MessageIntent.ERROR)
Expand All @@ -79,6 +118,20 @@ def run_cmd(cmd, title=None, log=None, versions:dict=None, raise_errors=True):
cprint(f"done in {time()-tic:.02f}s", intent=MessageIntent.SUCCESS)
return 0

def _run_cmd(splitted_cmd):
# output = subprocess.check_output(splitted_cmd, stderr=subprocess.STDOUT, shell=True)
process = subprocess.Popen(splitted_cmd, stdout=subprocess.PIPE, shell=True, stderr=subprocess.STDOUT)# stderr=subprocess.STDOUT, shell=True)
# if runtime_manager:
# if runtime_manager.stopped:
# sys.exit()
# runtime_manager.current_subprocess = process
# else:
# print("not using runtime manager")
process.wait()
output = process.stdout.read()
# if runtime_manager:
# runtime_manager.current_subprocess = None
return output

def cached_run(cmd, out_files:List[str]=None, title=None, log=None,
versions:dict=None, raise_errors=True, verbose=False):
Expand Down Expand Up @@ -180,3 +233,5 @@ def cached_function_call(func, args, out_files:List[str]=None, title=None, log=N
return run_func(func, args, title, log, versions, raise_errors)
if verbose:
cprint('Using cached files for', title if title else func.__name__, intent=MessageIntent.WARNING)


Loading

0 comments on commit 169e210

Please sign in to comment.