In [2]:
import sys
sys.path.append('../')

from src.chainables import Chainable, ChainableObject, ChainableFunction, TypeSafeChainableFunction
from typing import List, Optional, Any, Union
from pprint import pprint

class GetRaw(ChainableFunction):
    def __init__(self):
        super().__init__(name="get_raw", uuid="0001")
        super().set_default_params({'debug': True, 'data_name':"raw"})

    def func(self, param):
        super().store('raw', [1,2,3,4], {'type': "list", 'dim': 1, 'name': param['data_name'], 'label': {'de':"Spannung"}, 'quant': "qudt:Voltage", 'unit':"qudt:mV", 'test':{'nested': "value"}})
    
class GetRawTypeSafe(TypeSafeChainableFunction):
    class Param(TypeSafeChainableFunction.Param):
        data_name: str = "raw"
    
    def __init__(self):
        super(__class__, self).__init__(name="get_raw", uuid="0001", param_class=__class__.Param)

    def func(self, param):
        #param.debug
        super().store('raw', [1,2,3,4], {'type': "list", 'dim': 1, 'name': param.data_name, 'label': {'de':"Spannung"}, 'quant': "qudt:Voltage", 'unit':"qudt:mV", 'test':{'nested': "value"}}) 

class GetRawFullTypeSafe(TypeSafeChainableFunction):
    class Param(TypeSafeChainableFunction.Param):
        data_name: str = "raw"
        
    class Data(TypeSafeChainableFunction.Data):
        content: List[int]
        
    class Meta(TypeSafeChainableFunction.Meta):
        name: str
        label: dict[str, str]
        quant: str
    
    def __init__(self):
        super(__class__, self).__init__(name="get_raw", uuid="0001", param_class=__class__.Param, data_class=__class__.Data, meta_class=__class__.Meta)

    def func(self, param):
        super().store('raw', {'content':[1,2,3,4]}, {'type': "list", 'dim': 1, 'name': param.data_name, 'label': {'de':"Spannung"}, 'quant': "qudt:Voltage", 'unit':"qudt:mV", 'test':{'nested': "value"}}) 

class RemoveListElement(TypeSafeChainableFunction):
    class Param(TypeSafeChainableFunction.Param):
        l: List[Any]
    
    def __init__(self):
        super(RemoveListElement, self).__init__(name="remove_list_element", uuid="0003", param_class=RemoveListElement.Param)

    def func(self, param):
        print(type(param))
        print(param.l)
        del param.l[-1]
        print(param.l)

In [3]:
#m = TypeSafeChainableFunction.Meta()
#print(type(m))
#GetRaw.name
get_raw = GetRaw()
obj = ChainableObject()
obj = get_raw.apply(obj)
pprint(obj.dict())
get_raw2 = GetRawTypeSafe()
obj = ChainableObject()
obj = get_raw2.apply(obj, {'debug': True, 'data_name': "Test"})
pprint(obj.dict())
get_raw3 = GetRawFullTypeSafe()
obj = ChainableObject()
obj = get_raw3.apply(obj, {'debug': True, 'data_name': "Test"})
pprint(obj.dict())
#print(obj.meta['raw'][0].dict())
#pprint(obj.meta['raw'][0].data_class.schema())
#pprint(type(obj.meta['raw'][0].data_class))
mapping = { 'param': {
    'param3': {'match': {'meta': {'jsonpath': 'meta.*[?name = "Test"]'}}, 'value': {'data': {'jsonpath': 'content'}}}, #default: traverse to data branch
    'param4': {'match': {'meta': {'jsonpath': 'meta.*[?data_class_name = "GetRawFullTypeSafe.Data"]'}}, 'value': {'data': {'jsonpath': 'content'}}}, #value path relative to match path
}}
mapping = obj.resolve(mapping)
pprint(mapping)

Chainable.get_raw = GetRaw()
l1 = lambda: False
obj = ChainableObject()
obj.apply({
    'func': "Chainable.get_raw",
    'param': {'debug': False}
})
pprint(obj.dict())
mapping = { 'param': {
    'debug1': True,
    'debug2' : lambda: False,
    'debug3': {'static': False},
    #'debug4': {'eval': "hist[-1]['func']['name'] == 'get_raw'"},
    'param1': {'eval': "data['raw'][0]"},
    'param2': {'jsonpath': 'meta.*[?name = "raw"].label.de'}, #eval jsonpath
    'param3': {'match': {'meta': {'jsonpath': 'meta.*[?name = "raw"]'}}, 'value': {'data': {'jsonpath': '[0]'}}}, #default: traverse to data branch
    'param4': {'match': {'meta': {'jsonpath': 'meta.*[?name = "raw"]'}}, 'value': {'meta': {'jsonpath': 'label'}}}, #value path relative to match path
}}
mapping = obj.resolve(mapping)
pprint(mapping)
    
def get_mapping():
    return {
        'func': "Chainable.get_raw",
        'param': {'debug': {'static': False}}
    }
      
#Chainable.get_raw = GetRaw()
Chainable.get_raw = GetRawTypeSafe()
Chainable.remove_list_element = RemoveListElement()
obj = ChainableObject()
obj = obj.apply({
    'func': "Chainable.get_raw",
    'param': {'debug': False}
}).apply(get_mapping())
pprint(obj.dict())

workflow = [{
    'func': "Chainable.get_raw",
    'param': {'debug': False}
},{
    'func': "Chainable.get_raw",
    'param': {'debug': False, 'data_name': "RawVoltage"}
},{
    'func': "Chainable.remove_list_element",
    'param': {'l': {'match': {'meta': {'jsonpath': 'meta.*[?name = "RawVoltage"]'}}, 'value': {'data': {'jsonpath': ''}}}}
}]
obj2 = ChainableObject()
for step in workflow:
    obj2 = obj2.apply(step)
pprint(obj2.dict())

{'data_name': 'raw', 'debug': True}
{'data': {'raw': [[1, 2, 3, 4]]},
 'emit': None,
 'hist': [{'data': None,
           'func': {'end_time': datetime.datetime(2022, 6, 3, 3, 9, 45, 492473),
                    'name': 'get_raw',
                    'obj': None,
                    'param_default': {'data_name': 'raw', 'debug': True},
                    'start_time': datetime.datetime(2022, 6, 3, 3, 9, 45, 492440),
                    'uuid': '0001'},
           'mapping': None,
           'param': {'data_name': 'raw', 'debug': True}}],
 'mapping': {},
 'meta': {'raw': [{'dim': 1,
                   'label': {'de': 'Spannung'},
                   'name': 'raw',
                   'quant': 'qudt:Voltage',
                   'test': {'nested': 'value'},
                   'type': 'list',
                   'unit': 'qudt:mV'}]}}
{'data': {'raw': [[1, 2, 3, 4]]},
 'emit': None,
 'hist': [{'data': None,
           'func': {'data_class': None,
                    'end_time': datetime.dateti

In [None]:
from src.chainables import AsyncChainableObject, AsyncChainableContext, AsyncChainableEventFunction
from time import sleep

class AsyncChainableFunction(ChainableFunction):    
    def __init__(self):
        super(__class__, self).__init__(name="async_function", uuid="0004")

    def func(self, param):
        sleep(1)
        print(param['msg'])
        if 'print_obj' in param and param['print_obj']: pprint(self.obj.dict())

Chainable.async_log = AsyncChainableFunction()          
obj = AsyncChainableObject()  
obj.apply_parallel([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test1'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test2'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test3'}
    }
]).done = lambda obj : obj.apply({
    'func': 'Chainable.async_log', 'param': {'msg': 'test9', 'print_obj': False}
}).apply({
    'func': 'Chainable.async_log', 'param': {'msg': 'test10', 'print_obj': True}
})



In [None]:
obj = AsyncChainableObject() 
await (await (await obj.apply_parallel_async([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test1'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test2'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test3'}
    }
])).apply_sequential_async([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test4'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test5'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test6'}
    }
])).apply_parallel_async([
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test7'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test8'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test9'}
    }
])
#pprint(obj.dict())

obj2 = AsyncChainableObject()  
context = AsyncChainableContext()
obj2 = await context.run_async([
    [
        {
            'func': 'Chainable.async_log', 'param': {'msg': 'test1'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test2'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test3'}
        }
    ],
    {
        'func': 'Chainable.async_log', 'param': {'msg': 'test4'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test5'}
    },{
        'func': 'Chainable.async_log', 'param': {'msg': 'test6'}
    },
    [
        {
            'func': 'Chainable.async_log', 'param': {'msg': 'test7'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test8'}
        },{
            'func': 'Chainable.async_log', 'param': {'msg': 'test9'}
        }
    ]
])
pprint(obj2.dict())

In [None]:
from abc import ABCMeta, abstractmethod
import asyncio
        
class EventGenerator(AsyncChainableEventFunction):    
    def __init__(self):
        super(__class__, self).__init__(name="event_generator", uuid="0005")
        
    async def loop(self, param):
        timer = 0
        while(timer < 3):
            await asyncio.sleep(1)
            #sleep(1)
            #self.obj.store_data(
            self.emit()
            timer += 1
        print("Event loop done")
    
Chainable.async_event = EventGenerator()          
obj = AsyncChainableObject()  
#obj = EmitableChainableObject()
print(type(obj))
obj.apply({
    'func': 'Chainable.async_event', 'param': {'msg': 'test7'}
}).emit = lambda obj : obj.apply({
    'func': 'Chainable.async_log', 'param': {'msg': 'test9', 'print_obj': True}
})


In [None]:
workflow = [[{
    'func': "Chainable.get_raw",
    'param': {'debug': False},
    'skip': {'eval': "not 'do_a' in meta['res'][0]"}
    'cond': {'eval': "meta['res']['do_a'][0] == True"}
},{
    'func': "Chainable.get_raw",
    'param': {'debug': False, 'data_name': "RawVoltage"}
}],{
    'func': "Chainable.remove_list_element",
    'param': {'l': {'match': {'meta': {'jsonpath': 'meta.*[?name = "RawVoltage"]'}}, 'value': {'data': {'jsonpath': ''}}}}
}]