<a href="https://colab.research.google.com/github/TaniaZharova2205/HSE/blob/main/Python_1_2/DZ3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. **Сравнение подходов для синхронизации потоков (1б)**

Напишите программу, которая моделирует доступ к общему ресурсу из пяти потоков. Каждый поток должен:

1. Заблокировать доступ к ресурсу.
2. Печатать сообщение о начале работы.
3. "Использовать" ресурс (симулируйте это с помощью `time.sleep(1)`).
4. Печатать сообщение об окончании работы и освобождать ресурс.

Сравните два подхода к синхронизации:

1. Используя `threading.Lock`.
2. Используя `threading.Semaphore`, где одновременно доступ к ресурсу могут иметь два потока.

In [None]:
import threading
import time


def access_resource_with_lock(lock, thread_id):
    with lock:
        print(f"Поток {thread_id} начал работу.")
        time.sleep(1)
        print(f"Поток {thread_id} завершил работу.")


def access_resource_with_semaphore(semaphore, thread_id):
    with semaphore:
        print(f"Поток {thread_id} начал работу.")
        time.sleep(1)
        print(f"Поток {thread_id} завершил работу.")


def main():
    # Синхронизация с Lock
    print("С lock:")
    lock = threading.Lock()
    threads = []
    for i in range(5):
        thread = threading.Thread(target=access_resource_with_lock, args=(lock, i))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    # Синхронизация с Semaphore
    print("\n С semaphore:")
    semaphore = threading.Semaphore(2)
    threads = []
    for i in range(5):
        thread = threading.Thread(target=access_resource_with_semaphore, args=(semaphore, i))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

С lock:
Поток 0 начал работу.
Поток 0 завершил работу.
Поток 1 начал работу.
Поток 1 завершил работу.
Поток 2 начал работу.
Поток 2 завершил работу.
Поток 3 начал работу.
Поток 3 завершил работу.
Поток 4 начал работу.
Поток 4 завершил работу.

 С semaphore:
Поток 0 начал работу.
Поток 1 начал работу.
Поток 0 завершил работу.
Поток 2 начал работу.
Поток 1 завершил работу.
Поток 3 начал работу.
Поток 2 завершил работу.
Поток 4 начал работу.
Поток 3 завершил работу.
Поток 4 завершил работу.


2. **Очередь задач с задержкой на потоках (2б)**

Реализуйте систему обработки задач с очередью `queue.PriorityQueue`. Программа должна состоять из:

1. **Производителя** (`producer`): добавляет задачи с приоритетами от 1 до 5 (1 — самый высокий) в очередь с интервалом 0.5 секунды.
2. **Потребителя** (`consumer`): извлекает задачи из очереди в порядке приоритета и обрабатывает их (задержка 1 секунда на задачу).

Если потребитель не получает новую задачу за 2 секунды, он завершает выполнение.

In [None]:
from queue import PriorityQueue
import threading
import time

def producer(queue):
    for i in range(5, 0, -1): # Добавление задачи с такими приоритетами в очередь в такой последовательности
        print(f"Добавлена задача с приоритетом {i}")
        time.sleep(0.5)
        queue.put(i)

def consumer(queue):
    while True:
          try:
              item = queue.get(timeout=2)
          except Exception:
              print("Очередь пуста, завершение.")
              break
          else:
              print(f"Обработка Задача с приоритетом {item}")
              time.sleep(1)
              queue.task_done()

def main():
    queue = PriorityQueue()

    producer_thread = threading.Thread(target=producer, args=(queue,))
    producer_thread.start()

    consumer_thread = threading.Thread(target=consumer, args=(queue,))
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()


if __name__ == '__main__':
    main()


Добавлена задача с приоритетом 5
Добавлена задача с приоритетом 4Обработка Задача с приоритетом 5

Добавлена задача с приоритетом 3
Добавлена задача с приоритетом 2
Обработка Задача с приоритетом 3
Добавлена задача с приоритетом 1
Обработка Задача с приоритетом 1
Обработка Задача с приоритетом 2
Обработка Задача с приоритетом 4
Очередь пуста, завершение.


3. **Ускорение обработки данных с помощью `ProcessPoolExecutor` / `multiprocessing.Pool` / `modin`  (2б)**

Напишите программу, которая читает большой CSV-файл с колонками `col1` и `col2`. Используйте `ProcessPoolExecutor` для выполнения следующих операций:

1. Для каждой строки вычислить результат: $ \text{result} = \text{col1}^2 + 2 \cdot \text{col2} $.
2. Записать результаты обратно в новый CSV-файл.

Добавьте измерение времени выполнения.


In [None]:
! pip install "modin[dask]"

In [None]:
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool
import time


def process_row(row):
    row['result'] = row['col1']**2 + 2 * row['col2']
    return row

def process_with_process_pool_executor(input_file, output_file, row_size, pool_size):
    print(f"=== ProcessPoolExecutor с чанком {row_size} ===")
    start_time = time.time()
    rows = pd.read_csv(input_file, chunksize=row_size)

    results = []
    with ProcessPoolExecutor(max_workers=pool_size) as executor:
        futures = [executor.submit(process_row, chunk) for chunk in rows]
        results = [future.result() for future in futures]

    final_df = pd.concat(results)
    final_df.to_csv(output_file, index=False)

    end_time = time.time()
    print(f"ProcessPoolExecutor завершено за {end_time - start_time:.2f} секунд.\n")

def process_with_multiprocessing_pool(input_file, output_file, row_size, pool_size):
    print(f"=== multiprocessing.Pool с чанком {row_size} ===")
    start_time = time.time()
    rows = pd.read_csv(input_file, chunksize=row_size)

    results = []
    with Pool(processes=pool_size) as pool:
        results = pool.map(process_row, rows)

    final_df = pd.concat(results)
    final_df.to_csv(output_file, index=False)

    end_time = time.time()
    print(f"multiprocessing.Pool завершено за {end_time - start_time:.2f} секунд.\n")

def process_modin(input_file, output_file):
    print(f"=== Обычная обработка с Pandas ===")
    start_time = time.time()
    df = pd.read_csv(input_file)
    df['result'] = df['col1']**2 + 2 * df['col2']
    df.to_csv(output_file, index=False)
    end_time = time.time()
    print(f"Pandas завершено за {end_time - start_time:.2f} секунд.\n")

def main():
    input_file = 'large_file.csv'
    output_file_executor = 'processed_file_executor.csv'
    output_file_modin = 'processed_file_modin.csv'
    output_file_pool = 'processed_file_pool.csv'
    df = pd.DataFrame({
        'col1': range(1, 1000001),
        'col2': range(1000000, 0, -1)
    })
    df.to_csv(input_file, index=False)
    for row_size in [25000, 200000, 500000]:
        process_with_process_pool_executor(input_file, output_file_executor, row_size, pool_size=4)
        process_with_multiprocessing_pool(input_file, output_file_pool, row_size, pool_size=4)
    process_modin(input_file, output_file_modin)

if __name__ == '__main__':
    main()

=== ProcessPoolExecutor с чанком 25000 ===
ProcessPoolExecutor завершено за 2.12 секунд.

=== multiprocessing.Pool с чанком 25000 ===
multiprocessing.Pool завершено за 2.16 секунд.

=== ProcessPoolExecutor с чанком 200000 ===
ProcessPoolExecutor завершено за 2.03 секунд.

=== multiprocessing.Pool с чанком 200000 ===
multiprocessing.Pool завершено за 3.67 секунд.

=== ProcessPoolExecutor с чанком 500000 ===
ProcessPoolExecutor завершено за 2.70 секунд.

=== multiprocessing.Pool с чанком 500000 ===
multiprocessing.Pool завершено за 2.39 секунд.

=== Обычная обработка с Pandas ===
Pandas завершено за 3.06 секунд.

