# Процессы

In [3]:
import random
import os
import time
import threading
import multiprocessing
import os

In [6]:
# Типы задач
def cpu_bound_task(n):
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        return fib(n-1) + fib(n-2)

    print(f'{n} -> {fib(n)}')


def io_bound_task(ind):
    print('start task {ind}'.format(ind=ind))
    time.sleep(5)
    print('finish task {ind}\n'.format(ind=ind), end='')

In [5]:
%%time

# CPU bound задача на потоках
tasks = []

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    task = threading.Thread(target=cpu_bound_task, args=(number,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

20 -> 6765
15 -> 610
25 -> 75025
10 -> 55
30 -> 832040
31 -> 1346269
32 -> 2178309
33 -> 3524578
34 -> 5702887
35 -> 9227465
36 -> 14930352
CPU times: user 13.5 s, sys: 52 ms, total: 13.6 s
Wall time: 13.5 s


In [9]:
%%time

# CPU bound задача на процессах
tasks = []

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    task = multiprocessing.Process(target=cpu_bound_task, args=(number,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()


15 -> 610
10 -> 55
20 -> 6765
25 -> 75025
30 -> 832040
31 -> 1346269
32 -> 2178309
33 -> 3524578
34 -> 5702887
35 -> 9227465
36 -> 14930352
CPU times: user 8 ms, sys: 36 ms, total: 44 ms
Wall time: 5.77 s


In [10]:
%%time

# IO bound задача на потоках
tasks = []

for ind in range(10):
    task = threading.Thread(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0
start task 1
start task 2
start task 3
start task 4
start task 5
start task 6start task 7

start task 8
start task 9
finish task 0
finish task 1
finish task 5
finish task 6
finish task 2
finish task 3
finish task 4
finish task 7
finish task 8
finish task 9
CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 5.02 s


In [11]:
%%time

# IO bound задача на процессах
tasks = []

for ind in range(10):
    task = multiprocessing.Process(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0
start task 1
start task 2
start task 3
start task 4
start task 5
start task 6
start task 7
start task 8
start task 9
finish task 0
finish task 1
finish task 2
finish task 3
finish task 5
finish task 6
finish task 7
finish task 8
finish task 4
finish task 9
CPU times: user 32 ms, sys: 28 ms, total: 60 ms
Wall time: 5.07 s


In [16]:
# Получение идентификатора процесса
def io_bound_task(ind):
    print('start task {ind} with {pid} and {ppid}'.format(ind=ind, pid=os.getpid(), ppid=os.getppid()))
    time.sleep(2)
    print('finish task {ind}'.format(ind=ind))
    
tasks = []

for ind in range(10):
    task = multiprocessing.Process(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

print('start task  with {pid} and {ppid}'.format(pid=os.getpid(), ppid=os.getppid()))

start task 2 with 24291 and 23568
start task 0 with 24289 and 23568
start task 1 with 24290 and 23568
start task 3 with 24298 and 23568
start task 4 with 24301 and 23568
start task 5 with 24302 and 23568
start task 7 with 24310 and 23568
start task 6 with 24307 and 23568
start task 8 with 24313 and 23568
start task 9 with 24315 and 23568
finish task 5
finish task 2
finish task 1
finish task 0
finish task 3
finish task 4
finish task 7
finish task 6
finish task 8
finish task 9
start task  with 23568 and 23020


In [17]:
# Пулл процессов
from multiprocessing import Pool

def getpid(n):
    time.sleep(2)
    return os.getpid()

with Pool(3) as p:
    print(p.map(getpid, range(5)))

[24396, 24395, 24397, 24397, 24396]


In [19]:
# Шаринг ресурсов
share_memory = {
    'count': 0,
}
def share_memory_task():
    print('read {count}'.format(count=share_memory["count"]))
    share_memory['count'] += 1
    print('write {count}'.format(count=share_memory["count"]))

tasks = []

for _ in range(5):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

read 0
read 0
read 0
write 1
write 1
read 0
read 0
write 1
write 1
write 1


In [35]:
# Шаринг ресурсов через файл
filename = 'share_memory.tmp'

with open(filename, 'w') as fd:
    fd.write('0')

def share_memory_task():
    with open(filename) as fd:
        count = int(fd.read())
    print('read {count}'.format(count=count))

    with open(filename, 'w') as fd:
        fd.write(str(count + 1))
    print('write {count}'.format(count=count + 1))


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

with open(filename) as fd:
    print('COUNT: {count}'.format(count=fd.read()))

read 0
read 1
write 1
read 0
read 1
write 2
write 2
write 1
read 1
write 2
read 1
write 2
read 2
write 3
read 3
read 3
write 4
read 3
write 4
write 4
COUNT: 4


In [4]:
# Шаринг ресурсов через файл с блокировкой
filename = 'share_memory.tmp'

# TODO исправить ошибку
# Ищите помощь в локументации https://docs.python.org/3.6/library/multiprocessing.html
from multiprocessing import Lock
lock = Lock()

with open(filename, 'w') as fd:
    fd.write('0')

def share_memory_task(lock):
    with lock:
        with open(filename) as fd:
            count = int(fd.read())
        print('read {count}'.format(count=count))

        with open(filename, 'w') as fd:
            fd.write(str(count + 1))
        print('write {count}'.format(count=count + 1))


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task, args=(lock,))
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

with open(filename) as fd:
    print('COUNT: {count}'.format(count=fd.read()))

read 0
write 1
read 1
write 2
read 2
read 3
write 3
write 4
read 4
write 5
read 5
write 6
read 6
write 7
read 7
write 8
read 8
write 9
read 9
write 10
COUNT: 10


In [5]:
# Шаринг ресурсов через очередь
from multiprocessing import Queue

q = Queue()
q.put(0)

lock = multiprocessing.Lock()

def share_memory_task(q, lock):
    with lock:
        count = q.get()
        print('read {count}'.format(count=count))

        q.put(count + 1)
        print('write {count}'.format(count=count + 1))


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task, args=(q, lock, ))
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

print('COUNT: {count}'.format(count=q.get()))

read 0
write 1
read 1
write 2
read 2
write 3
read 3
write 4
read 4
write 5
read 5
write 6
read 6
write 7
read 7
write 8
read 8
write 9
read 9
write 10
COUNT: 10


# Ассинхронность

In [1]:
import asyncio
import random
import time

loop = asyncio.get_event_loop()

async def sleep(delay):
    await asyncio.sleep(delay)

In [2]:
async def io_bound_task(group_name):
    print('group is "{group_name}"'.format(group_name=group_name))
    await sleep(2)
    count = random.randint(100, 200)
    print('count {count}'.format(count=count))
    return count

In [3]:
%%time

# Последовательное выполнение
async def main():
    count = 0
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        count += await io_bound_task(name)
    print('COUNT: {count}'.format(count=count))

loop.run_until_complete(main())

RuntimeError: Event loop is running.

group is "Amir"
count 112
group is "Zarina"
count 120
group is "Misha"
count 176
group is "Ilya"
count 111
group is "Igor"


ERROR:root:Invalid alias: The name clear can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name more can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name less can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name man can't be aliased because it is another magic command.


count 184
COUNT: 703


In [2]:
%%time
import asyncio

loop = asyncio.get_event_loop()
# Совместное выполнение
async def main():
    tasks = []
    count = 0
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        tasks.append(io_bound_task(name))
        
    results = await asyncio.gather(*tasks)
    print('COUNT: {count}'.format(count=sum(results)))

loop.run_until_complete(main())

RuntimeError: Event loop is running.

ERROR:root:Invalid alias: The name clear can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name more can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name less can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name man can't be aliased because it is another magic command.


In [1]:
%%time
import asyncio

loop = asyncio.get_event_loop()

# Блокирущая операция
async def sleep(delay):
    time.sleep(delay)

loop.run_until_complete(main())

NameError: name 'main' is not defined

In [18]:
# Обработка ошибок
async def sleep(delay):
    await asyncio.sleep(delay)
    
async def error_task():
    raise ValueError 
    
async def main():
    tasks = [error_task()]
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        tasks.append(io_bound_task(name))
        
    try:
        results = await asyncio.gather(*tasks)
    except ValueError:
        print('Error')
    else:
        print(f'COUNT: {sum(results)}')

loop.run_until_complete(main())

group is "Misha"
group is "Zarina"
group is "Amir"
group is "Ilya"
group is "Igor"
Error


In [19]:
# Обработка ошибок
async def main():
    tasks = [error_task()]
    
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        tasks.append(io_bound_task(name))
        
    count = 0
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # TODO Посчитать сумму участников 
            
    print(f'COUNT: {count}')

loop.run_until_complete(main())

group is "Zarina"
group is "Igor"
group is "Misha"
group is "Ilya"
group is "Amir"
count 155
count 149
count 122
count 183
count 120
count 160
count 176
count 152
count 110
count 146
COUNT: 0


# Итераторы

In [8]:
# Утиная типизация
class FibIterator:
    def __init__(self, n):
        self.n = n
        
    def __iter__(self):
        self.prev, self.cur = 0, 1
        self.ind = 0
        return self
    
    def __next__(self):
        if self.ind == self.n:
            raise StopIteration
    
        self.ind += 1
        self.prev, self.cur = self.cur, self.cur + self.prev
        return self.ind, self.prev

for ind, num in FibIterator(20):
    print('{} -> {}'.format(ind, num))

1 -> 1
2 -> 1
3 -> 2
4 -> 3
5 -> 5
6 -> 8
7 -> 13
8 -> 21
9 -> 34
10 -> 55
11 -> 89
12 -> 144
13 -> 233
14 -> 377
15 -> 610
16 -> 987
17 -> 1597
18 -> 2584
19 -> 4181
20 -> 6765


In [21]:
# Реализуем свой цикл for
fib_seq = iter(FibIterator(20))
while True:
    try:
        ind, num = next(fib_seq)
        print(f'{ind} -> {num}')
    except StopIteration:
        print('EXIT FROM LOOP')
        break

1 -> 1
2 -> 1
3 -> 2
4 -> 3
5 -> 5
6 -> 8
7 -> 13
8 -> 21
9 -> 34
10 -> 55
11 -> 89
12 -> 144
13 -> 233
14 -> 377
15 -> 610
16 -> 987
17 -> 1597
18 -> 2584
19 -> 4181
20 -> 6765
EXIT FROM LOOP


# Генераторы

In [22]:
# Бесконечный генератор
def fib_generator():
    prev, cur = 0, 1
    while True:
        yield cur
        prev, cur = cur, cur + prev

fib_seq = fib_generator()
for ind in range(1, 21):
    print(f'{ind} -> {next(fib_seq)}')
    
f'OUT LOOP: -> {next(fib_seq)}'

1 -> 1
2 -> 1
3 -> 2
4 -> 3
5 -> 5
6 -> 8
7 -> 13
8 -> 21
9 -> 34
10 -> 55
11 -> 89
12 -> 144
13 -> 233
14 -> 377
15 -> 610
16 -> 987
17 -> 1597
18 -> 2584
19 -> 4181
20 -> 6765


'OUT LOOP: -> 10946'

In [23]:
# Генератор с остановкой
def fib_generator(n):
    prev, cur = 0, 1
    
    ind = 1
    while ind <= n:
        yield ind, cur
        prev, cur = cur, cur + prev
        ind += 1

for ind, val in fib_generator(20):
    print(f'{ind} -> {val}')

1 -> 1
2 -> 1
3 -> 2
4 -> 3
5 -> 5
6 -> 8
7 -> 13
8 -> 21
9 -> 34
10 -> 55
11 -> 89
12 -> 144
13 -> 233
14 -> 377
15 -> 610
16 -> 987
17 -> 1597
18 -> 2584
19 -> 4181
20 -> 6765


In [24]:
# Генератор реализует интерфейс итератора
fib_seq = fib_generator(20)
while True:
    try:
        ind, num = next(fib_seq)
        print(f'{ind} -> {num}')
    except StopIteration:
        print('EXIT FROM LOOP')
        break

1 -> 1
2 -> 1
3 -> 2
4 -> 3
5 -> 5
6 -> 8
7 -> 13
8 -> 21
9 -> 34
10 -> 55
11 -> 89
12 -> 144
13 -> 233
14 -> 377
15 -> 610
16 -> 987
17 -> 1597
18 -> 2584
19 -> 4181
20 -> 6765
EXIT FROM LOOP
