In [1]:
import logging
import time
from functools import reduce
from joblib import Parallel, delayed, parallel_config
from pipeline import PipeLine, Data
# from joblib.externals.loky import set_loky_pickler
# from joblib import wrap_non_picklable_objects

logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
#set_loky_pickler('cloudpickle')


logger = logging.getLogger(__name__)

## Simple Pipeline Implementation

In [2]:
def quare(x):
    time.sleep(.2)
    return x*x

def add_5(x):
    time.sleep(.2)
    return x+5

def mul_6(x):
    time.sleep(.1)
    return x*6

def make_pipeline(list_of_callables):
    return reduce(lambda f,g: lambda x: g(f(x)), list_of_callables)

class Ensemble:
    def __init__(self, transformers):
        self.transformers = transformers
        
    def __call__(self, data):
        # list_of_callables = [component for component in self.transformers ]
        # final_func = reduce(lambda f, g: lambda x: g(f(x)), self.transformers)
        data_t = data
        for component in self.transformers:
            data_t = component(data_t)
        return  data_t   

pline = Ensemble([quare,add_5,mul_6])



## Testing With out multiprocessing

In [3]:
st_time = time.time()
data_ = [pline(i) for i in range(1,100) ]
logger.info(f'E2E Time {time.time() - st_time } sec, {data_[-5:]}')

INFO:__main__:E2E Time 51.722574949264526 sec, [54180, 55326, 56484, 57654, 58836]


## Testing with Multiprocessing [ It is Failing]

In [4]:
logger.info(f'Starting at {time.localtime()}')
st_time = time.time()
with parallel_config(backend="multiprocessing", n_jobs=-1, verbose=10):
    # results = Parallel()(delayed(pline)(item) for item in range(1,100))
logger.info(f'E2E Time {time.time() - st_time } sec, {results[-5:]}')

IndentationError: expected an indented block (2488962149.py, line 5)

### Testing with LOKY BACKEND

In [5]:
logger.info(f'Starting at {time.localtime()}')
st_time = time.time()
with parallel_config(backend="loky", n_jobs=-1, verbose=10):
    #delayed_funcs = [delayed(pline)(item) for item in data_]
    results = Parallel()(delayed(pline)(item) for item in range(1,100))
logger.info(f'E2E Time {time.time() - st_time } sec, {results[-5:]}')

INFO:__main__:Starting at time.struct_time(tm_year=2024, tm_mon=2, tm_mday=11, tm_hour=20, tm_min=49, tm_sec=54, tm_wday=6, tm_yday=42, tm_isdst=0)
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done   8 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done  17 tasks      | elapsed:    1.3s
[Parallel(n_jobs=-1)]: Done  26 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done  37 tasks      | elapsed:    2.2s
[Parallel(n_jobs=-1)]: Done  48 tasks      | elapsed:    2.3s
[Parallel(n_jobs=-1)]: Done  61 tasks      | elapsed:    3.3s
[Parallel(n_jobs=-1)]: Done  74 tasks      | elapsed:    3.9s
[Parallel(n_jobs=-1)]: Done  86 out of  99 | elapsed:    4.4s remaining:    0.6s
[Parallel(n_jobs=-1)]: Done  96 out of  99 | elapsed:    4.4s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done  99 out of  99 | elapsed:    4.9s finished
INFO:__main__:E2E Time 5.019072532653809 sec, 

In [6]:
print(results)

[36, 54, 84, 126, 180, 246, 324, 414, 516, 630, 756, 894, 1044, 1206, 1380, 1566, 1764, 1974, 2196, 2430, 2676, 2934, 3204, 3486, 3780, 4086, 4404, 4734, 5076, 5430, 5796, 6174, 6564, 6966, 7380, 7806, 8244, 8694, 9156, 9630, 10116, 10614, 11124, 11646, 12180, 12726, 13284, 13854, 14436, 15030, 15636, 16254, 16884, 17526, 18180, 18846, 19524, 20214, 20916, 21630, 22356, 23094, 23844, 24606, 25380, 26166, 26964, 27774, 28596, 29430, 30276, 31134, 32004, 32886, 33780, 34686, 35604, 36534, 37476, 38430, 39396, 40374, 41364, 42366, 43380, 44406, 45444, 46494, 47556, 48630, 49716, 50814, 51924, 53046, 54180, 55326, 56484, 57654, 58836]


## Same Pipleline with Little complex data  

In [7]:
def quare__d(x):
    time.sleep(.2)
    ll = [ i*i for i in x.obj ]
    return x.replicate(obj = ll)

def add__d(x):
    time.sleep(.2)
    ll = [ i + 5 for i in x.obj ]
    return x.replicate(obj = ll)

def mul__d(x):
    time.sleep(.1)
    ll = [ i * 6 for i in x.obj ]
    return x.replicate(obj = ll)


pipeline11 = Ensemble([quare__d,add__d,mul__d])

In [8]:
data_ = [
        Data(identifier="1", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="2", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="3", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="4", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="5", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="6", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="7", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="8", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="9", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="10", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="11", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
        Data(identifier="12", metadata={}, obj=[1, 2, 2, 3, 4, 5, 6, 7, 8, 9]),
    ]

In [9]:
logger.info(f'Starting at {time.localtime()}')
st_time = time.time()

with parallel_config(backend="loky", verbose=10):
    #delayed_funcs = [delayed(pipeline11)(item) for item in data_]
    results = Parallel()(delayed(pipeline11)(item) for item in data_)

logger.info(f'E2E Time {time.time() - st_time } sec, {results[-5:]}')

INFO:__main__:Starting at time.struct_time(tm_year=2024, tm_mon=2, tm_mday=11, tm_hour=20, tm_min=50, tm_sec=7, tm_wday=6, tm_yday=42, tm_isdst=0)
[Parallel(n_jobs=1)]: Done   1 tasks      | elapsed:    0.4s
[Parallel(n_jobs=1)]: Done   4 tasks      | elapsed:    2.0s
[Parallel(n_jobs=1)]: Done   7 tasks      | elapsed:    3.6s
[Parallel(n_jobs=1)]: Done  12 tasks      | elapsed:    6.2s
[Parallel(n_jobs=1)]: Done  12 tasks      | elapsed:    6.2s
INFO:__main__:E2E Time 6.291088104248047 sec, [Data(identifier='8', metadata={}, obj=[36, 54, 54, 84, 126, 180, 246, 324, 414, 516]), Data(identifier='9', metadata={}, obj=[36, 54, 54, 84, 126, 180, 246, 324, 414, 516]), Data(identifier='10', metadata={}, obj=[36, 54, 54, 84, 126, 180, 246, 324, 414, 516]), Data(identifier='11', metadata={}, obj=[36, 54, 54, 84, 126, 180, 246, 324, 414, 516]), Data(identifier='12', metadata={}, obj=[36, 54, 54, 84, 126, 180, 246, 324, 414, 516])]


## Our Implementations

In [10]:
config_path = "./config.cfg"
pipeline = PipeLine.load(config_path)

In [11]:
st_time = time.time()
results = pipeline(data_)
logger.info(f'E2E Time {time.time() - st_time } sec, {results[-5:]}')

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    1.4s
[Parallel(n_jobs=-1)]: Done   3 out of  12 | elapsed:    1.4s remaining:    4.5s
[Parallel(n_jobs=-1)]: Done   5 out of  12 | elapsed:    1.5s remaining:    2.1s
[Parallel(n_jobs=-1)]: Done   7 out of  12 | elapsed:    1.5s remaining:    1.1s
[Parallel(n_jobs=-1)]: Done   9 out of  12 | elapsed:    1.5s remaining:    0.4s
[Parallel(n_jobs=-1)]: Done  12 out of  12 | elapsed:    1.8s finished
INFO:__main__:E2E Time 1.8994622230529785 sec, [<pipeline.db.orm.Results object at 0x000001FF1CCE24F0>, <pipeline.db.orm.Results object at 0x000001FF1CC98550>, <pipeline.db.orm.Results object at 0x000001FF1CCE2B80>, <pipeline.db.orm.Results object at 0x000001FF1CC987C0>, <pipeline.db.orm.Results object at 0x000001FF1CC95430>]


In [12]:
results

[<pipeline.db.orm.Results at 0x1ff1cc98130>,
 <pipeline.db.orm.Results at 0x1ff1cc98160>,
 <pipeline.db.orm.Results at 0x1ff1cc98310>,
 <pipeline.db.orm.Results at 0x1ff1a7b5f10>,
 <pipeline.db.orm.Results at 0x1ff1cc95130>,
 <pipeline.db.orm.Results at 0x1ff1cc98af0>,
 <pipeline.db.orm.Results at 0x1ff1cce28e0>,
 <pipeline.db.orm.Results at 0x1ff1cce24f0>,
 <pipeline.db.orm.Results at 0x1ff1cc98550>,
 <pipeline.db.orm.Results at 0x1ff1cce2b80>,
 <pipeline.db.orm.Results at 0x1ff1cc987c0>,
 <pipeline.db.orm.Results at 0x1ff1cc95430>]

In [None]:
st_time = time.time()
with parallel_config(backend="loky", n_jobs=-1, verbose=10):
    results = Parallel()(delayed(pipeline)(item) for item in data_)
logger.info(f'E2E Time {time.time() - st_time } sec, {results[-5:]}')

In [None]:
import pickle
pickle.dumps(pipeline)

In [12]:
# pipeline(data_)