Skip to content

Commit

Permalink
Merge 8ddafbc into 6ce938c
Browse files Browse the repository at this point in the history
  • Loading branch information
leifj committed Apr 15, 2021
2 parents 6ce938c + 8ddafbc commit 4dceab1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/pyff/builtins.py
Expand Up @@ -268,6 +268,7 @@ def fork(req, *opts):
return req.t


@deprecated(reason="any pipeline has been replace by other behaviour")
@pipe(name='any')
def _any(lst, d):
for x in lst:
Expand Down
40 changes: 23 additions & 17 deletions src/pyff/pipes.py
Expand Up @@ -6,7 +6,8 @@

import os
import traceback
from typing import Any, Dict, Optional
import functools
from typing import Any, Dict, Optional, Callable, Type, Tuple

import yaml
from apscheduler.schedulers.background import BackgroundScheduler
Expand All @@ -22,26 +23,31 @@
registry = dict()


def pipe(*args, **kwargs):
"""
Register the decorated function in the pyff pipe registry
:param name: optional name - if None, use function name
"""
def pipe(*args, **kwargs) -> Callable:
def pipe_decorator(f: Callable) -> Callable:
if 'name' in kwargs: # called with the name argument @pipe(name=...) or as @pipe()
f_name = kwargs.get('name', f.__name__)
registry[f_name] = f

@functools.wraps(f)
def wrapper_pipe(*iargs, **ikwargs) -> Any:
opts_type: Optional[Type] = None
if 'opts' in f.__annotations__:
opts_type = f.__annotations__['opts']

if opts_type is not None:
opts_in = ikwargs.pop('opts')
ikwargs['opts'] = opts_type(**dict(list(zip(opts_in[::2], opts_in[1::2]))))

def deco_none(f):
return f
return f(*iargs, **ikwargs)

def deco_pipe(f):
f_name = kwargs.get('name', f.__name__)
registry[f_name] = f
return f
return wrapper_pipe

if 1 == len(args):
f = args[0]
registry[f.__name__] = f
return deco_none
if len(args) == 1 and callable(args[0]): # called without arguments @pipe
registry[args[0].__name__] = args[0]
return pipe_decorator(args[0])
else:
return deco_pipe
return pipe_decorator


class PipeException(PyffException):
Expand Down

0 comments on commit 4dceab1

Please sign in to comment.