In [140]:
from pprint import pprint
import copy
import types
from jsonpath_ng.ext import parse
from abc import ABCMeta, abstractmethod
from pydantic import BaseModel, validator
from typing import List, Optional, Any, Union
import datetime

class Chainable:
    pass

#Todo: default params
#Todo: store secrets
class ChainableContext:
    def __init__(self):#, dryrun, debug):
        #self.dryrun = dryrun
        #self.debug = debug
        self.workflow = []

    def add_mapping(self, mapping):
        self.workflow.append(mapping)

    def run(self):
        return

class ChainableObjectHistory(BaseModel):
    func: Any#type(AbstractChainableFunction) #Union[ChainableFunction, TypeSafeChainableFunction]
    #data: Optional[Union[dict, TypeSafeChainableFunction.Data]] = None
    #param: Optional[Union[dict, TypeSafeChainableFunction.Param]] = None
    data: Optional[dict] = None
    param: Optional[dict] = None
    mapping: Optional[dict] = None
    
    @validator("func")
    def validate_some_foo(cls, val):
        if issubclass(type(val), AbstractChainableFunction):
            return val
        raise TypeError("Wrong type for 'func', must be subclass of AbstractChainableFunction")
    
class ChainableObject(BaseModel):
    data: Optional[dict] = {}
    meta: Optional[dict] = {}
    hist: Optional[List[ChainableObjectHistory]] = []
    mapping: Optional[dict] = {}
    emit: Optional[Any] = None#lambda obj: print("NOT IMPLEMENTED")

    def resolve(self, mapping):
        #make components availabel in function scobe
        data = self.data
        meta = self.meta
        #hist = self.hist
        for key in mapping['param']:
            res = []
            value = mapping['param'][key]
            if isinstance(value, dict): #dynamic
                if 'static' in value: res.append(value['static']) #explicit static 
                if 'eval' in value: res.append(eval(value['eval'])) #eval expression
                if 'jsonpath' in value:
                    jsonpath_expr = parse(value['jsonpath'])
                    for match in jsonpath_expr.find(self.dict()):
                        res.append(match.value)
                if 'match' in value:  #match condition
                    if 'meta' in value['match']:
                        jsonpath_expr = parse(value['match']['meta']['jsonpath'])
                        #[print(str(match.full_path)) for match in jsonpath_expr.find(self.content)]
                        #[pprint(match.value) for match in jsonpath_expr.find(obj.content)]
                        for match in jsonpath_expr.find(self.dict()):
                            data_path = str(match.full_path).replace('meta', 'data', 1) #default: replace only root key => traverse to data branch
                            if 'value' in value and 'data' in value['value']:
                                data_path = str(match.full_path).replace('meta', 'data', 1) 
                                if value['value']['data']['jsonpath'] != "": data_path += "." + value['value']['data']['jsonpath'] #value path relative to match path
                                res.append(parse(data_path).find(self.dict())[0].value)
                            if 'value' in value and 'meta' in value['value']:
                                data_path = str(match.full_path) + "." + value['value']['meta']['jsonpath'] #value path relative to match path
                                res.append(parse(data_path).find(self.dict())[0].value)
                            print(data_path)
                            #pprint(parse(data_path).find(self.content)[0].value)
                            
                if (len(res) == 1): res = res[0]
                mapping['param'][key] = res
            if isinstance(value, types.FunctionType) and not key.startswith('_'): #dynamic function
                mapping['param'][key] = value()
        return mapping
    
    def apply(self, mappings):
        if not isinstance(mappings, list):
            mappings = [mappings]
        for mapping in mappings:
            if mapping is None:
                mapping = self.mapping #read map from object (result from previous function)
            mapping = self.resolve(mapping)
            self = eval(mapping['func'] + ".apply(self, mapping['param'])")
        #raw_data(self.content, mapping)
        #pprint(mapping)
        #pprint(self.content)
        return self
    
    #Todo: consider file or db backends
    def store_data(self, key: str, data, meta):
        if not key in self.data: self.data[key] = []
        self.data[key].append(data)
        if not key in self.meta: self.meta[key] = []
        self.meta[key].append(meta)
        
    def store_hist(self, hist: ChainableObjectHistory):
        #hist.func.obj = None #prevent circular objects
        self.hist.append(hist)
        
class AbstractChainableFunction(BaseModel):
    name: str
    uuid: str
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    obj: Optional[ChainableObject] = None
        
    @abstractmethod
    def apply(self, obj, param):
        pass   
    
class ChainableFunction(AbstractChainableFunction):
    param_default: dict = {}
    
    def set_default_params(self, param: dict):
        self.param_default = param 
    
    def set_param(self, param: dict = None):        
        if not param is None:
            param = {**self.param_default, **param}
        else: param = self.param_default
        pprint(param)
        return param
    
    def apply(self, obj: ChainableObject, param: dict = None):
        self.obj = obj
        param = self.set_param(param)
        self.pre_exec(param)
        self.func(param)
        self.post_exec(param)
        return obj
    
    def pre_exec(self, param):
        self.start_time = datetime.datetime.utcnow()
    
    @abstractmethod
    def func(self, param):
        pass
    
    def store(self, key, data, meta):
        self.obj.store_data(key, data, meta)
        
    def post_exec(self, param):
        obj = self.obj
        #self.obj = None
        hist = {}
        #if param['debug']: hist['data'] = copy.deepcopy(obj.data)
        #if param['debug']: hist['mapping'] = copy.deepcopy(obj.mapping)
        self.end_time = datetime.datetime.utcnow()
        func = self.copy()
        func.obj = None
        hist['func'] = func
        hist['param'] = param
        obj.store_hist(ChainableObjectHistory(**hist))
    
class TypeSafeChainableFunction(AbstractChainableFunction):
    class Param(BaseModel):
        debug: bool = False
    class Data(BaseModel):
        pass
    class Meta(BaseModel):
        data_class: Optional[type(BaseModel)] = None  
        data_class_name: Optional[str] = ""  
        
    param_class: type(Param)# = TypeSafeChainableFunction.Param  
    data_class: Optional[type(Data)]
    meta_class: Optional[type(Meta)]
    
    def __init__(self, name: str, uuid: str, 
                 param_class: type(Param) = Param,
                 data_class: type(Data) = None,
                 meta_class: type(Meta) = None
                ): 
        assert type(param_class) == type(TypeSafeChainableFunction.Param) or issubclass(param_class, TypeSafeChainableFunction.Param)
        assert data_class == None or type(data_class) == type(TypeSafeChainableFunction.Data) or issubclass(data_class, TypeSafeChainableFunction.Data)
        assert meta_class == None or type(meta_class) == type(TypeSafeChainableFunction.Meta) or issubclass(meta_class, TypeSafeChainableFunction.Meta)
        super().__init__(name=name, uuid=uuid, param_class=param_class, data_class=data_class, meta_class=meta_class)

    def apply(self, obj, param):
        self.obj = obj
        param = self.param_class(**param)
        self.pre_exec(param)
        self.func(param)
        self.post_exec(param)
        return obj
    
    def pre_exec(self, param):
        self.start_time = datetime.datetime.utcnow()
    
    @abstractmethod
    def func(self, param: Param):
        pass
    
    def store(self, key: str, data, meta):
        if self.data_class != None:
            if type(data) != self.data_class:
                data = self.data_class(**data)
        if self.meta_class != None:
            if type(meta) != self.meta_class:
                meta = self.meta_class(data_class = self.data_class, data_class_name = self.__class__.__name__ + '.' + self.data_class.__name__, **meta)
        self.obj.store_data(key, data, meta)
        
    def post_exec(self, param):      
        hist = {}
        #if param['debug']: hist['data'] = copy.deepcopy(obj.data)
        #if param['debug']: hist['mapping'] = copy.deepcopy(obj.mapping)
        self.end_time = datetime.datetime.utcnow()
        func = self.copy()
        func.obj = None
        hist['func'] = func
        hist['param'] = param
        obj.store_hist(ChainableObjectHistory(**hist))    
    
class GetRaw(ChainableFunction):
    def __init__(self):
        super().__init__(name="get_raw", uuid="0001")
        super().set_default_params({'debug': True, 'data_name':"raw"})

    def func(self, param):
        super().store('raw', [1,2,3,4], {'type': "list", 'dim': 1, 'name': param['data_name'], 'label': {'de':"Spannung"}, 'quant': "qudt:Voltage", 'unit':"qudt:mV", 'test':{'nested': "value"}})
    
class GetRawTypeSafe(TypeSafeChainableFunction):
    class Param(TypeSafeChainableFunction.Param):
        data_name: str = "raw"
    
    def __init__(self):
        super(__class__, self).__init__(name="get_raw", uuid="0001", param_class=__class__.Param)

    def func(self, param):
        #param.debug
        super().store('raw', [1,2,3,4], {'type': "list", 'dim': 1, 'name': param.data_name, 'label': {'de':"Spannung"}, 'quant': "qudt:Voltage", 'unit':"qudt:mV", 'test':{'nested': "value"}}) 

class GetRawFullTypeSafe(TypeSafeChainableFunction):
    class Param(TypeSafeChainableFunction.Param):
        data_name: str = "raw"
        
    class Data(TypeSafeChainableFunction.Data):
        content: List[int]
        
    class Meta(TypeSafeChainableFunction.Meta):
        name: str
        label: dict[str, str]
        quant: str
    
    def __init__(self):
        super(__class__, self).__init__(name="get_raw", uuid="0001", param_class=__class__.Param, data_class=__class__.Data, meta_class=__class__.Meta)

    def func(self, param):
        super().store('raw', {'content':[1,2,3,4]}, {'type': "list", 'dim': 1, 'name': param.data_name, 'label': {'de':"Spannung"}, 'quant': "qudt:Voltage", 'unit':"qudt:mV", 'test':{'nested': "value"}}) 

class RemoveListElement(TypeSafeChainableFunction):
    class Param(TypeSafeChainableFunction.Param):
        l: List[Any]
    
    def __init__(self):
        super(RemoveListElement, self).__init__(name="remove_list_element", uuid="0003", param_class=RemoveListElement.Param)

    def func(self, param):
        print(type(param))
        print(param.l)
        del param.l[-1]
        print(param.l)

{'data_name': 'raw', 'debug': True}
{'data': {'raw': [[1, 2, 3, 4]]},
 'hist': [{'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 5, 49, 19, 182582),
                    'name': 'get_raw',
                    'obj': None,
                    'param_default': {'data_name': 'raw', 'debug': True},
                    'start_time': datetime.datetime(2022, 5, 12, 5, 49, 19, 182560),
                    'uuid': '0001'},
           'mapping': None,
           'param': {'data_name': 'raw', 'debug': True}}],
 'mapping': {},
 'meta': {'raw': [{'dim': 1,
                   'label': {'de': 'Spannung'},
                   'name': 'raw',
                   'quant': 'qudt:Voltage',
                   'test': {'nested': 'value'},
                   'type': 'list',
                   'unit': 'qudt:mV'}]}}
{'data': {'raw': [[1, 2, 3, 4]]},
 'hist': [{'data': None,
           'func': {'data_class': None,
                    'end_time': datetime.datetime(2022, 5, 12, 5, 49, 19,

In [None]:
#m = TypeSafeChainableFunction.Meta()
#print(type(m))
#GetRaw.name
get_raw = GetRaw()
obj = ChainableObject()
obj = get_raw.apply(obj)
pprint(obj.dict())
get_raw2 = GetRawTypeSafe()
obj = ChainableObject()
obj = get_raw2.apply(obj, {'debug': True, 'data_name': "Test"})
pprint(obj.dict())
get_raw3 = GetRawFullTypeSafe()
obj = ChainableObject()
obj = get_raw3.apply(obj, {'debug': True, 'data_name': "Test"})
pprint(obj.dict())
#print(obj.meta['raw'][0].dict())
#pprint(obj.meta['raw'][0].data_class.schema())
#pprint(type(obj.meta['raw'][0].data_class))
mapping = { 'param': {
    'param3': {'match': {'meta': {'jsonpath': 'meta.*[?name = "Test"]'}}, 'value': {'data': {'jsonpath': 'content'}}}, #default: traverse to data branch
    'param4': {'match': {'meta': {'jsonpath': 'meta.*[?data_class_name = "GetRawFullTypeSafe.Data"]'}}, 'value': {'data': {'jsonpath': 'content'}}}, #value path relative to match path
}}
mapping = obj.resolve(mapping)
pprint(mapping)
        
l1 = lambda: False
obj = ChainableObject()
obj.apply({
    'func': "get_raw",
    'param': {'debug': False}
})
pprint(obj.dict())
mapping = { 'param': {
    'debug1': True,
    'debug2' : lambda: False,
    'debug3': {'static': False},
    #'debug4': {'eval': "hist[-1]['func']['name'] == 'get_raw'"},
    'param1': {'eval': "data['raw'][0]"},
    'param2': {'jsonpath': 'meta.*[?name = "raw"].label.de'}, #eval jsonpath
    'param3': {'match': {'meta': {'jsonpath': 'meta.*[?name = "raw"]'}}, 'value': {'data': {'jsonpath': '[0]'}}}, #default: traverse to data branch
    'param4': {'match': {'meta': {'jsonpath': 'meta.*[?name = "raw"]'}}, 'value': {'meta': {'jsonpath': 'label'}}}, #value path relative to match path
}}
mapping = obj.resolve(mapping)
pprint(mapping)
    
def get_mapping():
    return {
        'func': "Chainable.get_raw",
        'param': {'debug': {'static': False}}
    }
      
#Chainable.get_raw = GetRaw()
Chainable.get_raw = GetRawTypeSafe()
Chainable.remove_list_element = RemoveListElement()
obj = ChainableObject()
obj = obj.apply({
    'func': "Chainable.get_raw",
    'param': {'debug': False}
}).apply(get_mapping())
pprint(obj.dict())

workflow = [{
    'func': "Chainable.get_raw",
    'param': {'debug': False}
},{
    'func': "Chainable.get_raw",
    'param': {'debug': False, 'data_name': "RawVoltage"}
},{
    'func': "Chainable.remove_list_element",
    'param': {'l': {'match': {'meta': {'jsonpath': 'meta.*[?name = "RawVoltage"]'}}, 'value': {'data': {'jsonpath': ''}}}}
}]
obj2 = ChainableObject()
for step in workflow:
    obj2 = obj2.apply(step)
pprint(obj2.dict())

In [199]:
import asyncio
from time import sleep

class AsyncChainableContext(ChainableContext):
    async def run_async(self, workflow, obj = None):
        if (obj == None): obj = AsyncChainableObject()
        for step in workflow:
            obj = await obj.apply_parallel_async(step)
        return obj

class AsyncChainableObject(ChainableObject):
    done: Any = None
    async def apply_async(self, mapping: Union[dict, List[dict]]):
        #print("start")
        await loop.run_in_executor(None, super(__class__, self).apply, mapping)
        #print("stop")
        return self
        
    async def apply_parallel_async(self, mappings: Union[dict, List[dict]], timeout = None):
        if not isinstance(mappings, list):
            mappings = [mappings]
        jobs = []
        for mapping in mappings:
            jobs.append(self.apply_async(mapping))
        group_task = asyncio.gather(*jobs)
    
        try:
            if timeout == None: result = await group_task
            else: result = await asyncio.wait_for(group_task, 3)
            #print (result)
        except asyncio.CancelledError:
            print("Gather was cancelled")
        except asyncio.TimeoutError:
            print("Time's up!")
        
        #self.done(self)
        return self
    
    def apply_parallel(self, mappings: List[dict], timeout = None):
        #asyncio.run(testFunc())
        loop = asyncio.get_running_loop()
        tsk = loop.create_task(self.apply_parallel_async(mappings, timeout))
        tsk.add_done_callback(lambda t: t.result().done(t.result()))#(lambda t: print(f'Task done with result={t.result()}  << return val of main()'))
        return self
        
    def apply_sequential_async(self, mappings):
        #super(__class__, self).apply
        return self.apply_async(mappings)
    
    def apply_sequential(self, mappings):
        return self.apply(mappings)
        
class AsyncChainableFunction(ChainableFunction):    
    def __init__(self):
        super(__class__, self).__init__(name="async_function", uuid="0004")

    def func(self, param):
        sleep(1)
        print(param['msg'])
        if 'print_obj' in param and param['print_obj']: pprint(self.obj.dict())

Chainable.async_log = AsyncChainableFunction()          
obj = AsyncChainableObject()  
obj.apply_parallel([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test1'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test2'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test3'}
    }
]).done = lambda obj : obj.apply({
    'func': 'Chainable.async_log', 'param': {'msg': 'test9', 'print_obj': False}
}).apply({
    'func': 'Chainable.async_log', 'param': {'msg': 'test10', 'print_obj': True}
})



{'msg': 'test1'}{'msg': 'test2'}

{'msg': 'test3'}
test1
test3
test2
{'msg': 'test9', 'print_obj': False}
test9
{'msg': 'test10', 'print_obj': True}
test10
{'data': {},
 'done': <function <lambda> at 0x7f28e22191b0>,
 'hist': [{'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 18, 50, 54, 887721),
                    'name': 'async_function',
                    'obj': None,
                    'param_default': {},
                    'start_time': datetime.datetime(2022, 5, 12, 18, 50, 53, 887429),
                    'uuid': '0004'},
           'mapping': None,
           'param': {'msg': 'test1'}},
          {'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 18, 50, 54, 889568),
                    'name': 'async_function',
                    'obj': None,
                    'param_default': {},
                    'start_time': datetime.datetime(2022, 5, 12, 18, 50, 53, 887429),
                    'uuid': '0004'},
           '

In [198]:
obj = AsyncChainableObject() 
await (await (await obj.apply_parallel_async([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test1'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test2'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test3'}
    }
])).apply_sequential_async([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test4'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test5'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test6'}
    }
])).apply_parallel_async([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test7'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test8'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test9'}
    }
])
#pprint(obj.dict())

obj2 = AsyncChainableObject()  
context = AsyncChainableContext()
obj2 = await context.run_async([
    [
        {
            'func': 'Chainable.async_log', 'param': {'msg': 'test1'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test2'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test3'}
        }
    ],
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test4'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test5'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test6'}
    },
    [
        {
            'func': 'Chainable.async_log', 'param': {'msg': 'test7'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test8'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test9'}
        }
    ]
])
pprint(obj2.dict())

{'msg': 'test1'}
{'msg': 'test2'}
{'msg': 'test3'}
test1
test3
test2
{'msg': 'test4'}
test4
{'msg': 'test5'}
test5
{'msg': 'test6'}
test6
{'msg': 'test7'}
{'msg': 'test8'}
{'msg': 'test9'}
test8
test9test7

{'msg': 'test1'}{'msg': 'test2'}
{'msg': 'test3'}

test2
test3
test1
{'msg': 'test4'}
test4
{'msg': 'test5'}
test5
{'msg': 'test6'}
test6
{'msg': 'test7'}
{'msg': 'test8'}
{'msg': 'test9'}
test7
test8
test9
{'data': {},
 'done': None,
 'hist': [{'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 18, 50, 23, 989285),
                    'name': 'async_function',
                    'obj': None,
                    'param_default': {},
                    'start_time': datetime.datetime(2022, 5, 12, 18, 50, 22, 989820),
                    'uuid': '0004'},
           'mapping': None,
           'param': {'msg': 'test2'}},
          {'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 18, 50, 23, 990809),
                    'name': 'a

In [180]:
class AsyncChainableEventFunction(ChainableFunction):    
    def __init__(self):
        super(__class__, self).__init__(name="async_event_function", uuid="0005")
        
    async def loop(self, param):
        timer = 0
        while(timer < 3):
            await asyncio.sleep(1)
            #sleep(1)
            #self.obj.store_data(
            org_obj = copy.deepcopy(self.obj)
            self.obj.emit(org_obj)
            timer += 1
        print("Event loop done")
        
    def func(self, param):
        #asyncio.run(self.loop()) #for standard python
        loop = asyncio.get_running_loop() #for jupyter / ipython
        tsk = loop.create_task(self.loop(param))
    
Chainable.async_event = AsyncChainableEventFunction()          
obj = AsyncChainableObject()  
#obj = EmitableChainableObject() 
obj.apply({
    'func': 'Chainable.async_event', 'param': {'msg': 'test7'}
}).emit = lambda obj : obj.apply({
    'func': 'Chainable.async_log', 'param': {'msg': 'test9', 'print_obj': True}
})
.do

{'msg': 'test7'}
{'msg': 'test9', 'print_obj': True}
test9
{'data': {},
 'emit': <function <lambda> at 0x7f28e2632cb0>,
 'hist': [{'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 17, 53, 41, 29534),
                    'name': 'async_event_function',
                    'obj': None,
                    'param_default': {},
                    'start_time': datetime.datetime(2022, 5, 12, 17, 53, 41, 29472),
                    'uuid': '0005'},
           'mapping': None,
           'param': {'msg': 'test7'}}],
 'mapping': {},
 'meta': {}}
{'msg': 'test9', 'print_obj': True}
test9
{'data': {},
 'emit': <function <lambda> at 0x7f28e2632cb0>,
 'hist': [{'data': None,
           'func': {'end_time': datetime.datetime(2022, 5, 12, 17, 53, 41, 29534),
                    'name': 'async_event_function',
                    'obj': None,
                    'param_default': {},
                    'start_time': datetime.datetime(2022, 5, 12, 17, 53, 41, 29472),
     

In [200]:
class Test:
    pass

t = Test()
t.a = 1
.b = 2

SyntaxError: invalid syntax (1878095829.py, line 6)

In [None]:
workflow = [[{
    'func': "Chainable.get_raw",
    'param': {'debug': False},
    'skip': {'eval': "not 'do_a' in meta['res'][0]"}
    'cond': {'eval': "meta['res']['do_a'][0] == True"}
},{
    'func': "Chainable.get_raw",
    'param': {'debug': False, 'data_name': "RawVoltage"}
}],{
    'func': "Chainable.remove_list_element",
    'param': {'l': {'match': {'meta': {'jsonpath': 'meta.*[?name = "RawVoltage"]'}}, 'value': {'data': {'jsonpath': ''}}}}
}]