In [10]:
from ipyparallel import Client
cl = Client()
lbv = cl.load_balanced_view()
    
with cl[:].sync_imports():
    import time
    import random

def tst_fun(par):
    tm = random.random()*3+1
    time.sleep(tm)
    return f"result of {par}, time {tm}"

print('запущено ' , len(cl[:]), ' ядер')

result = lbv.map(tst_fun, range(7), ordered=False)
for r in result:
    print(r)

importing time on engine(s)
importing random on engine(s)
запущено  4  ядер
result of 2, time 1.7041287677299382
result of 1, time 2.3876548631431214
result of 0, time 2.521168301807199
result of 6, time 1.2613714749530858
result of 3, time 3.9720367905305713
result of 4, time 3.277509779026752
result of 5, time 2.7059188646991124


In [9]:
lbv.map?

In [4]:
%%writefile OptiSwarm\ExecParallel.py

from rx import Observable
from rx.concurrency import ThreadPoolScheduler,CurrentThreadScheduler
from enum import Enum

ExState = Enum('ExState', 'paused playing error')

def param_res_wrapp(f):
    """
    Функция для обертки обычной фитнесс-функции,
    возвращает кортеж (патаметр, f(параметр))
    """
    def inner(par):
        return par, f(par)
    return inner

class ExecParallel(object):
    """
    класс-обертка для вычислительного кластера
    Поля класса:
        client     - ссылка на ipyparallel.Client()
        lbv        - ссылка на client.load_balanced_view()
        fit_fun    - ссылка на фитнесс-функцию. Функция принимает 1 аргумент (хромосому)
        async_res  - ссылка на async result от lbv.map_async(...)
        stream     - Observable поток кортежей результатов вычисления (патаметр, результат вычисления)
    """
    def __init__(self, client, fit_fun):
        """
        Конструктор -__- класса ExecParallel
        
        Аргумены:
            client     - ссылка на ipyparallel.Client()
            fit_fun    - ссылка на фитнесс-функцию. Функция принимает 1 аргумент (хромосому)
        """
        try:           
            self.client = client
            self.lbv = client.load_balanced_view()
            self.fit_fun = fit_fun
            self._state = ExState.paused
            self.scheduler=ThreadPoolScheduler()
            self.async_res = None
            self.stream = None
        except:
            self._state = ExState.error
        
    def state(self):
        """
        Состояние, в котором находится объект:
            ExState.paused 
            ExState.playing 
            ExState.error   
        """
        return self._state
   
    def execute(self,params):
        """
        основная функция. Функция высшего порядка map для функции fit_fun. Для вычисления используется кластер client/lbv.
        Функция возвращает Observable-поток, на который можно подписаться) Поток состоит из кортежа
        (параметр, fit_fun(параметр))
        
        Аргументы:
            params - список/iterable объект, содержащий элементы-аргументы для fit_fun 
        """

        self.async_res = self.lbv.map_async(param_res_wrapp(self.fit_fun),params,ordered = False)
        def callb():
            self._state = ExState.paused
        self.stream = Observable.from_(iter(self.async_res), scheduler=self.scheduler)\
            .publish().auto_connect(2)
        self.stream.subscribe(on_completed=callb)
        self._state = ExState.playing
        return self.stream
        
    def wait(self, timeout=-1):
        """
        Wait until the result is available or until `timeout` miliseconds pass.
        
        Возвращает 'beep beep', если сработал таймер (таймер истек раньше, чем закончились расчеты),
        или 0, если расчеты закончились раньше
        """
        if self.stream is None:
            return 0
        
        if timeout<0:
            lasty = self.stream.to_blocking().last_or_default(0)
            return 0
        
        timer_message = 'beep beep'
        timer = Observable.timer(timeout).map(lambda x: timer_message)
        lasty = self.stream.last_or_default(0)\
            .amb(timer)\
            .observe_on(CurrentThreadScheduler())\
            .to_blocking()\
            .first()
        return timer_message if lasty == timer_message else 0
    def abort(self):
        """
        Отменяет расчет,
        врзвращает количество отмененных задач
        """
        try:  
            if self.async_res is None:
                return 0
            if self.stream is None:
                return 0
           # if self.async_res.
            results = []
            self.stream\
                .observe_on(CurrentThreadScheduler())\
                .filter(lambda x: not isinstance(x,tuple))\
                .subscribe(on_next = lambda x: results.append(x))

            self.async_res.abort()

            self.wait()
            return len(results)
        except:
            return 0
            
if __name__ == '__main__':
    from ipyparallel import Client
    cl = Client()
    lbv = cl.load_balanced_view()
    
    with cl[:].sync_imports():
        import time
        import random
        
    def tst_fun(par):
        tm = random.random()*3+1
        time.sleep(tm)
        return f"result of {par}, time {tm}"
    
    print('запущено ' , len(cl[:]), ' ядер')
    ex = ExecParallel(cl,tst_fun)
    n = 33
    obs = ex.execute(range(n))
    results = []
    def onnext(x):
        print(x)
        results.append(x)
    
    obs.filter(lambda x: isinstance(x,tuple))\
        .subscribe(on_next=onnext,
                   on_completed = lambda: print('done'),
                   on_error=lambda e: print('error ',e))
        
    print('запущено ' , n, ' задач')
    t = 2000
    res = ex.wait(t)
    print('отменено спустя' , t, ' мс, код = ', res)
    aborted = ex.abort()
    print('отменено (не посчитано) ' , aborted, ' шт.')
    print('посчитано               ' , len(results), ' шт.')
    print('всего                   ' , aborted+len(results), ' шт.')

Overwriting OptiSwarm\ExecParallel.py


In [11]:
from ipyparallel import Client
cl = Client()
lbv = cl.load_balanced_view()

with cl[:].sync_imports():
    import time
    import random

def tst_fun(par):
    tm = random.random()*3+1
    time.sleep(tm)
    return f"result of {par}, time {tm}"

ex = ExecParallel(cl,tst_fun)
n = 22
obs = ex.execute(range(n))
results = []
obs.filter(lambda x: isinstance(x,tuple))\
    .subscribe(on_next=lambda x: print(x, ex.state(), results.append(x)),
               on_completed = lambda: print('done'),
               on_error=lambda e: print('error ',e))
print('запущено ' , n, ' шт.')
t = 2000
res = ex.wait(t)
print('отменено спустя' , t, ' мс, код = ', res)
aborted = ex.abort()
print('отменено (не посчитано) ' , aborted, ' шт.')
print('посчитано               ' , len(results), ' шт.')
print('всего                   ' , aborted+len(results), ' шт.')


importing time on engine(s)
importing random on engine(s)
запущено  22  шт.
(2, 'result of 2, time 1.5291243270912127') ExState.playing None
(3, 'result of 3, time 1.8269372725750472') ExState.playing None
отменено спустя 2000  мс, код =  beep beep
(5, 'result of 5, time 2.1058941732529277') ExState.playing None
(7, 'result of 7, time 2.2400392504406774') ExState.playing None
(4, 'result of 4, time 2.850995926237483') ExState.playing None
(8, 'result of 8, time 1.6611461865838462') ExState.playing None
(0, 'result of 0, time 3.4586165670715374') ExState.playing None
(6, 'result of 6, time 3.6966983883273885') ExState.playing None
(1, 'result of 1, time 3.929813843997981') ExState.playing None
(9, 'result of 9, time 3.639481302504251') ExState.playing None
done
отменено (не посчитано)  12  шт.
посчитано                10  шт.
всего                    22  шт.


In [15]:
ex.stream.

False

In [32]:
Observable.timer(1000).subscribe(on_next=lambda x: print(x), on_completed=lambda:print('done'))

<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x27178a64588>

0
done


In [112]:
ex.async_res.


TypeError: 'TaskAborted' object is not iterable

In [100]:
a = 0
def inita(b):
    a = b
c = obs.observe_on(CurrentThreadScheduler())\
    .to_blocking().count().subscribe(on_next=lambda x: inita(x))
a

0