# Fintech Python
## Лекция 7

# Процессы

Процесс — программа, которая исполняется в данный момент и обладает набором ресурсов:

 * образом исполняемого машинного кода
 * памятью
 * дескрипторами ресурсов (файлов, сокетов) etc.
 
Процессы изолированы друг от друга операционной системой

### Создание новых процессов

In [100]:
import os

os.fork()

print("Hello, world")

Hello, world
Hello, world


In [101]:
import os

os.fork()
os.fork()
os.fork()

print("Hello, world")

Hello, world
Hello, world
Hello, world
Hello, world
Hello, world
Hello, world
Hello, world
Hello, world


In [102]:
import os
import time

# Parent and child both start from this value and then modify it independently.
x = 1

# os.fork() returns 0 in the child process and the child's pid in the parent.
child_pid = os.fork()
if child_pid == 0:
    x = x + 2
    time.sleep(1)
    print(f'Child: {x}')
else:
    x = x + 1
    time.sleep(1)
    print(f'Parent: {x}')

Parent: 2
Child: 3


## Copy on write

* Пока читаем - используем старые данные
* При записи копируем

# Есть один нюанс...

# multiprocessing
https://docs.python.org/3/library/multiprocessing.html

In [None]:
from multiprocessing import Process
import os

def info(title):
    """Print *title*, then the parent pid and the pid of the current process."""
    parent_pid = os.getppid()
    own_pid = os.getpid()
    print(title)
    print('parent process:', parent_pid)
    print('process id:', own_pid)
    
def f(name):
    """Report process details through ``info`` and then greet *name*."""
    info(title='function f')
    print('hello', name)
    
if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

In [5]:
from multiprocessing import Process, Queue


def worker(job: int, queue: Queue):
    """Put *job* on *queue*."""
    queue.put(job)

    
queue = Queue()
processes = [Process(target=worker, args=(i, queue)) for i in range(30)]
for p in processes:
    p.start()

# Drain the queue BEFORE joining. A process that has put items on a
# multiprocessing.Queue may not exit until its buffered data has been flushed
# to the underlying pipe, so joining first can deadlock on larger payloads.
result = [queue.get() for _ in processes]
for p in processes:
    p.join()

print(result)

[1, 0, 3, 2, 4, 5, 6, 10, 7, 8, 11, 12, 9, 14, 13, 15, 17, 18, 20, 19, 16, 21, 22, 23, 24, 26, 25, 28, 27, 29]


In [6]:
size = 100_000_000
arr = [1] * size

In [7]:
%%time
sum(arr)

CPU times: user 1.26 s, sys: 4 ms, total: 1.26 s
Wall time: 2.15 s


100000000

In [8]:
from multiprocessing import Pool

process_count = os.cpu_count()
part_size = size // process_count

process_count

8

In [9]:
%%time
with Pool(process_count) as p:
    p.map(sum, (arr[i * part_size: (i+1) * part_size] for i in range(process_count)))

CPU times: user 3.3 s, sys: 762 ms, total: 4.07 s
Wall time: 7.62 s


Проблема в большом объеме данных

In [10]:
def get_sum(size):
    """Build a list of *size* ones locally and return its sum."""
    ones = [1] * size
    return sum(ones)

In [11]:
%%time
with Pool(process_count) as p:
    p.map(get_sum, [part_size] * process_count)

CPU times: user 5.72 ms, sys: 195 ms, total: 201 ms
Wall time: 1.21 s


In [12]:
with Pool(process_count) as p:
    %timeit p.map(get_sum, [part_size] * process_count)

677 ms ± 28.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Таки слишком дорого

### Резюме
* Создавать процессы — это дорого 
* Передавать данные между процессами — тоже дорого. Поэтому иногда меньше процессов — лучше
* Если данных для обмена много, и задача не слишком тяжелая, лучше обойтись без multiprocessing'а

## Потоки

<img src="nlhI00n.png">

<img src="TaskExecution.gif">

В один момент времени одно ядро процессора исполняет ровно один поток

 Несколько ядер могут выполнять несколько потоков буквально одновременно

<img src="diagram-thread-concurrency.png">

<img src="switch-context.png">

https://github.com/mit-pdos/xv6-public/blob/master/proc.c#L323
```c
  for(;;){

    // Loop over process table looking for process to run.
    acquire(&ptable.lock);
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      if(p->state != RUNNABLE)
        continue;
        
      c->proc = p;
      switchuvm(p);
      p->state = RUNNING;

      swtch(&(c->scheduler), p->context);
      switchkvm();

      c->proc = 0;
    }
    release(&ptable.lock);
  }
```

<img src="Process_states.ru.svg.png">

https://docs.python.org/3/library/threading.html

In [41]:
from threading import Thread


def worker(num):
    """Print a single line identifying the worker by *num*."""
    message = f'Worker: {num}'
    print(message)

    
threads = [Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()    

Worker: 0
Worker: 1Worker: 2

Worker: 3
Worker: 4


In [None]:
from threading import Thread

x = 0

def worker(num):
    global x
    x += 1

    
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()   

x

In [14]:
from threading import Thread

x = 0

def worker(num):
    global x
    x += 1

    
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()   

x

10

Нам просто повезло

Давайте усугубим ситуацию

In [42]:
import time
from threading import Thread

x = 0

def worker(num: int) -> None:
    """Increment the global ``x`` by one using a deliberately non-atomic read-modify-write."""
    global x
    # Step 1: read the shared counter.
    old_x = x
    # Step 2: sleep between the read and the write, which gives other threads
    # time to read the same (soon to be stale) value.
    time.sleep(0.00001)
    new_x = old_x + 1
    # Step 3: write back; increments made by other threads since step 1 are overwritten.
    x = new_x

    
threads = [Thread(target=worker, args=(i,)) for i in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()   

x

916

# Race condition
<img src="race-condition.jpg">

Решение первое: в лоб

In [16]:
import time
from threading import Thread, Lock

x = 0

def worker(num: int, lock: Lock) -> None:
    """Increment the global ``x`` by one while holding *lock*."""
    global x
    # Acquire manually; other callers sharing *lock* block here until release().
    lock.acquire()
    old_x = x
    time.sleep(0.00001)
    new_x = old_x + 1
    x = new_x
    # NOTE: there is no try/finally, so an exception raised above would leave the lock held.
    lock.release()


lock = Lock()
threads = [Thread(target=worker, args=(i, lock)) for i in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()   

x

1000

Чуть более правильное решение

In [17]:
import time
from threading import Thread, Lock

x = 0

def worker(num: int, lock: Lock) -> None:
    """Increment the global ``x`` by one inside a ``with lock`` critical section."""
    global x
    # The context manager acquires *lock* on entry and releases it on exit,
    # including when the body raises.
    with lock:
        old_x = x
        time.sleep(0.00001)
        new_x = old_x + 1
        x = new_x


lock = Lock()
threads = [Thread(target=worker, args=(i, lock)) for i in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()   

x

1000

С локами приходится думать

In [18]:
from threading import Lock

lock_a = Lock()
lock_b = Lock()
data = {}

def func_a():
    """Acquire ``lock_a`` first and ``lock_b`` second, then release both."""
    # A single ``with`` acquires its items left to right, exactly like nesting.
    with lock_a, lock_b:
        pass
        

def func_b():
    """Acquire ``lock_b`` first and ``lock_a`` second, then release both."""
    # A single ``with`` acquires its items left to right, exactly like nesting.
    with lock_b, lock_a:
        pass

Вариант с [queue.Queue](https://docs.python.org/3/library/queue.html)

<img src="queue.png">

# Сверхразум

In [19]:
import random

lock = Lock()

def worker():
    """Flip a coin while holding the global ``lock`` and fork the process on heads."""
    with lock:
        should_fork = random.randint(0, 1)
        if should_fork:
            os.fork()
            
threads = [Thread(target=worker) for i in range(10)]

Задача с суммой массива

In [20]:
import queue

In [21]:
def adder(arr, part_id, thread_count, results_queue):
    """Sum every ``thread_count``-th element of *arr* starting at index *part_id*.

    The partial sum is not returned; it is put on *results_queue*.
    """
    partial_sum = 0
    for index in range(part_id, len(arr), thread_count):
        partial_sum = partial_sum + arr[index]
    results_queue.put(partial_sum)
    

def sum_using_threads(arr, thread_count):
    """Sum *arr* by letting *thread_count* threads each add up an interleaved slice.

    Each thread runs ``adder`` and reports its partial sum through a shared
    ``queue.Queue``; the partial sums are then added together.
    """
    res_queue = queue.Queue()

    threads = []
    for part_id in range(thread_count):
        threads.append(
            Thread(target=adder, args=(arr, part_id, thread_count, res_queue))
        )
    for thread in threads:
        thread.start()

    # Exactly one result is expected per started thread.
    partial_sums = [res_queue.get() for _ in threads]
    for thread in threads:
        thread.join()

    return sum(partial_sums)

In [25]:
size = 10 ** 7
arr = [1 for _ in range(size)]

In [26]:
%%timeit
sum(arr[i] for i in range(len(arr)))

668 ms ± 47.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
%%timeit
sum_using_threads(arr, 4)

725 ms ± 76 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


<img src="gil.png">

# GIL* - Global Interpreter Lock

https://asvetlov.blogspot.com/2011/07/gil.html

###### *Запрещенная в России преступная организация

Для тех, кто любит почитать исходники на ночь
<font size="3">
```c
PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
    PyThreadState *tstate = PyThreadState_GET();
    /* ... */
    for (;;) {
        /* ... */
        if (_Py_atomic_load_relaxed(&eval_breaker)) {
            /* ... */
            if (_Py_atomic_load_relaxed(&gil_drop_request)) {
                /* Give another thread a chance */
                if (PyThreadState_Swap(NULL) != tstate)
                    Py_FatalError("ceval: tstate mix-up");
                drop_gil(tstate);

                /* Other threads may run now */

                take_gil(tstate);
                if (PyThreadState_Swap(tstate) != NULL)
                    Py_FatalError("ceval: orphan tstate");
            }
        }
    /* instruction processing */
    }
}
```

# Зачем нужен GIL? Почему же его не убрали?

Отпускаем GIL:

* Если есть те, кто его ждет
* Отдаем добровольно перед системным вызовом

Забираем GIL:

* Если мы его отдали по просьбе, то не просим сразу
* Если не получилось захватить GIL, то ждем 5 миллисекунд (интервал переключения по умолчанию, см. `sys.getswitchinterval()`) и отправляем запрос на переключение

In [33]:
import requests

urls = [
    'https://www.yandex.ru', 'https://www.google.com',
    'https://www.python.org', 'https://github.com'
]

In [34]:
%%timeit
for url in urls:
    requests.get(url).text

1.44 s ± 34.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [35]:
def read_url(url):
    """Fetch *url* with ``requests.get`` and return the response's ``text``."""
    response = requests.get(url)
    return response.text

In [37]:
%%timeit
readers = [
    Thread(target=read_url, args=(url,)) for url in urls
]
for reader in readers:
    reader.start()
for reader in readers:
    reader.join()

490 ms ± 39.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Вывод 

* Для IO bound (web, crawlers) приложений потоки отлично работают
* Для CPU bound (math, image processing) используем процессы или специальные C extension (numpy), которые умеют параллелиться без GIL
* В вебе почти всегда используется комбинация обоих вариантов, т.е. N процессов и в каждом M тредов

# Питонячья магия

In [43]:
class Cow:
    """A cow whose name can be replaced through a validating setter."""

    def __init__(self, name: str) -> None:
        # The initial name is stored as given, without validation.
        self._name = name

    def set_name(self, name: str) -> None:
        """Store *name*; raise ``ValueError`` unless it is a non-empty ``str``."""
        # Validate the cow's name before storing it.
        if not isinstance(name, str) or name == "":
            raise ValueError()
        self._name = name

In [44]:
class Sheep:
    """A sheep whose name can be replaced through a validating setter."""

    def __init__(self, name: str):
        # The initial name is stored as given, without validation.
        self._name = name

    def set_name(self, name: str) -> None:
        """Store *name*; raise ``ValueError`` unless it is a non-empty ``str``."""
        is_valid = isinstance(name, str) and name != ""
        if not is_valid:
            raise ValueError()
        self._name = name

### Решения?

### Наследование

In [45]:
class Animal:
    """Base class that holds a name and offers a validating setter for it."""

    def __init__(self, name: str) -> None:
        # The initial name is stored as given, without validation.
        self._name = name

    def set_name(self, name: str) -> None:
        """Store *name*; raise ``ValueError`` unless it is a non-empty ``str``."""
        if isinstance(name, str) and name != "":
            self._name = name
            return
        raise ValueError()
        
class Cow(Animal):
    pass

class Sheep(Animal):
    pass

### Проблема: появился фермер

In [46]:
class Farmer:
    """A farmer with a name and a surname, each replaceable through a validating setter."""

    def __init__(self, name: str, surname: str) -> None:
        # Constructor arguments are stored as given, without validation.
        self._name, self._surname = name, surname

    def set_name(self, name: str) -> None:
        """Store *name*; raise ``ValueError`` unless it is a non-empty ``str``."""
        if not isinstance(name, str) or name == "":
            raise ValueError()
        self._name = name

    def set_surname(self, surname: str) -> None:
        """Store *surname*; raise ``ValueError`` unless it is a non-empty ``str``."""
        if not isinstance(surname, str) or surname == "":
            raise ValueError()
        self._surname = surname

In [47]:
class Farmer(Animal):
    """A farmer that reuses the name handling of ``Animal`` and adds a surname."""

    def __init__(self, name: str, surname: str) -> None:
        # Animal.__init__ requires the name; calling it without arguments
        # raises TypeError on every instantiation.
        super().__init__(name)
        self._surname = surname

    def set_surname(self, surname: str) -> None:
        """Store *surname*; raise ``ValueError`` unless it is a non-empty ``str``."""
        if not isinstance(surname, str):
            raise ValueError()
        if surname == "":
            raise ValueError()
        self._surname = surname

# Дескрипторы

https://docs.python.org/3/howto/descriptor.html

In [None]:
a.x
# type(a).__dict__['x'].__get__(a, type(a))

In [51]:
from typing import Optional, Any

In [53]:
class NonEmptyString:
    """Data descriptor that proxies an instance attribute and accepts only non-empty strings."""

    def __init__(self, name: str):
        # Name of the instance attribute that actually holds the value.
        self._name = name

    def __get__(self, obj: Optional[Any], objtype: type) -> Any:
        # Accessed on the class (no instance): hand back the descriptor itself.
        return self if obj is None else getattr(obj, self._name)

    def __set__(self, obj: Optional[Any], value: str) -> None:
        # Reject anything that is not a str, and the empty string.
        if not isinstance(value, str) or value == "":
            raise ValueError()
        setattr(obj, self._name, value)

In [55]:
class Farmer:
    """Farmer whose ``name`` and ``surname`` go through ``NonEmptyString`` descriptors."""

    # Each descriptor keeps its value in the underscore-prefixed instance attribute.
    name = NonEmptyString("_name")
    surname = NonEmptyString("_surname")

    def __init__(self, name: str, surname: str) -> None:
        # Assigning through the descriptors validates both constructor arguments.
        self.name, self.surname = name, surname
    

farmer = Farmer("Grzegorz", "Brzęczyszczykiewicz")
print(farmer.name, farmer.surname)
farmer.name = "Boris"
print(farmer.name, farmer.surname)

Grzegorz Brzęczyszczykiewicz
Boris Brzęczyszczykiewicz


### Примеры:

* property

In [None]:
C.x
# C.__dict__['x'].__get__(None, C)

### Примеры non-data дескрипторов:

* staticmethod
* classmethod

# Метаклассы

In [None]:
class A:
    def __init__(self):
        print('A.__init__')
        self.x = 1


a = A()
print(type(a))
print(a.x)

In [58]:
class A:
    def __init__(self):
        print('A.__init__')
        self.x = 1


a = A()
print(type(a))
print(a.x)

A.__init__
<class '__main__.A'>
1


Всё есть объект

In [70]:
print(type(1))
print(type('a'))
print(type(type))
print(type(str))

<class 'int'>
<class 'str'>
<class 'type'>
<class 'type'>


In [65]:
class B(A):
    def method(self):
        print(1)

b = B()
b.method()

A.__init__
1


In [66]:
B = type('B', (A,), {'method': lambda self: print(1)})

b = B()
b.method()

A.__init__
1


In [None]:
class B:
    """Demonstrates ``__new__`` returning an object of an unrelated class."""

    def __new__(cls, *args):
        print("B.__new__")
        print(args)
        # Create a throwaway class named "C" and instantiate it instead of B.
        substitute_class = type("C", (), {})
        instance = substitute_class()
        # Run B.__init__ by hand on the substitute object, without the constructor args.
        cls.__init__(instance)
        return instance

    def __init__(self, *args):
        print("B.__init__")
        print(args)
        self.x = "arg"
        

b = B(2)
print(type(b))
print(b.x)

In [63]:
class B:
    """Demonstrates ``__new__`` returning an object of an unrelated class."""

    def __new__(cls, *args):
        print("B.__new__")
        print(args)
        # Create a throwaway class named "C" and instantiate it instead of B.
        substitute_class = type("C", (), {})
        instance = substitute_class()
        # Run B.__init__ by hand on the substitute object, without the constructor args.
        cls.__init__(instance)
        return instance

    def __init__(self, *args):
        print("B.__init__")
        print(args)
        self.x = "arg"
        

b = B(2)
print(type(b))
print(b.x)

B.__new__
(2,)
B.__init__
()
<class '__main__.C'>
arg


In [None]:
class C:
    """Demonstrates ``__new__`` returning a value that is not an instance of the class."""

    def __new__(cls, *args) -> int:
        print("C.__new__")
        print(args)
        # The returned int is not an instance of C, so calling C(...) yields
        # this value and does not go on to run C.__init__.
        return 1

    def __init__(self, x: int) -> None:
        print("C.__init__")
        # Print the argument actually received; there is no ``args`` name in this scope.
        print(x)
        self.x = x
        
b = C(3)
print(type(b))
print(b.x)

In [67]:
class C:
    """Demonstrates ``__new__`` returning a value that is not an instance of the class."""

    def __new__(cls, *args) -> int:
        print("C.__new__")
        print(args)
        # The returned int is not an instance of C, so calling C(...) yields
        # this value and does not go on to run C.__init__.
        return 1

    def __init__(self, x: int) -> None:
        print("C.__init__")
        # Print the argument actually received; there is no ``args`` name in this scope.
        print(x)
        self.x = x
        
b = C(3)
print(type(b))
print(b.x)

C.__new__
(3,)
<class 'int'>


AttributeError: 'int' object has no attribute 'x'

In [71]:
from typing import Tuple, Dict, Any

In [81]:
class Metaclass(type):
    """Metaclass that logs the arguments it receives while building a class."""

    def __new__(
        cls: type,
        name: str,
        bases: Tuple[type, ...],
        dct: Dict[str, Any]
    ) -> "Metaclass":
        print("Metaclass.__new__")
        print(cls, name, bases, dct)
        # Delegate the actual class construction to ``type``.
        return super().__new__(cls, name, bases, dct)

In [82]:
class Example(metaclass=Metaclass):
    pass

Metaclass.__new__
<class '__main__.Metaclass'> Example () {'__module__': '__main__', '__qualname__': 'Example'}


In [87]:
class Metaclass(type):
    """Metaclass that logs the arguments of both ``__new__`` and ``__init__``."""

    def __new__(
        cls: type,
        name: str,
        bases: Tuple[type, ...],
        dct: Dict[str, Any]
    ) -> "Metaclass":
        # Here ``cls`` is the metaclass itself; the new class does not exist yet.
        print("Metaclass.__new__")
        print(cls, name, bases, dct)
        obj = type.__new__(cls, name, bases, dct)
        return obj

    def __init__(
        cls: type,
        name: str,
        # Any number of base classes may be passed, hence the variadic tuple.
        bases: Tuple[type, ...],
        dct: Dict[str, Any]
    ) -> None:
        # Here ``cls`` is the class that ``__new__`` has just created.
        print("Metaclass.__init__")
        print(cls, name, bases, dct)


class Exmaple(metaclass=Metaclass):
    def f():
        pass

Metaclass.__new__
<class '__main__.Metaclass'> Exmaple () {'__module__': '__main__', '__qualname__': 'Exmaple', 'f': <function Exmaple.f at 0x7f4984313710>}
Metaclass.__init__
<class '__main__.Exmaple'> Exmaple () {'__module__': '__main__', '__qualname__': 'Exmaple', 'f': <function Exmaple.f at 0x7f4984313710>}


In [88]:
class Metaclass(type):
    """Metaclass that logs class creation, class initialisation and instantiation."""

    def __new__(meta, name, bases, dct) -> "Metaclass":
        print("Metaclass.__new__")
        new_class = type.__new__(meta, name, bases, dct)
        return new_class

    def __init__(cls, name, bases, dct) -> None:
        print("Metaclass.__init__")

    def __call__(cls, *args, **kwargs):
        print("Metaclass.__call__")
        print(cls, args, kwargs)
        # Delegate to type.__call__; writing ``cls(*args, **kwargs)`` here
        # would re-enter this method and recurse forever.
        instance = super().__call__(*args, **kwargs)
        return instance

class Exmaple(metaclass=Metaclass):
    def __init__(self, *args, **kwargs):
        print("Exmaple.__init__")
        print(args, kwargs)

Metaclass.__new__
Metaclass.__init__


In [89]:
obj = Exmaple(1, x=2)

Metaclass.__call__
<class '__main__.Exmaple'> (1,) {'x': 2}
Exmaple.__init__
(1,) {'x': 2}


1. `__new__` вызывается до создания класса, возвращает класс
2. `__init__` после создания класса
3. `__call__` вызывается перед созданием объекта класса

# ABC + abstractmethod

In [91]:
from typing import Callable

In [92]:
class AbstractMethod:
    """Placeholder object stored on a class in place of an abstract method."""

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        """Always raise ``NotImplementedError``, whatever arguments are passed."""
        # Accept arbitrary arguments: with a fixed empty signature, calling an
        # abstract method that takes parameters fails with TypeError instead
        # of the intended NotImplementedError.
        raise NotImplementedError("Method not implemented")

def abstractmethod(method: Callable[..., Any]) -> AbstractMethod:
    """Decorator that discards *method* and returns an ``AbstractMethod`` placeholder."""
    placeholder = AbstractMethod()
    return placeholder

class Animal():
    @abstractmethod
    def hello(self) -> None:
        pass

In [97]:
from copy import deepcopy

import inspect


class MyABCMeta(type):
    """Metaclass that refuses to instantiate classes that still have abstract methods.

    A method counts as abstract while the attribute found on the class through
    normal attribute lookup (i.e. following the MRO) is an ``AbstractMethod``
    placeholder.
    """

    def __init__(cls: type, name: str, bases: Tuple[type, ...], dct: Dict[str, Any]) -> None:
        """Record the names of the abstract methods that *cls* has not overridden."""
        super().__init__(name, bases, dct)
        # Inspect the finished class instead of combining ``dct`` and ``bases``
        # by hand. Any override (plain function, staticmethod, classmethod,
        # property, or an implementation from an earlier base in the MRO)
        # replaces the placeholder, so only unresolved names remain.
        unresolved = inspect.getmembers(
            cls, predicate=lambda member: isinstance(member, AbstractMethod)
        )
        # The double-underscore name is mangled inside this class body, so the
        # value is stored on the class as ``_MyABCMeta__abstract_methods``.
        cls.__abstract_methods = {member_name for member_name, _ in unresolved}

    def __call__(cls: type, *args: Any, **kwargs: Any) -> Any:
        """Create an instance of *cls*.

        Raises:
            NotImplementedError: if *cls* still has abstract methods.
        """
        if cls.__abstract_methods:
            # Sort the names so the error message is deterministic.
            methods = ", ".join(sorted(cls.__abstract_methods))
            raise NotImplementedError("Methods not implemented: {}".format(methods))
        return type.__call__(cls, *args, **kwargs)

class MyABC(metaclass=MyABCMeta):
   """Base class whose subclasses are created through ``MyABCMeta``."""
   pass

In [98]:
 class Animal(MyABC):
    @abstractmethod
    def hello(self) -> None:
        pass

    
class Cow(Animal):
    def hello(self) -> None:
        print("Moo")
        
class Sheep(Animal):
    pass

In [99]:
try:
    l = Animal()
except NotImplementedError as e:
    print(e)
try:
    s = Sheep()
except NotImplementedError as e:
    print(e)

c = Cow()
c.hello()

Methods not implemented: hello
Methods not implemented: hello
Moo
