# Вступ до багатопотоковості

Код, що виконує паралельно кілька завдань, називається асинхронним. 

Найпростіший спосіб реалізувати асинхронність — це виконувати завдання в окремих потоках всередині одного процесу.

**Процес** — `область пам'яті` (віртуальна) + `набір ресурсів` + `1 і більше потоків`.

**Потік** — `послідовність інструкцій та системних викликів` всередині процесу.

Всі потоки мають доступ до всіх ресурсів свого процесу. Усі процеси ізольовані один від одного, і будь-яка міжпроцесна взаємодія відбувається тільки через операції введення/виведення (системні виклики).

## IO and CPU bound tasks

### IO tasks

Завдання, які виконують операції введення/виведення (читання/запис файлів, запити в мережі тощо), називаються **IO (Input Output)-bound** завданнями. Домогтися паралелізму виконання **IO** завдань у Python можна, використовуючи потоки.

Однак потрібно пам'ятати, що асинхронний код завжди на порядок складніший для розуміння та відлагодження. 
- Для багатопотокових застосунків складно писати тести та складно перевіряти всілякі ситуації, які відбуваються рідко та залежать від порядку викликів у різних потоках. 
- Загальне правило для програмування будь-якою мовою: **якщо є можливість обійтися синхронним кодом, то так і потрібно зробити.**

### CPU tasks
Інший тип блокуючих викликів — це важкі з точки зору обчислень операції. 

Такі завдання називаються **CPU-bound** завданнями. 
- Як і для IO-bound завдань, можна винести виконання блокуючих операцій (складних обчислень) в окремий потік, щоб застосунок продовжував взаємодіяти з користувачем, здійснюючи обчислення.

Загалом, операційна система передає управління потокам (як і передача управління процесами). Це означає, що будь-якої миті, перед будь-яким викликом ОС (Операційна Система) може призупинити виконання коду потоку та розпочати виконувати код іншого потоку, щоб потім так само далі призупинити і його для передачі управління.


[Гарна стаття](https://medium.com/swift-india/concurrency-parallelism-threads-processes-async-and-sync-related-39fd951bc61d) про асинхронний код.

## Global Interpreter Lock (GIL)

Потоки можуть виконуватися дійсно паралельно (якщо ядер процесора більше 1), процеси — тим більше. 

Але у Python є механізм, який примусово блокує виконання коду різними потоками одного Python процесу в один і той самий час.

**Тільки один потік всередині процесу Python виконується, всі інші (якщо такі є) знаходяться в режимі 'Sleep'.**
Операції, пов'язані з введенням/виведенням **(системні виклики) не блокуються GIL**, але не їх послідовність.

Це означає, що якщо ви зробите кілька **IO** викликів у різних потоках, то вам не гарантується черговість завершення цих потоків, але гарантується, що коли виконується код будь-якого з потоків, всі інші потоки чекають черги і нічого не роблять. Це буде так, навіть якщо код виконується на сучасному процесорі з кількома ядрами

### Чому в Python є GIL?

- Простий і зрозумілий збирач сміття.
- **Виключає можливість одночасного доступу до ресурсів/пам'яті**. 
- Немає потреби враховувати особливості конкретної ОС для обробки таких ситуацій.
- Це спадщина епохи одноядерних процесорів, коли додаткові потоки/процеси уповільнювали виконання програми.

Python розроблявся в епоху одноядерних процесорів і навіть теоретично ніхто тоді не міг припустити дійсне одночасне виконання коду в різних потоках. Через це було зроблено низку архітектурних рішень, які вже не змінити, і на Python накладено обмеження GIL.


### Як обійти GIL:

- Написати частину коду, яку потрібно запускати паралельно, на Python і використовувати потоки.
- Використовувати Multiprocessing.

### Чому не потрібно цього робити:

- Python — скриптова мова і швидкість роботи не її сильна сторона. Якщо потрібна швидкість, то, можливо, є сенс розглянути інший інструмент.
- Створення процесів використовує деяку кількість ресурсів системи (пам'ять та процесорний час).
- Перемикання між процесами також використовує процесорний час.

## Створення потоків у Python

[Документація](https://docs.python.org/3/library/concurrency.html) про доступні в Python механізми написання поток

### Потік як клас
​
- Щоб створити потік, найпростіше імпортувати клас `Thread` з модуля `threading` і наслідуватись від цього класу. 
- Далі вам потрібно визначити метод `run` у вашого класу, **цей метод буде виконуватись в окремому потоці**. 
- Щоб розпочати виконання коду в окремому потоці, потрібно викликати метод `start,` який визначений у `Thread`. 

Давайте напишемо клас `MyThread`, що в окремому потоці спить вказаний час і після цього виводить у консоль 'Wake up!':


In [3]:
from threading import Thread
import logging
from time import sleep


class MyThread(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name, daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self) -> None:
        sleep(2)
        logging.debug('Wake up!')
        logging.debug(f"args: {self.args}")


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    for i in range(5):
        thread = MyThread(args=(f"Count thread - {i}",))
        thread.start()
    print('Usefull message')


Usefull message


Thread-13 Wake up!
Thread-11 Wake up!
Thread-14 Wake up!
Thread-12 Wake up!
Thread-13 args: ('Count thread - 3',)
Thread-11 args: ('Count thread - 1',)
Thread-14 args: ('Count thread - 4',)
Thread-12 args: ('Count thread - 2',)


### Потік як функтор

Є інший спосіб виконати код окремого потоку. Для цього потрібно, щоб код виконання був функтором (функцією або класом, який має метод __call__). Тоді об'єкт можна передати як іменований аргумент target у Thread

In [2]:
from threading import Thread
from time import sleep
import logging


class UsefulClass():
    def __init__(self, second_num):
        self.delay = second_num

    def __call__(self):
        sleep(self.delay)
        logging.debug('Wake up!')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    t2 = UsefulClass(2)
    thread = Thread(target=t2)
    thread.start()
    print('Some stuff')


Some stuff


Thread-9 Wake up!
Thread-10 Wake up!
Thread-10 args: ('Count thread - 0',)


### Потік у функції

У процесі створення екземпляра класу `Thread` можна передати аргументу target функцію та передати їй аргументи

Зверніть увагу, що аргументи, які потрібно передати у функцію, передаються як `кортеж` `args` у `Thread`. Іменовані аргументи для функції можна так само передати як `словник` `kwargs` у `Thread`.

In [4]:
from threading import Thread
from time import sleep
import logging


def example_work(delay):
    sleep(delay)
    logging.debug('Wake up!')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    for i in range(5):
        thread = Thread(target=example_work, args=(i,))
        thread.start()


Thread-15 (example_work) Wake up!


Thread-16 (example_work) Wake up!
Thread-17 (example_work) Wake up!
Thread-18 (example_work) Wake up!
Thread-19 (example_work) Wake up!


### Очікування виконання потоку

Коли потрібно в основному застосунку дочекатися виконання потоку, можна скористатися блокуючим методом `join`.

Основний потік дочекався `[el.join() for el in threads]`, доки завершаться всі потоки thread зі списку `threads`, і тільки потім вивів `End program`:

In [5]:
from threading import Thread
import logging
from time import sleep


def example_work(params):
    sleep(params)
    logging.debug('Wake up!')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    logging.debug('Start program')
    threads = []
    for i in range(5):
        thread = Thread(target=example_work, args=(i,))
        thread.start()
        threads.append(thread)

    [el.join() for el in threads]

    logging.debug('End program')


MainThread Start program
Thread-20 (example_work) Wake up!
Thread-21 (example_work) Wake up!
Thread-22 (example_work) Wake up!
Thread-23 (example_work) Wake up!
Thread-24 (example_work) Wake up!
MainThread End program


Ви також можете перевірити — чи виконується потік, викликавши метод `is_alive`

Це може бути корисним, якщо ви хочете перевіряти стан потоку самостійно і не блокувати застосунок в очікуванні завершення.:

In [6]:
from threading import Thread
from time import sleep
import logging


class UsefulClass:
    def __init__(self, second_num):
        self.delay = second_num

    def __call__(self):
        sleep(self.delay)
        logging.debug('Wake up!')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    t2 = UsefulClass(2)
    thread = Thread(target=t2)
    thread_locking = Thread(target=t2)

    thread.start()
    print(thread.is_alive(), thread_locking.is_alive())
    thread_locking.start()
    thread.join()
    thread_locking.join()
    print(thread.is_alive(), thread_locking.is_alive())
    print('After all...')


True False


Thread-26 Wake up!
Thread-25 Wake up!


False False
After all...


### Потоки Timer

Екземпляри класу `Timer` починають працювати з деякою затримкою, яку визначає програміст. Крім того, ці потоки можна скасувати будь-якої миті в період затримки. Наприклад, ви передумали стартувати певний потік.

Тут ми запланували виконання двох потоків, через 0.5 та 0.7 секунд. Але потім через 0.6 секунди скасували виконання другого потоку `second.cancel()`


In [10]:
from threading import Timer
import logging
from time import sleep


def example_work_():
    logging.debug('Start!')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')

    first = Timer(0.3, example_work_)
    first.name = 'First thread'
    second = Timer(0.7, example_work_)
    second.name = 'Second thread'
    logging.debug('Start timers')
    first.start()
    second.start()
    sleep(0.6)
    second.cancel()

    logging.debug('End program')


MainThread Start timers
MainThread End program


## Контроль доступу до ресурсів

Оскільки ОС може на будь-якому виклику перервати виконання потоку та передати контроль іншому потоку, ви не можете бути впевненим, що робота із загальним ресурсом буде коректно завершеною і ресурс не опиниться в невизначеному стані.


### Lock, RLock

Для цього є механізм блокування. 

У Python є два примітива блокувань: **Lock та RLock**. 
- `Lock` трохи швидший і більш низькорівневий, але він **не рекурсивний** 
     - і може бути ситуація потрапляння в `DeadLock`, коли виконання коду заблокується, кілька потоків чекатимуть, доки хтось віддасть `Lock`, а його ніхто ніколи вже не віддасть. Це і є ситуація, коли програма "зависла".
- `RLock` трохи повільніший, зате **виключає взаємне блокування**. 
     - Рекомендується завжди використовувати саме його, якщо немає вагомих причин використовувати `Lock`.

**NOTE**
**Якщо провести аналогію з життя, то `Lock` це коли у кожного потоку один і той самий ключ і будь-який потік може відкрити замок, хто б його не закрив із потоків. З `RLock` ситуація трохи інша, у кожного потоку свій ключ і свій замок.**

**Потік може відкрити лише свій замок своїм ключем і не відкриє замок, якщо його закрив інший потік.**


In [11]:
from threading import Thread, RLock
import logging
from time import time, sleep

lock = RLock()


def func(locker, delay):
    timer = time()
    locker.acquire()
    sleep(delay)
    locker.release()
    logging.debug(f'Done {time() - timer}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    t1 = Thread(target=func, args=(lock, 2))
    t2 = Thread(target=func, args=(lock, 2))
    t1.start()
    t2.start()
    logging.debug('Started')


MainThread Started


Thread-35 (func) Done 2.0050466060638428
Thread-36 (func) Done 4.012096643447876


#### Could you please explain me this code in simple words? 
How it works, what is the order of execution the lines of code and why?🙏

Certainly! This code demonstrates the use of threading in Python to execute two functions concurrently. Here's a breakdown of how it works:

1. We import necessary modules: `Thread` and `RLock` from `threading`, `logging`, `time`, and `sleep`.

2. We define a `lock` object using `RLock()`. This is a reentrant lock that allows the same thread to acquire the lock multiple times.

3. We define a function `func(locker, delay)` that simulates some task by acquiring the lock, sleeping for a specified duration (`delay`), and then releasing the lock. It also logs the time taken by the task.

4. Inside the `if __name__ == '__main__':` block, we configure logging to display debug messages with thread names and their messages.

5. We create two `Thread` objects (`t1` and `t2`) that target the `func` function. Each thread is passed the same lock object (`lock`) and a delay value of `2` seconds.

6. We start both threads (`t1` and `t2`) using the `start()` method.

7. Finally, we log a debug message indicating that the threads have started.

The order of execution is as follows:

- The main thread starts by configuring logging.
- It then creates two threads (`t1` and `t2`) but does not start them yet.
- The main thread logs a message indicating that the threads have started.
- Both `t1` and `t2` acquire the lock (`lock`) and start executing the `func` function concurrently.
- Each thread sleeps for `2` seconds, simulating some time-consuming task.
- Once the tasks are complete, the threads release the lock.
- The main thread continues to execute and may exit, depending on whether there are any other statements after the logging.

Throughout the execution, the lock ensures that only one thread can execute the critical section of the `func` function at a time, preventing potential race conditions or conflicts between the threads accessing shared resources.

The `logging.debug('Started')` line is executed after the `t1.start()` and `t2.start()` lines in the code because of the order of statements within the `if __name__ == '__main__':` block. 

In Python, statements within a block are executed sequentially from top to bottom. Therefore, `t1.start()` and `t2.start()` are executed before `logging.debug('Started')` because they appear earlier in the code.

Here's the sequence of execution:

1. The `logging.basicConfig()` function is called to configure logging.
2. Two `Thread` objects (`t1` and `t2`) are created, but they are not started yet.
3. The `start()` method is called on both `t1` and `t2`, initiating the execution of their respective target functions (`func`) concurrently.
4. After starting the threads, the main thread continues executing and reaches the `logging.debug('Started')` statement, logging the message "Started".

So, while `t1.start()` and `t2.start()` initiate the execution of the threads, the logging statement is executed afterward, allowing you to see that the threads have been started.

#### Aha, so it means that while t1 and t2 are starting execution, the main thread logs ‘Started’

That’s why the expected result is:

MainThread Started
Thread-1 Done 2.0003550052642822
Thread-2 Done 4.000735521316528


Exactly! You've got it right. Here's how the execution flow goes:

1. The main thread (`MainThread`) executes the `t1.start()` and `t2.start()` statements, initiating the execution of the `func` function in separate threads (`Thread-1` and `Thread-2`).
2. While `Thread-1` and `Thread-2` are executing their respective `func` functions, the main thread continues its execution and logs the message `'Started'`.
3. Meanwhile, `Thread-1` and `Thread-2` complete their tasks, logging the time taken for each operation (`Done 2.0003550052642822` and `Done 4.000735521316528`, respectively).

So, the main thread logs "Started" while the other threads are still running, and then it continues its execution concurrently with those threads.

#### And one more question - what is it the main thread?
How to define it in the code?

The main thread in Python is the thread where your Python program starts its execution. It's the initial thread created when your program runs and where your `__main__` module's code is executed. 

In Python, you can check if you're in the main thread by using the condition `if __name__ == '__main__':`. This condition evaluates to `True` if the current module is being run as the main program, and `False` otherwise. It's commonly used to prevent code in the module from being executed when the module is imported as a module into another program.

Here's an example:

```python
import threading

def my_function():
    print("Running in a thread")

if __name__ == "__main__":
    print("This code runs in the main thread")

    # Create and start a new thread
    thread = threading.Thread(target=my_function)
    thread.start()

    # Wait for the thread to finish
    thread.join()

    print("Back in the main thread")
```

In this example, the code inside the `if __name__ == '__main__':` block runs in the main thread. We then create a new thread and start it to execute the `my_function()` function concurrently. Finally, we wait for the thread to finish using the `join()` method and print a message indicating that we're back in the main thread.

#### Could you clarify some points:
1. For the case where we don’t use thread module and just run the code - the code executes in one thread, right?
import threading

def my_function():
    print("Running in a thread")

if __name__ == "__main__":
    print("This code runs in the main thread")

2. Could you explain how thread.join() works? 

Sure, let me clarify those points:

1. **Execution without threading module**: When you run Python code without explicitly creating threads using the `threading` module, the code executes in a single thread, which is the main thread. All the code in your Python program runs sequentially in this single thread unless you explicitly create additional threads using `threading.Thread`.

    In your example:
    ```python
    import threading

    def my_function():
        print("Running in a thread")

    if __name__ == "__main__":
        print("This code runs in the main thread")
    ```
    The code will execute in a single thread (the main thread). The `my_function()` function is not being executed concurrently in a separate thread because there are no threads created explicitly.

2. **`thread.join()` method**: The `join()` method is used to wait for a thread to complete its execution before proceeding to the next steps in the main thread. When you call `thread.join()` on a thread object `thread`, the main thread will wait until `thread` finishes executing its target function before continuing further.

    Here's how it works in the context of your example:
    ```python
    # Create and start a new thread
    thread = threading.Thread(target=my_function)
    thread.start()

    # Wait for the thread to finish
    thread.join()

    print("Back in the main thread")
    ```
    In this code snippet, after starting the `thread` to execute the `my_function()` function, the main thread encounters the `thread.join()` line. At this point, the main thread will pause and wait until the `thread` finishes executing `my_function()`. Once `my_function()` completes its execution, the main thread will resume from the `thread.join()` line and continue with the next statement, printing "Back in the main thread".

### Але найчастіше для блокування використовують контекст виконання:


In [12]:
from threading import Thread, RLock
import logging
from time import time, sleep

lock = RLock()


def func(locker, delay):
    timer = time()
    with locker:
        sleep(delay)
    logging.debug(f'Done {time() - timer}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    t1 = Thread(target=func, args=(lock, 2))
    t2 = Thread(target=func, args=(lock, 2))
    t1.start()
    t2.start()
    logging.debug('Started')


MainThread Started


Thread-37 (func) Done 2.00314998626709
Thread-38 (func) Done 4.011469125747681


### Семафори

Семафори підходять до блокування іншим шляхом та вказують, що кілька потоків можуть користуватися ресурсом одночасно і цим обмежують кількість потоків. 
- Наприклад, ми не хочемо надсилати десятки тисяч запитів до мережі одночасно, щоб не створювати навантаження на обладнання і вкажемо семафор, щоб не більше ста потоків могли одночасно надсилати запити. 
- Щойно якийсь із потоків закінчить роботу і семафор його відпустить, то наступний потік із черги очікування зможе зробити свій запит.

Як приклад розглянемо виконання 10 потоків і обмежимо виконання за допомогою семафору до двох одночасно

У цьому прикладі ми створили семафор, що обмежує до 2. Виконавши код, побачимо:

```
Th-0 Got semaphore
Th-1 Got semaphore
Th-1 finished
Th-0 finished
Th-2 Got semaphore
Th-3 Got semaphore
Th-3 finished
Th-2 finished
Th-4 Got semaphore
Th-5 Got semaphore
Th-5 finished
Th-4 finished
Th-6 Got semaphore
Th-7 Got semaphore
Th-7 finished
Th-6 finished
Th-8 Got semaphore
Th-9 Got semaphore
Th-9 finished
Th-8 finished
```



In [None]:
from threading import Semaphore, Thread
import logging
from time import sleep


def worker(condition):
    with condition:
        logging.debug(f'Got semaphore')
        sleep(1)
        logging.debug(f'finished')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    pool = Semaphore(2)
    for num in range(10):
        thread = Thread(name=f'Th-{num}', target=worker, args=(pool, ))
        thread.start()


Th-0 Got semaphore
Th-1 Got semaphore


Th-0 finished
Th-1 finished
Th-2 Got semaphore
Th-3 Got semaphore
Th-2 finished
Th-3 finished
Th-4 Got semaphore
Th-5 Got semaphore
Th-4 finished
Th-5 finished
Th-6 Got semaphore
Th-7 Got semaphore
Th-6 finished
Th-7 finished
Th-8 Got semaphore
Th-9 Got semaphore
Th-8 finished
Th-9 finished


## Синхронізація потоків

### Condition

Є примітиви синхронізації, які дозволяють потокам очікувати сигнал від інших потоків — це **Condition**. 

Створимо дві різні функції: одна **master** повідомлятиме, що **worker** може продовжити роботу. 

При цьому вони будуть виконуватись у різних потоках.

- Спочатку `master` у потоці виконує якусь роботу. 
- Після цього він виконує метод c`ondition.notify_all()` чим дозволяє запустити інші потоки, доки вони чекають виконання. 
- Вони очікують на виконання в точці виклику методу `condition.wait()`.

- Якщо ж `master `повинен дозволити роботу лише одному з `worker`, можна викликати метод `condition.notify()`, тоді тільки один з тих, хто очікує дозволу worker продовжить роботу. Другий чекатиме, доки не буде виконано наступне `condition.notify`.


In [14]:
from threading import Thread, Condition
import logging
from time import sleep


def worker(condition: Condition):
    logging.debug('Worker ready to work')
    with condition:
        condition.wait()
        logging.debug('The worker can do the work')


def master(condition: Condition):
    logging.debug('Master doing some work')
    sleep(2)
    with condition:
        logging.debug('Informing that workers can do the work')
        condition.notify_all()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    condition = Condition()
    master = Thread(name='master', target=master, args=(condition,))

    worker_one = Thread(name='worker_one', target=worker, args=(condition, ))
    worker_two = Thread(name='worker_two', target=worker, args=(condition,))
    worker_one.start()
    worker_two.start()
    master.start()

    logging.debug('End program')


worker_one Worker ready to work
worker_two Worker ready to work
master Master doing some work
MainThread End program


master Informing that workers can do the work
worker_one The worker can do the work
worker_two The worker can do the work


### Event

Інший примітив синхронізації — це **потокобезпечний прапорець класу `Event`**. 
- Клас `Event` має внутрішній прапорець, який можуть встановлювати або скидати інші потоки. 
- Для цього використовують метод `set`, щоб встановити прапор та метод `clear` для скидання. 
- Методу `wait` класу `Event` зупиняє роботу потоку до того часу, доки інший потік не встановить прапор методом `set`. 
- Є можливість перевірити, чи встановлено прапор методом `is_set`.

Таким чином, `master` може встановити прапорець класу `Event`, і всі `worker` потоки продовжать роботу тільки після отримання дозволу.

In [15]:
from threading import Thread, Event
import logging
from time import sleep


def worker(event: Event):
    logging.debug('Worker ready to work')
    event.wait()
    logging.debug('The worker can do the work')


def master(event: Event):
    logging.debug('Master doing some work')
    sleep(2)
    logging.debug('Informing that workers can do the work')
    event.set()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    event = Event()
    master = Thread(name='master', target=master, args=(event, ))

    worker_one = Thread(name='worker_one', target=worker, args=(event, ))
    worker_two = Thread(name='worker_two', target=worker, args=(event,))
    worker_one.start()
    worker_two.start()
    master.start()

    logging.debug('End program')


worker_one Worker ready to work
worker_two Worker ready to work
master Master doing some work
MainThread End program


master Informing that workers can do the work
worker_two The worker can do the work
worker_one The worker can do the work


Виникає закономірне питання, навіщо, якщо результат той самий? 
- Справа в тому, що ми можемо керувати виконанням, перезапуском та зупинкою роботи потоків через клас `Event`. 
- Наприклад, у наступному прикладі ми **перериваємо виконання потоку**, який працює в нескінченному циклі та інакше просто ніколи не завершиться.


In [16]:
from threading import Thread, Event
import logging
from time import sleep


def example_work(event_for_exit: Event):
    while True:
        sleep(1)
        logging.debug('Run event work')

        if event_for_exit.is_set():
            break


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    event = Event()
    thread = Thread(target=example_work, args=(event,))
    thread.start()

    sleep(5)
    event.set()

    logging.debug('End program')


Thread-39 (example_work) Run event work
Thread-39 (example_work) Run event work
Thread-39 (example_work) Run event work
Thread-39 (example_work) Run event work
MainThread End program


Thread-39 (example_work) Run event work


### Barrier

Останній примітив синхронізації, який ми розглянемо в `Python` — це бар'єр `Barrier`. 
- Він дозволяє задати умову, **щоб кілька потоків продовжили роботу лише після того, як задане число потоків добереться у виконанні коду до цього "бар'єру"**.

Використовується, коли вам потрібно, щоб робота застосунку продовжилася лише після того, як усі потоки зроблять якусь частину своєї роботи та дійдуть до деякої точки, з якою можна знову продовжувати.

Розглянемо наступний приклад:

Потік може дістатися бар'єру і чекати його за допомогою функції `wait()`. 
- Це блокуючий виклик, який повернеться, коли решта потоків (попередньо налаштована кількість `barrier = Barrier(5)`) дістануться бар'єру.

Функція очікування `wait()` повертає ціле число, яка вказує на кількість учасників, що залишилися до бар'єру. Якщо потік був останнім, що прибув, то повернене значення буде нульовим.

**Головна вимога, щоб кількість потоків, що запускаються, в нашому випадку 10 - `range(10)`, була кратною кількості бар'єру, у нашому випадку 5 - `Barrier(5)`.**



In [None]:
from random import randint
from threading import Thread, Barrier
import logging
from time import sleep, ctime


def worker(barrier: Barrier):
    logging.debug(f'Start thread: {ctime()}')
    sleep(randint(1, 3))  # Simulate some work
    r = barrier.wait()
    logging.debug(f'count: {r}')
    logging.debug(f'Barrier overcome: {ctime()}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    barrier = Barrier(5)

    for num in range(10):
        thread = Thread(name=f'Th-{num}', target=worker, args=(barrier, ))
        thread.start()


Th-0 Start thread: Thu Mar 28 08:57:38 2024
Th-1 Start thread: Thu Mar 28 08:57:38 2024
Th-2 Start thread: Thu Mar 28 08:57:38 2024
Th-3 Start thread: Thu Mar 28 08:57:38 2024
Th-4 Start thread: Thu Mar 28 08:57:38 2024
Th-5 Start thread: Thu Mar 28 08:57:38 2024
Th-6 Start thread: Thu Mar 28 08:57:38 2024
Th-7 Start thread: Thu Mar 28 08:57:38 2024


Th-8 Start thread: Thu Mar 28 08:57:38 2024
Th-9 Start thread: Thu Mar 28 08:57:38 2024
Th-5 count: 4
Th-1 count: 0
Th-3 count: 1
Th-8 count: 3
Th-6 count: 2
Th-6 Barrier overcome: Thu Mar 28 08:57:40 2024
Th-1 Barrier overcome: Thu Mar 28 08:57:40 2024
Th-3 Barrier overcome: Thu Mar 28 08:57:40 2024
Th-8 Barrier overcome: Thu Mar 28 08:57:40 2024
Th-5 Barrier overcome: Thu Mar 28 08:57:40 2024
Th-9 count: 4
Th-0 count: 0
Th-2 count: 1
Th-7 count: 3
Th-4 count: 2
Th-9 Barrier overcome: Thu Mar 28 08:57:41 2024
Th-0 Barrier overcome: Thu Mar 28 08:57:41 2024
Th-2 Barrier overcome: Thu Mar 28 08:57:41 2024
Th-7 Barrier overcome: Thu Mar 28 08:57:41 2024
Th-4 Barrier overcome: Thu Mar 28 08:57:41 2024


## Пул потоків

В `Python` існує ще один механізм написання асинхронного коду. 
- Ви можете скористатися пакетом `concurrent.futures`. 
- Він дозволяє піднятися на вищий рівень абстракції, коли вам просто потрібно паралельно виконати ряд однотипних завдань і немає необхідності вдаватися до низькорівневих деталей реалізації.

Основна ідея полягає у використанні реалізації абстрактного класу `Executor`. 
- У `concurrent.futures` є дві реалізації цього абстрактного базового класу: 
     - `ProcessPoolExecutor` — для виконання коду окремих процесів (з ним ми познайомимося пізніше) та 
     - `ThreadPoolExecutor` — для виконання в окремих потоках.

Кожен такий `Executor` приховує набір потоків або процесів, яким ви можете дати роботу та отримати результат її виконання. Вам не потрібно вручну управляти створенням потоків та їх коректним завершенням.

Звичайно, все ще потрібно пам'ятати про доступ до загальних ресурсів та примітиви синхронізації.


У цьому прикладі 
- ми створюємо `ThreadPoolExecutor `
- та задаємо, що робота буде виконуватися не більше ніж 2 потоками. 
- Пул можна створити в менеджері контексту як у прикладі, щоб бути впевненим, що всі ресурси будуть коректно повернуті до системи після завершення. - Але це не обов'язково, можна створити пул і потім викликати у нього метод `shutdown`, щоб завершити всі потоки та повернути ресурси системі.

- Далі ми передаємо в пул функцію, яку потрібно виконати у кілька потоків, та набір аргументів цієї функції — кожен для виконання в окремому потоці. - В `executor` є метод `map`, він використовується, коли потрібно паралельно виконати функцію з різними вхідними аргументами в окремих потоках.
- Результатом виклику `map` буде ітератор за результатами виконання в окремих потоках.

Зверніть увагу, що код паралельно виконується не більше ніж двома потоками — ThreadPoolExecutor-0_0 та ThreadPoolExecutor-0_1. Результат повертається лише після того, як усі вхідні дані оброблені.

In [18]:
import concurrent.futures
import logging
from random import randint
from time import sleep


def greeting(name):
    logging.debug(f'greeting for: {name}')
    sleep(randint(0, 3))
    return f"Hello {name}"


arguments = (
    "Bill",
    "Jill",
    "Till",
    "Sam",
    "Tom",
    "John",
)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(greeting, arguments))

    logging.debug(results)


ThreadPoolExecutor-0_0 greeting for: Bill
ThreadPoolExecutor-0_1 greeting for: Jill
ThreadPoolExecutor-0_0 greeting for: Till
ThreadPoolExecutor-0_1 greeting for: Sam
ThreadPoolExecutor-0_1 greeting for: Tom
ThreadPoolExecutor-0_0 greeting for: John
MainThread ['Hello Bill', 'Hello Jill', 'Hello Till', 'Hello Sam', 'Hello Tom', 'Hello John']
