## Concurrency, parallelism and Python

### Process

<b>A process</b> is a running program.
* Each process has its own state, isolated from other processes:
    * virtual address space,
    * pointer to the instruction being executed,
    * call stack,
    * system resources, for example open file descriptors.
* Processes are convenient for running several tasks at the same time.
* An alternative is to delegate each task to a thread.

### Thread

A thread is similar to a process in that it executes independently of other threads (and processes).
* Unlike a process, a thread runs inside a process and shares its address space and system resources.
* Threads are convenient for running several tasks at the same time when those tasks need access to shared state.
* The operating system manages the concurrent execution of processes and threads, taking turns to let each process or thread use some number of CPU cycles.
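
A minimal sketch of what "shared address space" means in practice: several threads append to the same list object, and the main thread sees all the results. The names `shared` and `worker` are made up for this illustration.

```python
import threading

# One list object that lives in the address space of the process
# and is therefore visible to every thread started from it.
shared = []

def worker(n: int) -> None:
    # Each thread mutates the same list object.
    shared.append(n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait until every thread has finished

# The order of appends depends on scheduling, so sort before printing.
print(sorted(shared))
```

Separate processes would each get their own copy of `shared`, so the same trick would not work without explicit inter-process communication.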

In [1]:
import time
import requests

def fetch_response(url: str) -> requests.Response:
    return requests.get(url)

def print_status_codes(responses: list):
    for r in responses:
        print(r.status_code, end=" ")

In [2]:
urls = [
    "https://docs.python.org/3/",
    "https://google.com",
    "https://example.com",
    "https://python.org",
    "https://www.youtube.com/",
    "https://github.com/",
]

### Simple (synchronously)

In [6]:
%%time
responses = [fetch_response(url) for url in urls]
print_status_codes(responses) 

200 200 200 200 200 200 CPU times: user 146 ms, sys: 24.6 ms, total: 171 ms
Wall time: 9.11 s


### multithreading

In [8]:
from concurrent.futures import ThreadPoolExecutor

In [9]:
%%time
# Since Python 3.8 the default max_workers is min(32, os.cpu_count() + 4)
with ThreadPoolExecutor(max_workers=6) as pool:
    responses = pool.map(fetch_response, urls)
    print_status_codes(responses)

200 200 200 200 200 200 CPU times: user 158 ms, sys: 23.5 ms, total: 181 ms
Wall time: 2.29 s


### multiprocessing

In [3]:
from concurrent.futures import ProcessPoolExecutor

In [4]:
%%time
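# If max_workers is None or not given, it defaults to the number of processors (cores) on the machine.
# Note: functions defined in a notebook cell usually cannot be imported by worker processes on
# platforms that use the "spawn" start method, which likely explains the BrokenProcessPool below.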
with ProcessPoolExecutor(max_workers=len(urls)) as pool:
    responses = pool.map(fetch_response, urls)
    print_status_codes(responses)

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

### Another example

In [5]:
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

### Simple (synchronously)

In [6]:
%%time
for number, _is_prime in zip(PRIMES, map(is_prime, PRIMES)):
    print(f'{number} is prime: {_is_prime}')

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
CPU times: user 2.77 s, sys: 22.6 ms, total: 2.79 s
Wall time: 2.82 s


### multithreading

In [9]:
%%time
with ThreadPoolExecutor() as pool:
    for number, _is_prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
        print(f'{number} is prime: {_is_prime}')

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
CPU times: user 2.75 s, sys: 46 ms, total: 2.8 s
Wall time: 2.8 s


### multiprocessing

In [11]:
%%time
with ProcessPoolExecutor() as pool:
    for number, _is_prime in zip(PRIMES, pool.map(is_prime, PRIMES)):
        print(f'{number} is prime: {_is_prime}')

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
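
A `BrokenProcessPool` like this is typical when a process pool is started from an interactive session: on platforms that use the "spawn" start method, worker processes re-import the main module and cannot find functions defined in notebook cells. A minimal sketch, assuming the code is saved as a regular script and run from the command line; `NUMBERS` and `check_all` are names invented for this illustration, and `is_prime` is a compact variant of the function above.

```python
import math
from concurrent.futures import ProcessPoolExecutor

# A few of the numbers from the PRIMES list above.
NUMBERS = [112272535095293, 112582705942171, 1099726899285419]

def is_prime(n: int) -> bool:
    """Trial division by odd numbers up to the integer square root."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

def check_all(numbers, max_workers=2):
    # Each number is checked in a separate worker process,
    # so the checks are not serialized by the GIL.
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(numbers, pool.map(is_prime, numbers)))

# The guard keeps worker processes from starting their own pools
# when they import this module.
if __name__ == "__main__":
    for number, prime in check_all(NUMBERS).items():
        print(f"{number} is prime: {prime}")
```

Keeping `is_prime` at module level matters: the pool sends the function to workers by name, so it has to be importable from the script.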

### Why is there such a big difference?

### parallelism and concurrency

![](https://s3.tproger.ru/uploads/2016/12/concepts3mini.png)

### Why is there such a big difference?
### The answer is the GIL

* <b>GIL</b> (global interpreter lock) is a mutex that guarantees that at any moment only one thread has access to the interpreter state.
* The Python C API allows releasing the GIL, but this is safe only while working with objects that do not depend on the Python interpreter.
* For example, blocking I/O operations in CPython release the GIL.
* A running thread is asked to give up the GIL every 5 milliseconds by default (preemptive multitasking).
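
The switch interval can be inspected from Python itself. A short sketch using `sys.getswitchinterval()`, which returns the value in seconds:

```python
import sys

# How long a thread may hold the GIL before other threads can request it.
interval = sys.getswitchinterval()
print(f"switch interval: {interval * 1000:.1f} ms")
```

`sys.setswitchinterval()` changes the value, although tuning it is rarely needed in application code.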

In [None]:
...
Py_BEGIN_ALLOW_THREADS
err = close(fd);
if (err < 0)
    save_errno = errno;
Py_END_ALLOW_THREADS
...

### So is the GIL bad?

### Nope! It depends on the task

### Have a snack?
![](https://upload.wikimedia.org/wikipedia/commons/3/36/McDonald%27s_Golden_Arches.svg)

### Let's order a coffee

![](https://i.imgur.com/XsSm55k.png)

### Looks good, now let's order a Big Mac


![](https://cdn-images-1.medium.com/max/720/0*APVcPuyDaIKSDZPz.png)


### What's the connection?

![](https://i.imgur.com/3zAfnP7.png)

### I/O bound tasks vs CPU bound tasks
![](https://i.imgur.com/8F3Roo8.png)

### The GIL doesn't save you from logical errors

In [6]:
for i in range(1000):
    counter = 0

    def increment_counter(fake_value):
        global counter
        for _ in range(100):
            counter = counter + 1


    fake_data = [x for x in range(5000)]
    with ThreadPoolExecutor(max_workers=5000) as executor:
        executor.map(increment_counter, fake_data)
    if counter != 500000:
        print(i," -- ", counter)

402  --  498200
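
One way to see why updates get lost is to look at the bytecode: `counter = counter + 1` is several instructions, and a thread switch can happen between reading the old value and storing the new one. A minimal sketch using the standard `dis` module; the exact instruction names vary between CPython versions, so no output is shown.

```python
import dis

counter = 0

def increment():
    global counter
    counter = counter + 1

# Collect the names of the bytecode instructions that make up increment().
opnames = [instruction.opname for instruction in dis.get_instructions(increment)]
print(opnames)
```

The list contains a load of the global, an addition, and a store of the global as separate steps. If two threads both load the same value before either one stores, one increment is lost.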


In [None]:
from threading import Lock, RLock
lock = RLock()

for i in range(1000):
    counter = 0

    def increment_counter(fake_value):
        global counter
        for _ in range(100):
            with lock:
                counter += 1


    fake_data = [x for x in range(5000)]
    with ThreadPoolExecutor(max_workers=5000) as executor:
        executor.map(increment_counter, fake_data)
    if counter != 500000:
        print(counter)

### Also, multithreading scales poorly

### asyncio

* runs in a single thread
* builds on the event loop and generator concepts: a single Python object, called the event loop, controls how and when each task gets run
* a task never gives up control unless it does so intentionally, so code between two `await` points cannot be interrupted by another task
* context switches happen only at **await** expressions (cooperative multitasking)

![](https://eng.paxos.com/hs-fs/hubfs/_02_Paxos_Engineering/Event-Loop.png?width=1200&name=Event-Loop.png)

In [13]:
import asyncio

async def sleep(seconds):
    await asyncio.sleep(seconds)

async def main():
    print('Hello ...')
    await sleep(1)
    print('... World!')

# Top-level await works in Jupyter/IPython.
# In a regular script (Python 3.7+) use asyncio.run(main()) instead.
await main()

Hello ...
... World!


An object is called `awaitable` if it can be used in an **await** expression. Many asyncio APIs are designed to accept awaitables.

There are three main types of awaitable objects:
* Coroutines
* Tasks
* Futures
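
A small sketch that creates one object of each kind and checks it with `inspect.isawaitable()`. It is written as a standalone script; in a notebook, replace `asyncio.run(main())` with `await main()`. The coroutine `answer` is invented for this illustration.

```python
import asyncio
import inspect

async def answer():
    return 42

async def main():
    coro = answer()                                   # coroutine object
    task = asyncio.create_task(answer())              # Task wrapping a coroutine
    fut = asyncio.get_running_loop().create_future()  # low-level Future
    fut.set_result(42)

    kinds = [inspect.isawaitable(obj) for obj in (coro, task, fut)]
    # All three can be used in an await expression.
    results = [await coro, await task, await fut]
    return kinds, results

kinds, results = asyncio.run(main())
print(kinds, results)
```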

### Coroutines example

In [14]:
# bad example
import time
import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)  # (4) (5)
    print(what) # (6)

async def main():
    print("Started")  # (2)

    await say_after(1, 'hello')  # (3) (7)
    await say_after(2, 'world') # (8)
    print("Finished") # (9)
    
t1 = time.time()
await main() # (10)
# asyncio.run(main())
t2 = time.time()
print(f"It took {t2 - t1} seconds")

Started
hello
world
Finished
It took 3.0064103603363037 seconds


1. Calling `main()` creates a coroutine object; `await main()` (or `asyncio.run(main())`) hands it to the event loop, which starts running it.
2. `main` prints "Started".
3. `await say_after(1, 'hello')` starts that coroutine directly inside the current task; nothing new is scheduled on the event loop.
4. `asyncio.sleep(1)` suspends the task and returns control to the event loop.
5. The event loop has nothing else to run, so it waits until the timer expires and then resumes `say_after`.
6. `say_after` prints its message and returns.
7. Control comes back to `main` right after the first `await`.
8. The second `say_after` goes through the same steps, but only after the first one has finished, which adds 2 more seconds.
9. `main` prints "Finished" and returns.
10. The top-level `await` returns; with `asyncio.run(main())` the event loop is also closed at this point.

### Tasks example

<b>Tasks are used to schedule coroutines concurrently.</b> <br> When a coroutine is wrapped into a Task with a function like `asyncio.create_task()`, it is automatically scheduled and starts running as soon as control returns to the event loop.

In [15]:
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print("Started")
    # Wait until both tasks are completed
    # (should take around 2 seconds.)
    await task1
    
    await task2 
    print("Finished")

t1 = time.time()
await main()
# asyncio.run(main())
t2 = time.time()
print(f"It took {t2 - t1} seconds")

Started
hello
world
Finished
It took 2.0058529376983643 seconds


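There is no magic here: `asyncio.create_task()` schedules both tasks immediately, and they start running the first time `main` gives control back to the event loop, which happens at `await task1`. From that point `task1` and `task2` run concurrently, so the total time is about 2 seconds rather than 3. Keep in mind that awaiting a bare coroutine (`await say_after(1, 'oops')`) schedules nothing new: the coroutine runs inside the current task, and other tasks only get a turn while it is suspended.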

![](http://onreader.mdl.ru/MasteringConcurrencyInPython/content/figures/Fig1001.jpg)
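
A small sketch of when a task created with `asyncio.create_task()` actually starts: not at creation, but the first time the creating coroutine yields to the event loop. The `events` list and the `worker` coroutine are invented for this illustration; in a notebook, replace `asyncio.run(main())` with `await main()`.

```python
import asyncio

events = []

async def worker():
    events.append("worker started")
    await asyncio.sleep(0.01)
    events.append("worker finished")

async def main():
    task = asyncio.create_task(worker())
    # The task is scheduled but has not run yet: main still holds control.
    events.append("task created")
    # sleep(0) yields to the event loop once, which lets the worker start.
    await asyncio.sleep(0)
    events.append("main resumed")
    # Wait for the worker to finish.
    await task

asyncio.run(main())
print(events)
# ['task created', 'worker started', 'main resumed', 'worker finished']
```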

### Futures example

A **Future** is a special low-level awaitable object that represents an eventual result of an asynchronous operation. <br>
Future objects are used to bridge **low-level callback-based code** with high-level async/await code. <br>
Normally **there is no need** to create Future objects at the application level code. Future objects, sometimes exposed by libraries and some asyncio APIs, can be awaited.

In [17]:
import time
import asyncio

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)
    
t1 = time.perf_counter()
await main()
# asyncio.run(main())
t2 = time.perf_counter()
print(f"It took {t2 - t1} seconds")

### Network example

In [18]:
%%time
# Since Python 3.8 the default max_workers is min(32, os.cpu_count() + 4)
with ThreadPoolExecutor(max_workers=len(urls)) as pool:
    responses = pool.map(fetch_response, urls)
    print_status_codes(responses)

200 200 200 200 200 200 CPU times: user 164 ms, sys: 31.1 ms, total: 195 ms
Wall time: 2.94 s


In [19]:
async def fetch_response(url: str) -> requests.Response:
    return requests.get(url)

async def main():
    t1 = time.time()
    tasks = [asyncio.create_task(fetch_response(url)) for url in urls]
#     for task in tasks:
#         await task
    await asyncio.gather(*tasks)
    t2 = time.time()

    print(f"It took {t2 - t1} seconds")
    for task in tasks:
        print(task.result().status_code)
        
await main()

It took 7.943724870681763 seconds
200
200
200
200
200
200


In [20]:
async def fetch_response(url: str) -> requests.Response:
    return requests.get(url)

async def main():
    t1 = time.time()
    tasks = [asyncio.create_task(fetch_response(url)) for url in urls]
    for task in tasks:
        await task
    t2 = time.time()

    print(f"It took {t2 - t1} seconds")
    for task in tasks:
        print(task.result().status_code, end= " ")
        
await main()
# asyncio.run(main())

It took 7.669713973999023 seconds
200 200 200 200 200 200 

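`requests` calls are blocking: while one of them is in flight, the event loop cannot run any other task, so the tasks above execute one after another. <br>
To get truly asynchronous requests you need a client that is built on top of the event loop.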
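
When no async client is available, one common workaround is to push blocking calls into a thread pool with `loop.run_in_executor()`. A minimal sketch under that assumption: `blocking_io` is a made-up stand-in that uses `time.sleep` instead of a real network call. In a notebook, replace `asyncio.run(main())` with `await main()`.

```python
import asyncio
import time

def blocking_io(delay: float) -> float:
    # Stand-in for a blocking call such as requests.get().
    time.sleep(delay)
    return delay

async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # None selects the loop's default ThreadPoolExecutor.
    futures = [loop.run_in_executor(None, blocking_io, 0.25) for _ in range(4)]
    results = await asyncio.gather(*futures)
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f} s")
```

The four calls overlap, so the total time stays close to a single delay. This is still multithreading under the hood, with the costs discussed earlier.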

### aiohttp client

In [21]:
# bad example
import aiohttp

async def fetch_response(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response


async def main():
    t1 = time.time()
    tasks = [asyncio.create_task(fetch_response(url)) for url in urls]
    await asyncio.gather(*tasks)
    t2 = time.time()

    print(f"It took {t2 - t1} seconds")
    for task in tasks:
        print(task.result().status, end=" ")

        
await main()
# asyncio.run(main())

It took 2.3421177864074707 seconds
200 200 200 200 200 200 

In [22]:
# good example
async def fetch_response(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

async def main():
    tasks = [asyncio.create_task(fetch_response(url)) for url in urls]

    t1 = time.time()
    await asyncio.gather(*tasks)
    t2 = time.time()

    print(f"It took {t2 - t1} seconds")
    for task in tasks:
        print(task.result().status, end=" ")


await main()

It took 2.300110101699829 seconds
200 200 200 200 200 200 

### Conclusion

* There are different approaches to making a program run faster: multithreading, multiprocessing and async.
* Once you’ve decided that you should optimize your program, figuring out whether it is **CPU-bound** or **I/O-bound** is a great next step.
* For I/O-bound problems, there’s a general rule of thumb in the Python community: “Use asyncio when you can, threading when you must.”
* The async approach uses resources efficiently when the task is I/O-bound, and it holds up better under heavy load.
* Not all I/O libraries have async counterparts, and mixing blocking calls into async code may degrade performance to the synchronous level.
* You can also move work out of your application and run it asynchronously with external tools like [celery](https://docs.celeryproject.org/en/stable/getting-started/introduction.html).

![](https://files.realpython.com/media/Screen_Shot_2018-10-17_at_3.18.44_PM.c02792872031.jpg)

### Materials
#### Documentation:
* [Concurrent Execution in python](https://docs.python.org/3/library/concurrency.html)
* [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)
* [multiprocessing](https://docs.python.org/3/library/multiprocessing.html)
* [asyncio](https://docs.python.org/3/library/asyncio.html)
* [Celery](https://docs.celeryproject.org/en/stable/getting-started/introduction.html)
####  Videos to watch
* [McDonalds и Python](https://www.youtube.com/watch?v=b3iXdDmXm7s&t=167s&ab_channel=%D0%94%D0%B8%D0%B4%D0%B6%D0%B8%D1%82%D0%B0%D0%BB%D0%B8%D0%B7%D0%B8%D1%80%D1%83%D0%B9%21)
* [Многопроцессность, многопоточность, асинхронность в Python](https://www.youtube.com/watch?v=JIp14T9bvvc&ab_channel=%D0%94%D0%B8%D0%B4%D0%B6%D0%B8%D1%82%D0%B0%D0%BB%D0%B8%D0%B7%D0%B8%D1%80%D1%83%D0%B9%21)
* [Многопоточность и GIL](https://www.youtube.com/watch?v=nR8WhdcRJwM&ab_channel=ComputerScienceCenter)
* [GIL в Python: зачем он нужен и как с этим жить](https://www.youtube.com/watch?v=AWX4JnAnjBE&ab_channel=MoscowPython)
* [Андрей Светлов: "Подводные камни asyncio"](https://www.youtube.com/watch?v=GLN_xo4Awcc&ab_channel=PiterPy)
#### Real python ultimate guides
* [Speed Up Your Python Program With Concurrency](https://realpython.com/python-concurrency/)
* [Async IO in Python: A Complete Walkthrough](https://realpython.com/async-io-python/)
#### Others
* [Асинхронное программирование в Python](https://tproger.ru/translations/asynchronous-programming-in-python/)
* [Зачем, когда и как использовать multithreading и multiprocessing в Python](https://habr.com/ru/company/otus/blog/501056/)
* [Asynchronous programming. Python3.5+](https://luminousmen.com/post/asynchronous-programming-python3.5)
* [How does asyncio work?](https://stackoverflow.com/questions/49005651/how-does-asyncio-actually-work/51116910#51116910)
* [Concurrent computing (by Andrey Solomatin)](https://github.com/Cjkjvfnby/presentation-async/blob/master/src/main/asciidoc/presentation.adoc)

### Homework
Your task is to scrape information about the companies in the S&P 500 index from this site: <br>
https://markets.businessinsider.com/index/components/s&p_500

For each company collect the following information:
* Current price in rubles (convert using the current exchange rate taken from the [Central Bank of Russia](http://www.cbr.ru/development/sxml/) website)
* Company code (to the right of the company name on the company page)
* P/E of the company (to the right of the chart on the company page)
* Yearly growth/decline of the company in percent (main table)
* Calculate the profit (in percent) that the company's shares would have brought if they had been bought at the 52 Week Low and sold at the 52 Week High (to the right of the chart on the company page)
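
The two calculations in this list can be sketched as plain functions. This is only an illustration: the function names are invented, the prices are assumed to be quoted in US dollars, and the numbers in the usage lines are hypothetical, not real quotes.

```python
def potential_profit_percent(week_low: float, week_high: float) -> float:
    """Profit in percent from buying at the 52 Week Low and selling at the 52 Week High."""
    if week_low <= 0:
        raise ValueError("week_low must be positive")
    return (week_high - week_low) / week_low * 100

def price_in_rubles(price_usd: float, usd_to_rub: float) -> float:
    """Convert a price using an exchange rate given in rubles per dollar."""
    return price_usd * usd_to_rub

# Hypothetical values, for illustration only.
print(potential_profit_percent(100.0, 150.0))  # 50.0
print(price_in_rubles(10.0, 75.0))             # 750.0
```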

Save the results to 4 JSON files:
1. Top 10 companies with the most expensive shares in rubles.
2. Top 10 companies with the lowest P/E.
3. Top 10 companies with the highest growth over the last year.
4. Top 10 companies that would have brought the highest profit if bought at the lowest point and sold at the highest point of the last year.
<br>Example format:
```
[
{
    "code": "MMM",
    "name": "3M CO.",
    "price" | "P/E" | "growth" | "potential profit" : value,
},
...
]
```
<br>**P.S. Make the most of parallelism and asynchrony.**
<br>
* bs4
* aiohttp