In [None]:
import threading
import time

def cpu_task():
    total = 0
    for i in range(10**7):
        total += i

threads = [threading.Thread(target=cpu_task) for _ in range(4)]


In [None]:
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
end = time.time()

print(f"Время выполнения с потоками: {end - start:.2f} секунд")

Время выполнения с потоками: 3.83 секунд


In [None]:
start = time.time()
[cpu_task() for _ in range(4)]
end = time.time()

print(f"Время выполнения без потоков: {end - start:.2f} секунд")

Время выполнения без потоков: 2.69 секунд


In [None]:
from multiprocessing import Process

def print_task(n):
    print(f"Процесс {n} начал выполнение")
    time.sleep(1)
    print(f"Процесс {n} завершился")

processes = [Process(target=print_task, args=(i,)) for i in range(3)]
for p in processes:
    p.start()
for p in processes:
    p.join()

Процесс 0 начал выполнение
Процесс 1 начал выполнениеПроцесс 2 начал выполнение

Процесс 0 завершился
Процесс 1 завершился
Процесс 2 завершился


In [None]:
import multiprocessing

def increment(counter, lock):
    for _ in range(10**6):
        with lock:
        # lock.acquire()
            counter.value += 1
        # lock.release()

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    processes = [multiprocessing.Process(target=increment, args=(counter, lock)) for _ in range(2)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"Result: {counter.value}")


Result: 2000000


In [None]:
import threading

counter = 0

def increment():
    global counter
    for _ in range(10**6):
        temp = counter
        temp += 1
        counter = temp

threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Result: {counter}")


Result: 2000000


In [None]:
import threading

event = threading.Event()

def waiter():
    print("Жду события...")
    event.wait()  # Блокируем поток, пока событие не установлено
    print("Событие произошло!")

def setter():
    print("Готовлюсь установить событие...")
    event.set()  # Устанавливаем событие, разблокируя ожидающий поток

thread1 = threading.Thread(target=waiter)
thread2 = threading.Thread(target=setter)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

Жду события...
Готовлюсь установить событие...
Событие произошло!


In [None]:
import threading
from queue import Queue

queue = Queue()

def producer():
    for i in range(5):
        queue.put(i)
        print(f"Производитель добавил: {i}")

def consumer():
    while not queue.empty():
        item = queue.get()
        print(f"Потребитель взял: {item}")
        queue.task_done()

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
producer_thread.join()

consumer_thread.start()
consumer_thread.join()

Производитель добавил: 0
Производитель добавил: 1
Производитель добавил: 2
Производитель добавил: 3
Производитель добавил: 4
Потребитель взял: 0
Потребитель взял: 1
Потребитель взял: 2
Потребитель взял: 3
Потребитель взял: 4


In [None]:
import multiprocessing
import time

semaphore = multiprocessing.Semaphore(2)

def task(n):
    with semaphore:  # Гарантируем, что только 2 процесса выполняют эту секцию
        print(f"Процесс {n} начал выполнение")
        time.sleep(1)
        print(f"Процесс {n} завершил выполнение")

if __name__ == "__main__":
    processes = [multiprocessing.Process(target=task, args=(i,)) for i in range(5)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()


Процесс 0 начал выполнение
Процесс 1 начал выполнение
Процесс 0 завершил выполнение
Процесс 2 начал выполнение
Процесс 1 завершил выполнение
Процесс 3 начал выполнение
Процесс 2 завершил выполнение
Процесс 4 начал выполнение
Процесс 3 завершил выполнение
Процесс 4 завершил выполнение


In [None]:
import threading

barrier = threading.Barrier(3)

def worker(n):
    print(f"Поток {n} достиг барьера")
    barrier.wait()  # Ожидание других потоков
    print(f"Поток {n} продолжил выполнение")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]

for t in threads:
    t.start()
for t in threads:
    t.join()

In [None]:
import threading
import requests

def fetch_data(url):
    response = requests.get(url)
    print(f"Получены данные с {url}: {len(response.text)} символов")

urls = ['https://example.com', 'https://httpbin.org/get', 'https://api.github.com']
threads = [threading.Thread(target=fetch_data, args=(url,)) for url in urls]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

Получены данные с https://example.com: 1256 символов
Получены данные с https://api.github.com: 2262 символов
Получены данные с https://httpbin.org/get: 308 символов


In [None]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
import aiohttp
import asyncio

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            print(f"Получены данные с {url}: {len(content)} символов")

async def main():
    urls = ['https://example.com', 'https://httpbin.org/get', 'https://api.github.com']
    tasks = [fetch_data(url) for url in urls]
    await asyncio.gather(*tasks)

asyncio.run(main())

Получены данные с https://example.com: 1256 символов
Получены данные с https://api.github.com: 2262 символов
Получены данные с https://httpbin.org/get: 312 символов


In [None]:
%%time
from concurrent.futures import ProcessPoolExecutor
import math

def calculate_factorial(n):
    print(f"Вычисляю факториал для {n}")
    return math.factorial(n)

numbers = [100000, 200000, 300000]

with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(calculate_factorial, numbers))

# print(f"Результаты: {[len(str(res)) for res in results]} цифр")

Вычисляю факториал для 100000
Вычисляю факториал для 200000
Вычисляю факториал для 300000
CPU times: user 42.3 ms, sys: 29.5 ms, total: 71.7 ms
Wall time: 2.92 s


In [None]:
%%time
from concurrent.futures import ProcessPoolExecutor
import math

def calculate_factorial(n):
    print(f"Вычисляю факториал для {n}")
    return math.factorial(n)

numbers = [100000, 200000, 300000]

with ProcessPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(calculate_factorial, numbers))

# print(f"Результаты: {[len(str(res)) for res in results]} цифр")

Вычисляю факториал для 100000Вычисляю факториал для 200000
Вычисляю факториал для 300000

CPU times: user 29.7 ms, sys: 38.5 ms, total: 68.2 ms
Wall time: 1.85 s


In [None]:
from multiprocessing import Pool

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

with Pool(4) as pool:
    results = pool.map(square, numbers)

print(f"Квадраты чисел: {results}")

Квадраты чисел: [1, 4, 9, 16, 25]


In [None]:
from multiprocessing import Process, Pipe

def sender(pipe):
    pipe.send("Сообщение из отправителя")

def receiver(pipe):
    message = pipe.recv()
    print(f"Получено сообщение: {message}")

parent_conn, child_conn = Pipe()

p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))

p1.start()
p1.join()
p2.start()
p2.join()

Получено сообщение: Сообщение из отправителя


In [None]:
from multiprocessing.dummy import Pool

def fetch_url(url):
    import requests
    response = requests.get(url)
    return len(response.text)

urls = ['https://example.com', 'https://httpbin.org/get', 'https://api.github.com']

with Pool(4) as pool:
    results = pool.map(fetch_url, urls)

print(f"Размеры ответов: {results}")

Размеры ответов: [1256, 308, 2262]


In [None]:
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Функция для загрузки данных
def fetch_data(url):
    response = requests.get(url)
    return response.text

# Функция для обработки данных
def process_data(data):
    return len(data)

urls = ['https://example.com', 'https://httpbin.org/get', 'https://api.github.com']

# Загрузка данных с помощью потоков
with ThreadPoolExecutor(max_workers=3) as thread_executor:
    raw_data = list(thread_executor.map(fetch_data, urls))

# Обработка данных с помощью процессов
with ProcessPoolExecutor(max_workers=3) as process_executor:
    results = list(process_executor.map(process_data, raw_data))

print(f"Результаты обработки: {results}")

Результаты обработки: [1256, 308, 2262]


In [None]:
import pandas as pd
import numpy as np

# Генерация большого DataFrame
N_ROWS = 10**6
df = pd.DataFrame({
    'col1': np.random.randint(0, 100, N_ROWS),
    'col2': np.random.rand(N_ROWS),
    'col3': np.random.randint(0, 50, N_ROWS)
})

In [None]:
from multiprocessing import Pool

# Функция для обработки одной части DataFrame
def process_chunk(chunk):
    chunk['result'] = chunk['col1'] * chunk['col2'] + chunk['col3']
    return chunk

# Разделение DataFrame на части
def parallel_apply(df, func, n_cores=4):
    chunk_size = len(df) // n_cores
    chunks = [df[i * chunk_size:(i + 1) * chunk_size] for i in range(n_cores)]

    with Pool(n_cores) as pool:
        results = pool.map(func, chunks)

    return pd.concat(results)

# Применение функции параллельно
result_df = parallel_apply(df, process_chunk, n_cores=4)

In [None]:
! pip install "modin[dask]"

Collecting modin[dask]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting distributed>=2.22.0 (from modin[dask])
  Downloading distributed-2024.11.2-py3-none-any.whl.metadata (3.3 kB)
Collecting dask>=2.22.0 (from modin[dask])
  Downloading dask-2024.11.2-py3-none-any.whl.metadata (3.7 kB)
Collecting sortedcontainers>=2.0.5 (from distributed>=2.22.0->modin[dask])
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tblib>=1.6.0 (from distributed>=2.22.0->modin[dask])
  Downloading tblib-3.0.0-py3-none-any.whl.metadata (25 kB)
Collecting zict>=3.0.0 (from distributed>=2.22.0->modin[dask])
  Downloading zict-3.0.0-py2.py3-none-any.whl.metadata (899 bytes)
Downloading distributed-2024.11.2-py3-none-any.whl (1.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dask-2024.11.2-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import modin.pandas as mpd

# Преобразование pandas DataFrame в modin DataFrame
modin_df = mpd.DataFrame(df)

# Выполнение вычислений
modin_df['result'] = modin_df['col1'] * modin_df['col2'] + modin_df['col3']

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:42893
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44473'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38903'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:43345', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:43345
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:50270
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:40697', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker co

In [None]:
import polars as pl

# Преобразование pandas DataFrame в polars DataFrame
polars_df = pl.from_pandas(df)

# Выполнение вычислений
polars_df = polars_df.with_columns((pl.col("col1") * pl.col("col2") + pl.col("col3")).alias("result"))

In [None]:
import time

# Функция для измерения времени выполнения
def benchmark(func, *args, **kwargs):
    start_time = time.time()
    result = func(*args, **kwargs)
    end_time = time.time()
    print(f"{func.__name__} выполнено за {end_time - start_time:.2f} секунд")
    return result

# Тестовые функции
def test_pandas_multiprocessing():
    return parallel_apply(df, process_chunk, n_cores=4)

# def test_modin():
#     modin_df = mpd.DataFrame(df)
#     modin_df['result'] = modin_df['col1'] * modin_df['col2'] + modin_df['col3']
#     return modin_df

def test_polars():
    polars_df = pl.from_pandas(df)
    return polars_df.with_columns((pl.col("col1") * pl.col("col2") + pl.col("col3")).alias("result"))

# Запуск бенчмарка
benchmark(test_pandas_multiprocessing)
# benchmark(test_modin)
benchmark(test_polars)
pass

test_pandas_multiprocessing выполнено за 0.74 секунд
test_polars выполнено за 0.09 секунд


In [None]:
def process_pandas(chunk):
    chunk['result'] = chunk['col1'] * chunk['col2'] + chunk['col3']
    return chunk

def test_pandas():
    return process_pandas(df)


benchmark(test_pandas)

test_pandas выполнено за 0.01 секунд


Unnamed: 0,col1,col2,col3,result
0,77,0.965153,42,116.316808
1,71,0.491469,6,40.894321
2,85,0.928030,1,79.882522
3,84,0.413701,33,67.750871
4,59,0.012272,37,37.724064
...,...,...,...,...
999995,20,0.305980,49,55.119592
999996,28,0.438685,3,15.283171
999997,22,0.170794,19,22.757478
999998,47,0.628686,27,56.548244


In [None]:
from joblib import Parallel, delayed

# Обработка частей DataFrame
def process_chunk(chunk):
    chunk['result'] = chunk['col1'] * chunk['col2'] + chunk['col3']
    return chunk

# Распараллеливание по частям DataFrame
def parallel_apply_chunks(df, func, n_jobs=4):
    chunks = np.array_split(df, n_jobs)
    results = Parallel(n_jobs=n_jobs)(delayed(func)(chunk) for chunk in chunks)
    return pd.concat(results)

# Применение
benchmark(parallel_apply_chunks, df, process_row, 4)