Придуман для написания http серверов, чтобы обрабатывать большшое количество клиентов, не тратя большое количество ресурсов.

In [None]:
import time
import random
from enum import Enum, auto
from threading import Thread # для создания потоков данных

In [None]:
def random_delay():
  return random.random() * 5



def random_countdown():
  return random.randrange(5)



def launch_rocket(delay, countdown):
  """
    delay - задежка
    countdown - количество как бы подготовок перед запуском
  """
  time.sleep(delay)
  # reversed - чтение в обратном порядке
  for i in reversed(range(countdown)):
    print(f"{i+1} ...")
    time.sleep(1)
  print("Rocket launched!")



def rockets():
  N = 10_000
  return [
      (random_delay(), random_countdown())
      for _ in range(N)
  ]


def run_threads():
  """
  target = передаем функцию
  args - аргументы функции
  """
  # добавляем потоки
  threads = [
      Thread(target=launch_rocket, args=(d, c))
      for d, c in rockets()
  ]
  for t in threads: # пробегаемся по элементам потока
    t.start() # запуск потока
  for t in threads:
    t.join() # джойним все потоки


class State(Enum):
  WAITING = auto()
  COUNTING = auto()
  LAUNCHING = auto()


class Op(Enum):
  WAIT = auto()
  STOP = auto()


class Launch:
  def __init__(self, delay, countdown):
    self.state = State.WAITING
    self.delay = delay
    self._countdown = countdown

  def step(self):
    if self._state is State.WAITING:
      return Op.WAIT
    if self._state is State.COUNTING:
      print(f"{self._countdown}...")
      self._countdown -= 1
      if self._countdown == 0:
        self._state = State.LAUNCHING
      return Op.WAIT, 1
    if self._state is State.LAUNCHING:
      print("Rocket launched!")
      return Op.STOP, None
    assert False, self._state

In [None]:
class Op(Enum):
  WAIT = auto()
  STOP = auto()

Op.STOP

<Op.STOP: 2>

In [None]:
launch_rocket(2, 3)

3 ...
2 ...
1 ...
Rocket launched!


Теперь мы хотим запускать все 10.000 ракет сразу.

In [None]:
for d, c in rockets():
  launch_rocket(d, c)

In [None]:
# запускаем 10.000 потоков сразу, но он не такой легкий, каким мы хотел бы это получить
run_threads()

Многопоточность. Ее мы можем использовать, чтобы значительно ускорить работы программ.

In [None]:
import _thread # низкоуровневая параллельность создания нити
import time

# Асинхронный запуск функции, как нити
def print_time(threadName, delay):
  count = 0
  while count < 5:
    time.sleep(delay)
    count += 1
    print(f"{threadName}, {time.ctime(time.time())}")

# Создаем нити
try:
  # Передаем функцию и аргументы этой функции
  _thread.start_new_thread(print_time, ("Thread-1", 2, ) )
  _thread.start_new_thread(print_time, ("Thread-2", 4, ) )
except:
  print("Error: unable to start thread")

while 1:
  pass

Thread-1, Mon Jan 29 21:43:00 2024
Thread-2, Mon Jan 29 21:43:02 2024
Thread-1, Mon Jan 29 21:43:02 2024
Thread-1, Mon Jan 29 21:43:04 2024
Thread-2, Mon Jan 29 21:43:06 2024
Thread-1, Mon Jan 29 21:43:06 2024
Thread-1, Mon Jan 29 21:43:08 2024
Thread-2, Mon Jan 29 21:43:10 2024
Thread-2, Mon Jan 29 21:43:14 2024
Thread-2, Mon Jan 29 21:43:18 2024


KeyboardInterrupt: 

In [None]:
import threading # позволяет взаимодействовать с нитью
import time


class MyThread(threading.Thread): # -> объект нити и за счет этого мы можем с ней взаимодействовать
  def __init__(self, name, delay):
    threading.Thread.__init__(self)
    self.name = name
    self.delay = delay

  def run(self):
    print("Starting thread %s." % self.name)
    thread_count_down(self.name, self.delay)
    print("Finished thread %s." % self.name)


def thread_count_down(name, delay):
  counter = 5

  while counter:
    time.sleep(delay)
    print("Thread %s counting down: %i..." % (name, counter))
    counter -= 1

In [None]:
import time

start = time.perf_counter() # позволяет замерить время работы функции (1)

def do_something():
  print("Sleeping 1 second: ")
  time.sleep(1)
  print("Done Sleeping...")

do_something()

finish = time.perf_counter() # (2)

Sleeping 1 second: 
DOne Sleeping...
