Процесс- запущенная программа. Процессы удобны для одновреманного выполнения нескольких задач. Поток похож на процесс, но ключевое отличие в том что поток разделяет ресурсы процессора и память. Потоки нужны когда у вас есть общий ресурс который вы намеренно хотите раздеялть.

При использовании потоков программа работает только на одном процессоре. Различные задачи, внутри потоков выполняются на одном ядре, а операционная система управляет, когда ваша программа работает с каким потоком.

Модуль threading в python нужет для упарвления потоками. При помощи класса Thread мы создаём поток, target указывет какую функцию будет исполнять поток, а args передаёт этой функции аргументы. Что бы поток начал действовать нажно написать t.start()

In [12]:
import time
import threading

def countdown(n):
    for i in range(n):
        print(n - i - 1, "left")
        time.sleep(1)
        
t = threading.Thread(target=countdown, args=(3, ))
t.start()

2 left
1 left
0 left


Поток так же можно создавать с помощью наследования класса Thread

In [8]:
import time
import threading

class CountdownThread(threading.Thread):
    def __init__(self, n):
        super().__init__()
        self.n = n
        
    def run(self): # вызывается методом start.
        for i in range(self.n):
            print(self.n - i - 1, "left")
            time.sleep(1)

t = CountdownThread(3)
t.start()

2 left
1 left
0 left


У каждого потока есть имя и идентификатор. По умолчанию потокам даётся имя "Thread-N"

In [11]:
import threading

print(threading.Thread().name)
print(threading.Thread(name="NumberCruncher").name)

t = threading.Thread()
t.start()
t.ident

Thread-9
NumberCruncher


13800

При помощи функции enumerate() можно получить список всех потоков

In [13]:
import threading

threading.enumerate()

[<_MainThread(MainThread, started 6880)>,
 <Thread(Thread-4, started daemon 12012)>,
 <Heartbeat(Thread-5, started daemon 6028)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 1808)>,
 <ParentPollerWindows(Thread-3, started daemon 5700)>]

Чтобы указать одному потоку дождаться завершения другого потока, вам нужно вызывать .join()

In [20]:
import threading
import time

def arf(a):
    time.sleep(a)
    print(10)

t = threading.Thread(target=arf, args=(5, ))

t.start()
t.join()

10


Проверить, выполняется ли поток, можно с помощью метода is_alive:

In [14]:
import threading
import time

t = threading.Thread(target=time.sleep, args=(5, ))

t.start()
print(t.is_alive())
t.join()
time.sleep(5)
print(t.is_alive())

True
False


Демон — это поток, созданный с аргументом daemon=True. Отличие потока-демона от обычного потока в том, что
потоки-демоны автоматически уничтожаются при выходе из интерпретатора. Уничтожение потока-демона не подразумевает процедуру финализации, поэтому следует быть аккуратным при использовании демонов для задач, работающих с ресурсами.

Если в программе запущены потоки, которые не являются демонами, то программа будет ожидать завершения этих потоков, прежде чем сможет завершится. Тем не менее, потоки, которые являются демонами, при закрытие программы просто убиваются, в каком бы они состояние ни находились.

In [15]:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,), daemon=True)
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    logging.info("Main    : all done")

13:00:22: Main    : before creating thread
13:00:22: Main    : before running thread
13:00:22: Thread 1: starting
13:00:22: Main    : wait for the thread to finish
13:00:22: Main    : all done
13:00:24: Thread 1: finishing


In [1]:
import threading
import time

t = threading.Thread(target=time.sleep, args=(5, ), daemon=True)
t.start()

t.daemon

True

С потоками возникает некоторая проблема. Например, два потока имеют чередующийся доступ к одному общему объекту, перезаписывая результаты друг друга. Подобные условия могут возникнуть, когда один поток освобождает память или закрывает дескриптор файла, прежде чем другой поток завершит доступ к нему. 

С помощью Lock можно избежать этой проблемы

In [18]:
import logging
import random
import threading
import time

class Counter:
    
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start
        
    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
     
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')
        
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()

In [19]:
logging.debug('Counter: %d', counter.value)

функция acquire даёт блокировку, а release реализует

Модуль multiprocessing

Этот модуль нужен для управления процессами. Местами он похож на предыдущий

In [11]:
import multiprocessing


def worker(num):
    """Рабочая функция"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()


In [5]:
import os
from multiprocessing import Process
 
 
def doubler(number):
    """
    Функция умножитель на два
    """
    result = number * 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
    
    for proc in procs:
        proc.join()

Как и у потоков у процессов есть свои имена

In [8]:
import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')

def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')

if __name__ == "__main__":
    service = multiprocessing.Process(
    name="my_service",
    target=my_service,
    )
    worker_l = multiprocessing.Process(
    name='worker 1',
    target=worker,
    )
    
    worker_2 = multiprocessing.Process(
    target=worker,
    )
    
    worker_l.start()
    worker_2.start()
    service.start()

Демон-процессы. По умолчанию выход из основной программы осуществляется лишь после
того, как завершатся все дочерние процессы. Но иногда запускаются фоновые
процессы, которые выполняются, не блокируя выход из основной программы

In [9]:
import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True
    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False
    d.start()
    time.sleep(1)
    n.start()


Вывод не включает сообщение “Exiting” из процесса-демона, поскольку все процессы, не являющиеся демонами (включая основную программу), успевают завершиться, прежде чем процесс-демон пробудится после приостановки на 2 секунды.


Join. На этот раз, поскольку основной процесс ожидает завершения процесса-демона, используя метод join (), соответствующая строка “Exiting” есть в выводе.

In [None]:
import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True
    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False
    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

terminate останавливает процесс принудительно, важно вызвать метод join() для процесса после принудительного прекращения его работы

In [None]:
import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__' :
    p = multiprocessing.Process(target=slow_worker)

    print('BEFORE:', p, p.is_alive())
    p.start()

    print('DURING:', p, p.is_alive())
    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

Создание процессов с помощью подклассов

In [10]:
import multiprocessing


class Worker(multiprocessing.Process):
    def run(self):
        print('In {}'.format(self.name))
        return
    
    
if __name__ == '__main__' :
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()


Для выполнения возложенной на него работы производный класс должен переопределить метод run ().

Lokc есть и у процессов. Чтобы не дать процессам конфликтовать друг с другом, мы используем объект Lock. Этот код зациклится над нашим списком трех объектов и создаст процесс для каждого из них. Каждый процесс будет вызывать нашу функцию, и передавать её одному из объектов. Так как мы используем замки, следующий процесс в строке будет ждать, пока замок не снимается, после чего он сможет продолжить.

In [None]:
from multiprocessing import Process, Lock


def printer(item, lock):
    """
    Выводим то что передали
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]

    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()

Pool ещё один способ создавать процессы Здесь мы создали экземпляр Pool и указали ему создать три рабочих процесса. Далее мы используем метод map для отображения функции для каждого процесса. Наконец мы выводим результат, что в нашем случае является списком: [10, 20, 40].

In [None]:
from multiprocessing import Pool


def doubler(number):
    return number * 2


if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)
    print(pool.map(doubler, numbers))

Модуль asyncio

Допустим, у вас есть скрипт, который запрашивает данные от трех разных серверов. Иногда выполнение запроса к одному из этих серверов может неожиданно занять слишком много времени. Представьте, что получение данных со второго сервера занимает 10 секунд. Пока вы ждете, весь сценарий фактически ничего не делает.

Что если не дожидаться второго запроса, выполнить третий, а затем вернуться ко второму и продолжит с того места, где он был прерван? Это и будет асинхронный подход – переключение между задачами для минимизации времени простоя.

Этот модул основан на трёх китах

цикл событий (event loop) по большей части всего лишь управляет выполнением различных задач: регистрирует поступление и запускает в подходящий момент

сопрограммы — специальные функции, похожие на генераторы python, от которых ожидают (await), что они будут отдавать управление обратно в цикл событий. Необходимо, чтобы они были запущены именно через цикл событий

Фьючерсы — объекты, в которых хранится текущий результат выполнения какой-либо задачи. Это может быть информация о том, что задача ещё не обработана или уже полученный результат; а может быть вообще исключение


In [None]:
import asyncio  
import time  
from datetime import datetime

async def custom_sleep():  
    print('SLEEP', datetime.now())
    time.sleep(1)
    
async def factorial(name, number):  
    f = 1
    for i in range(2, number+1):
        print('Task {}: Compute factorial({})'.format(name, i))
        await custom_sleep()
        f *= i
    print('Task {}: factorial({}) is {}\n'.format(name, number, f))

    
start = time.time()  
loop = asyncio.get_event_loop()
tasks = [  
    asyncio.ensure_future(factorial("A", 3)),
    asyncio.ensure_future(factorial("B", 4)),
]

loop.run_until_complete(asyncio.wait(tasks))  
loop.close()
end = time.time()  
print("Total time: {}".format(end - start))

In [None]:
import asyncio  
import time  
from datetime import datetime

async def custom_sleep():  
    print('SLEEP {}\n'.format(datetime.now()))
    await asyncio.sleep(1)

async def factorial(name, number):  
    f = 1
    for i in range(2, number+1):
        print('Task {}: Compute factorial({})'.format(name, i))
        await custom_sleep()
        f *= i
    print('Task {}: factorial({}) is {}\n'.format(name, number, f))

start = time.time()  
loop = asyncio.get_event_loop()
tasks = [  
    asyncio.ensure_future(factorial("A", 3)),
    asyncio.ensure_future(factorial("B", 4)),
]
loop.run_until_complete(asyncio.wait(tasks))  
loop.close()
end = time.time()  
print("Total time: {}".format(end - start))

Сопрограмма могут быть запущены только из другой сопрограммы, или обёрнуты в задачу с помощью create_task

После того, как у нас оказались 2 задачи, объединим их, используя wait

И, наконец, отправим на выполнение в цикл событий через run_until_complete


Используя await в какой-либо корутине, мы таким образом объявляем, что корутина может отдавать управление обратно в event loop, который, в свою очередь, запустит какую-либо следующую задачу

Задание

При помощимодуля threading создайте два потока, один поток пусть выполняет функцию сложения двух чисел, второй пусть умножает два числа.