# Использование отдельных датчиков роботом (часть 2). Потоки, многозадачность и использование модуля threading в Python.

## Что такое потоки?

**Потоки _(threads)_** в Python - это легковесные подпрограммы, которые выполняются параллельно внутри процесса. Они позволяют программе выполнять несколько задач одновременно и увеличивают общую производительность программы.

## Основные понятия

* **Многозадачность:**

    * Многозадачность - это способность системы выполнять несколько задач одновременно. Она может быть реализована как с использованием конкурентности, так и с использованием параллельности.

* **Конкурентность *(Concurrency)*:**

    <img src="images/threading/Concurrency.png" alt="Concurrency-image" width="300">

    * Несколько задач выполняются за определенный период времени, но не обязательно одновременно *(выполняются в пересмешивающемся порядке, и могут быть реализованы на одном физическом процессоре. В этом случае, на первый взгляд, задачи могут казаться выполняемыми одновременно, но фактически они чередуются в выполнении.)*

* **Параллельность *(Parallelism)*:**

    <img src="images/threading/Parallelism.png" alt="Parallelism-image" width="300">

    * Параллельность означает, что несколько задач выполняются фактически одновременно.

    * Потоки могут быть параллельными, когда они выполняются одновременно на многопроцессорной системе или на многоядерном процессоре.

* **Глобальная блокировка интерпретатора *(Global Interpreter Lock - GIL)*:**

    * GIL - это механизм, который предотвращает одновременное выполнение нескольких потоков Python кода в одном процессе.

    * Это ограничение происходит из-за управления памятью в CPython, интерпретаторе Python, который используется по умолчанию.

    * GIL не позволяет использовать полностью многопоточность для эффективной параллельной обработки в Python. Однако он не мешает конкурентной обработке ввода-вывода.

## Модуль **threading**

Python предоставляет встроенный модуль `threading`, который упрощает работу с потоками. Этот модуль позволяет создавать, управлять и синхронизировать потоки в Python.

Для создания потока в Python используется класс `Thread` из модуля `threading`. Он и будет являтся основной вещью при дальнейшей работе с многопоточностью.

### Thread _(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)_

* **group** должно быть *None*; *зарезервировано для будущего расширения при реализации класса `ThreadGroup`.*

* **target** - это вызываемый объект, который будет вызван методом `run()`. По умолчанию используется значение *None*, что означает, что ничего не вызывается.

* **name** - это имя потока. По умолчанию создается уникальное имя вида *Thread-N*, где *N* - небольшое десятичное число, или *Thread-N (target)*, где *target* - это `target.__name__`, если указан целевой аргумент.

* **args** - это кортеж аргументов для целевого вызова. По умолчанию используется *()*.

* **kwargs** - это словарь аргументов ключевых слов для целевого вызова. По умолчанию используется значение *{}*.

* **daemon** отвечает за то, что будет ли поток работать независмо от основной программы или нет.

#### start _()_

Запуск активности потока.

Он должен вызываться не более одного раза для каждого объекта потока. Он обеспечивает вызов метода `run()` объекта в отдельном потоке управления.

Этот метод вызовет `RuntimeError`, если вызывается более одного раза для одного и того же объекта потока.

#### run _()_

Метод, представляющий активность потока.

Вы можете переопределить этот метод в подклассе. Стандартный метод `run()` вызывает вызываемый объект, передаваемый конструктору объекта в качестве **целевого аргумента _(target)_**, если таковой имеется, с __позиционными аргументами__ и __аргументами ключевого слова__, взятыми из аргументов __args__ и __kwargs__ соответственно.

#### join _(timeout=None)_

Дождидается завершения потока. Это блокирует вызывающий поток до тех пор, пока поток, метод `join()` которого вызывается, не завершится – либо обычным образом, либо через необработанное исключение – или пока не наступит необязательный тайм-аут.

Когда аргумент **timeout** присутствует, а не *None*, это должно быть число с плавающей запятой, указывающее время ожидания для операции в секундах (или их долях). Поскольку `join()` всегда возвращает *None*, следует вызвать `is_alive()` после `join()`, чтобы решить, произошел ли тайм–аут - если поток все еще жив, время ожидания вызова `join()` истекло.

Когда аргумент **timeout** отсутствует или *None*, операция будет блокироваться до завершения потока.

Использовать метод `join()` можно много раз.

`join()` вызывает `RuntimeError`, если предпринята попытка присоединиться к текущему потоку, поскольку это приведет к взаимоблокировке. Также является ошибкой искользовать `join()` к потоку до того, как он был запущен, и попытки сделать это вызывают такое же исключение.

#### name

Строка, используемая только для целей идентификации. Она не имеет семантики. Нескольким потокам может быть присвоено одно и то же имя. Начальное имя задается конструктором.

#### ident

**Идентификатор потока** этого потока или *None*, если поток не был запущен. Это ненулевое целое число. Идентификаторы потоков могут быть повторно использованы при завершении потока и создании другого потока. Идентификатор доступен даже после завершения потока.

#### native_id

**Идентификатор потока (TID)** этого потока, присвоенный операционной системой *(ядром)*. Это неотрицательное целое число или *None*, если поток не был запущен. Это значение может использоваться для уникальной идентификации этого конкретного потока в масштабах всей системы *(до тех пор, пока поток не завершится, после чего значение может быть повторно использовано операционной системой)*.

#### is_alive _()_

Возвращает, активен ли поток.

#### daemon

Логическое значение, указывающее, является ли этот поток потоком демона *(True)* или нет *(False)*. Это должно быть установлено перед вызовом `start()`, в противном случае возникает ошибка `RuntimeError`. Его начальное значение наследуется от создающего потока; основной поток не является потоком-демоном, и поэтому для всех потоков, созданных в основном потоке, **по умолчанию используется значение daemon = False**.

Вся программа Python завершается, когда не остается живых потоков, не являющихся демонами.

### Создание потока

In [None]:
import threading

def task():
    # код для выполнения в потоке
    print("Task executed by thread")

# Создание объекта потока
thread = threading.Thread(target=task)

# Запуск потока
thread.start()

### Передача аргументов в поток

In [None]:
import threading

def print_numbers(start, end):
    for i in range(start, end+1):
        print(i)

# Создание объекта потока с передачей аргументов
my_thread = threading.Thread(target=print_numbers, args=(1, 5))

# Запуск потока
my_thread.start()

### Ожидание завершения потока

In [None]:
import time
import threading

def my_function():
    # код для выполнения в потоке
    for _ in range(10):
        time.sleep(1)

# Создание объекта потока
my_thread = threading.Thread(target=my_function)

# Запуск потока
my_thread.start()

# Ожидание завершения потока
my_thread.join()

### Избегание гонок данных

In [None]:
import threading

counter = 0
lock = threading.Lock()

def increment_counter():
    global counter
    with lock:
        counter += 1

# Создание объектов потока
thread1 = threading.Thread(target=increment_counter)
thread2 = threading.Thread(target=increment_counter)

# Запуск потоков
thread1.start()
thread2.start()

# Ожидание завершения потоков
thread1.join()
thread2.join()

print("Counter:", counter)

### Общение между потоками

In [None]:
import threading

shared_variable = 0
shared_lock = threading.Lock()

def increment():
    global shared_variable
    for _ in range(100000):
        with shared_lock:
            shared_variable += 1

def decrement():
    global shared_variable
    for _ in range(100000):
        with shared_lock:
            shared_variable -= 1

# Создание потоков
thread_increment = threading.Thread(target=increment)
thread_decrement = threading.Thread(target=decrement)

# Запуск потоков
thread_increment.start()
thread_decrement.start()

# Ожидание завершения потоков
thread_increment.join()
thread_decrement.join()

print("Результат:", shared_variable)

### Передача данных между потоками

#### Используя глобальную переменную

In [None]:
import threading
import time

# Глобальная переменная для хранения данных
shared_variable = 0

# Функция, которая будет выполняться в отдельном потоке
def worker():
    global shared_variable
    for i in range(5):
        # Запись данных в переменную в отдельном потоке
        shared_variable += 1
        time.sleep(1)

# Создание объекта для отдельного потока
thread = threading.Thread(target=worker)

# Запуск потока
thread.start()

# Основной поток
for i in range(5):
    # Вывод значений переменной из основного потока
    print(f"Значение переменной в основном потоке: {shared_variable}")
    time.sleep(1)

# Ожидание завершения отдельного потока
thread.join()

# Вывод значения переменной после завершения всех потоков
print(f"Значение переменной после завершения всех потоков: {shared_variable}")

Обратите внимание, что использование глобальных переменных в многопоточном коде требует аккуратности, чтобы избежать гонок данных и других проблем синхронизации. В данном примере использовалась простая переменная, и в реальном коде, возможно, потребуется использовать мьютексы или другие механизмы синхронизации в зависимости от конкретных требований раличных задач.

#### Использование очередей *(Queue)*

In [None]:
import threading
import queue
import time

# Создание объекта очереди
data_queue = queue.Queue()

# Функция, которая будет выполняться в отдельном потоке
def worker():
    for i in range(5):
        # Запись данных в очередь в отдельном потоке
        data_queue.put(i)
        time.sleep(1)

# Создание объекта для отдельного потока
thread = threading.Thread(target=worker)

# Запуск потока
thread.start()

# Основной поток
for i in range(5):
    # Получение данных из очереди в основном потоке
    data = data_queue.get()
    print(f"Значение из очереди в основном потоке: {data}")
    time.sleep(1)

# Ожидание завершения отдельного потока
thread.join()

Очереди предоставляют безопасный и удобный способ обмена данными между потоками. Они автоматически обеспечивают синхронизированный доступ к данным, избегая гонок данных. Так же они позволяют управлять порядком обработки данных, организуя их в FIFO (первым пришел, первым обработан) порядке.

Из-за механизма блокировки, использование очередей может привести к замедлению приложения в случае частого доступа к данным из разных потоков.

### **Event** в **threading**

**Event** - это механизм синхронизации в модуле threading, который позволяет одному потоку уведомлять другие о каких-либо событиях. **Event** может быть установлен в состояние *установлено* или *сброшено*. Потоки могут ожидать, пока событие не станет установленным, или сбрасывать его.

#### Пример:

In [None]:
import threading
import time

def worker(event):
    print("Worker начинает ожидание события")
    event.wait()  # Поток ожидает, пока событие не станет установленным
    print("Событие произошло. Worker продолжает выполнение.")

# Создание объекта Event
my_event = threading.Event()

# Создание объекта потока
my_thread = threading.Thread(target=worker, args=(my_event,))

# Запуск потока
my_thread.start()

# Некоторая работа выполняется в основном потоке

time.sleep(2)  # Подождем 2 секунды, чтобы убедиться, что Worker уже ждет

# Установка события
print("Установка события.")
my_event.set()

# Ожидание завершения потока
my_thread.join()

В этом примере основной поток устанавливает событие, и тем самым, *разблокирует* ожидающий поток. **Event** может быть использован для синхронизации и координации работы потоков в различных сценариях.

## Примеры применения

Далее будут представленны примеры использования многопоточности непосредственно на Ev3. Больше примеров можно посмотреть в папке `resources/Thread`. Они представлены в общем виде *(без использования модуля ev3dev2)*.

### Использование потока

In [None]:
#!/usr/bin/env python3

from threading import Thread
import time

from ev3dev2.sensor.lego import TouchSensor
from ev3dev2.sound import Sound


ts = TouchSensor()
sound = Sound()


def twenty_tones():
    for _ in range(0,20):           # Повторение 20 раз
        sound.play_tone(1000, 0.2)  # 1000 Гц в течение 0,2 с
        time.sleep(0.5)


t = Thread(target=twenty_tones)
t.start()

for i in range(0,5):       # Повторение пять раз, при этом i = 0, 1, 2, 3, 4.
    ts.wait_for_bump()

sound.beep()

### Использование потока с передачей аргументов

In [None]:
#!/usr/bin/env python3

from threading import Thread
import time

from ev3dev2.sensor.lego import TouchSensor
from ev3dev2.sound import Sound


ts = TouchSensor()
sound = Sound()


def twenty_tones(frequency):
    for _ in range(0,20):               # Повторение 20 раз
        sound.play_tone(frequency, 0.2) # 1500 Гц в течение 0,2 с
        time.sleep(0.5)


t = Thread(target=twenty_tones, args=(1500,))
t.setDaemon(True)
t.start()

for i in range(0,5):  # Повторение пять раз, при этом i = 0, 1, 2, 3, 4.
    ts.wait_for_bump()

sound.beep()

### Использование потока совместно с Event

In [None]:
#!/usr/bin/env python3

from threading import Thread, Event
import time

from ev3dev2.sensor.lego import TouchSensor
from ev3dev2.sound import Sound


ts = TouchSensor()
sound = Sound()
event = Event()


def tones_forever():
    while not event.is_set():
        sound.play_tone(1000, 0.2)
        time.sleep(0.5)


t = Thread(target=tones_forever)
t.start()

for i in range(0,5):
    ts.wait_for_bump()

sound.beep()

event.set()

### Чтение данных в потоке

In [None]:
#!/usr/bin/env python3

import os
import time
from threading import Thread

from ev3dev2.console import Console
from ev3dev2.motor import OUTPUT_A, OUTPUT_B, MoveTank

# Задаем шрифт для вывода на экран
os.system("setfont Greek-TerminusBold20x10")

# Значения энкодера
LEFT_VALUE = 0
RIGHT_VALUE = 0

def read_motors_positions(move_tank: MoveTank):
    global LEFT_VALUE, RIGHT_VALUE
    while True:
        # Записываем текущие значения энкодера
        LEFT_VALUE = move_tank.left_motor.position
        RIGHT_VALUE = move_tank.right_motor.position
        time.sleep(0.1)


def main():
    console = Console()

    # Создаем объект моторов
    move_tank = MoveTank(OUTPUT_A, OUTPUT_B)

    # Создаем поток чтения данных энкодеров
    read_thread = Thread(target=read_motors_positions, args=(move_tank,))
    # Запускаем поток
    read_thread.start()

    while True:
        # Очищаем все с экрана
        console.reset_console()

        # Выводим значения на дисплей
        print("LEFT:\t%s\nRIGHT:\t%s\r"%(LEFT_VALUE, RIGHT_VALUE))

        time.sleep(0.1)

## Задание

1. Вывести на экран значения на экран данные всех подключенных датчиков. *(Для каждого отдельного типа датчика одтельный поток)*

2. Заставить робота ездить прямо, используя энкодеры. *(Передавать скорость, учитывая отношение значений энкодеров левого и правого моторов)*

3. Учитывая *пункт 2*, проехать по периметру стола-стенда. На пути могут быть препядствия, от которых нужно уворачиваться, используя ультразвукавой датчик.

## Полезные ссылки

* [Официальная документация по использования датчиков при помощи модуля ev3dev2](https://ev3dev-lang.readthedocs.io/projects/python-ev3dev/en/ev3dev-stretch/sensors.html)

* [Официальная документация модуля threading](https://docs.python.org/3.10/library/threading.html)

* [GIL и его влияние на многопоточность Python](https://habr.com/ru/articles/592189/)

* [Практический гайд по процессам и потокам (и не только) в Python](https://habr.com/ru/articles/773376/)

* [Making lunch faster with Python concurrency](https://sourcery.ai/blog/concurrency/)

* [What is the difference between concurrency and parallelism?](https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism)

* [Логирование в Python: руководство разработчика](https://habr.com/ru/companies/wunderfund/articles/683880/)

* [8 продвинутых возможностей модуля logging в Python, которые вы не должны пропустить](https://habr.com/ru/articles/513966/)