1. **Сравнение подходов для синхронизации потоков (1б)**

Напишите программу, которая моделирует доступ к общему ресурсу из пяти потоков. Каждый поток должен:

1. Заблокировать доступ к ресурсу.
2. Печатать сообщение о начале работы.
3. "Использовать" ресурс (симулируйте это с помощью `time.sleep(1)`).
4. Печатать сообщение об окончании работы и освобождать ресурс.

Сравните два подхода к синхронизации:

1. Используя `threading.Lock`.
2. Используя `threading.Semaphore`, где одновременно доступ к ресурсу могут иметь два потока.

**Пример использования:**
```python
import threading
import time

def access_resource_with_lock(lock):
    # Реализуйте доступ к ресурсу с использованием lock

def access_resource_with_semaphore(semaphore):
    # Реализуйте доступ к ресурсу с использованием semaphore

# Запустите пять потоков с lock и пять потоков с semaphore
```

**Один из возможных выводов (недетерминированный результат):**
```
С lock:
Поток 0 начал работу.
Поток 0 завершил работу.
Поток 1 начал работу.
Поток 1 завершил работу.
Поток 2 начал работу.
Поток 2 завершил работу.
Поток 3 начал работу.
Поток 3 завершил работу.
Поток 4 начал работу.
Поток 4 завершил работу.

С semaphore:
Поток 0 начал работу.
Поток 1 начал работу.
Поток 0 завершил работу.
Поток 2 начал работу.
Поток 1 завершил работу.
Поток 3 начал работу.
Поток 2 завершил работу.
Поток 4 начал работу.
Поток 3 завершил работу.
Поток 4 завершил работу.
```


2. **Очередь задач с задержкой на потоках (2б)**

Реализуйте систему обработки задач с очередью `queue.PriorityQueue`. Программа должна состоять из:

1. **Производителя** (`producer`): добавляет задачи с приоритетами от 1 до 5 (1 — самый высокий) в очередь с интервалом 0.5 секунды.
2. **Потребителя** (`consumer`): извлекает задачи из очереди в порядке приоритета и обрабатывает их (задержка 1 секунда на задачу).

Если потребитель не получает новую задачу за 2 секунды, он завершает выполнение.

**Пример использования:**
```python
from queue import PriorityQueue
import threading
import time

def producer(queue):
    for i in range(5, 0, -1): # Добавление задачи с такими приоритетами в очередь в такой последовательности
        # Ваш код

def consumer(queue):
    # Извлекайте задачи в порядке приоритета и обрабатывайте их

# Создайте потоки для producer и consumer
```

**Пример возможного вывода (недетерминированный результат)**
```
Добавлена задача с приоритетом 5
Обработка Задача с приоритетом 5
Добавлена задача с приоритетом 4
Обработка Задача с приоритетом 4Добавлена задача с приоритетом 3

Добавлена задача с приоритетом 2
Обработка Задача с приоритетом 2Добавлена задача с приоритетом 1

Обработка Задача с приоритетом 1
Обработка Задача с приоритетом 3
Очередь пуста, завершение.
```


3. **Ускорение обработки данных с помощью `ProcessPoolExecutor` / `multiprocessing.Pool` / `modin`  (2б)**

Напишите программу, которая читает большой CSV-файл с колонками `col1` и `col2`. Используйте `ProcessPoolExecutor` для выполнения следующих операций:

1. Для каждой строки вычислить результат: $ \text{result} = \text{col1}^2 + 2 \cdot \text{col2} $.
2. Записать результаты обратно в новый CSV-файл.

Добавьте измерение времени выполнения.

**Пример использования::**
```python
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool


def process_row(row):
    # Реализуйте вычисление для одной строки

df = pd.DataFrame({
    'col1': range(1, 1000001),
    'col2': range(1000000, 0, -1)
})

# Реализуйте чтение, обработку и запись данных с использованием ProcessPoolExecutor / multiprocessing.Pool. Удалось ли добиться ускорения с помощью распараллеливания?
# Попробуйте использовать разные размеры чанков – 25000, 200000, 500000,

# А теперь попробуйте использовать библиотеку `modin`. Сравните результаты
```


### Задача 1

In [1]:
import threading
import time

def access_resource_with_lock(n, lock):
    with lock:
        print(f"Поток {n} начал выполнение")
        time.sleep(1)
        print(f"Поток {n} завершился")

lock = threading.Lock()

threads = [threading.Thread(target=access_resource_with_lock, args=(i,lock)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Поток 0 начал выполнение
Поток 0 завершился
Поток 1 начал выполнение
Поток 1 завершился
Поток 2 начал выполнение
Поток 2 завершился
Поток 3 начал выполнение
Поток 3 завершился
Поток 4 начал выполнение
Поток 4 завершился


### Задача 2

In [2]:
from queue import PriorityQueue
import threading
import time

def producer(queue):
    for i in range(5, 0, -1): # Добавление задачи с такими приоритетами в очередь в такой последовательности
        queue.put((i, f'task_{i}'))
        time.sleep(0.5)
        print(f'Добавлена задача с приоритетом {i}')

def consumer(queue):
    while True:
        try:
            item = queue.get(timeout=2)
        except:
            break
        time.sleep(1)
        print(f"Обработка задачи с приоритетом {item[0]}")
        queue.task_done()

queue = PriorityQueue()

producer_thread = threading.Thread(target=producer, args=(queue,))
consumer_thread = threading.Thread(target=consumer, args=(queue,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Добавлена задача с приоритетом 5
Обработка задачи с приоритетом 5
Добавлена задача с приоритетом 4
Добавлена задача с приоритетом 3
Обработка задачи с приоритетом 4
Добавлена задача с приоритетом 2
Добавлена задача с приоритетом 1
Обработка задачи с приоритетом 2
Обработка задачи с приоритетом 1
Обработка задачи с приоритетом 3


### Задача 3

In [1]:
import pandas as pd

df = pd.DataFrame({
    'col1': range(1, 1000001),
    'col2': range(1000000, 0, -1)
})
df


Unnamed: 0,col1,col2
0,1,1000000
1,2,999999
2,3,999998
3,4,999997
4,5,999996
...,...,...
999995,999996,5
999996,999997,4
999997,999998,3
999998,999999,2


In [2]:
df = pd.read_csv('df.csv')

FileNotFoundError: [Errno 2] No such file or directory: 'df.csv'

In [3]:
import pandas as pd
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool

def process_row(row):
    return row['col1']**2 + 2 * row['col2']

def process_chunk(df_part):
    df_part['result'] = df_part.apply(process_row, axis=1)
    return df_part


def parallel_apply(df, func, type_processing, chunk_size=25000):
    n_cores = len(df) // chunk_size
    chunks = [df[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

    with type_processing(n_cores) as pool:
        results = pool.map(func, chunks)

    return pd.concat(results)


for chunk in [25000, 200_000, 500_000]:
    print(f'chunk: {chunk}')
    start_time = time.time()
    df_pool = parallel_apply(df, process_chunk, type_processing=Pool, chunk_size=chunk)
    print(f'total time with Pool: {(time.time() - start_time):.2f}')

    start_time = time.time()
    df_process_pool = parallel_apply(df, process_chunk, type_processing=ProcessPoolExecutor, chunk_size=chunk)
    print(f'total time with ProcessPoolExecutor: {(time.time() - start_time):.2f}')

chunk: 25000
total time with Pool: 9.78
total time with ProcessPoolExecutor: 11.51
chunk: 200000
total time with Pool: 12.41
total time with ProcessPoolExecutor: 8.60
chunk: 500000
total time with Pool: 8.57
total time with ProcessPoolExecutor: 9.67


In [4]:
! pip install "modin[dask]"



In [5]:
import modin.pandas as mpd

modin_df = mpd.DataFrame(df)

for chunk in [25000, 200_000, 500_000]:
    start_time = time.time()
    modin_df_res = modin_df.apply(process_row, axis=1)
    print(f'total time with modin: {(time.time() - start_time):.2f}')

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:46131
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:36155'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46637'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42851 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42851
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:55022
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:45827 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:45827
INFO:distributed.core:Starting established connection to tcp://127

total time with modin: 24.19
total time with modin: 26.18
total time with modin: 29.78


### Выводы:
Чтение и запись результатов вынес за пределы цикла, тк это ничем не отличается, а расчеты проводил в цикле разными методами. Самым быстрым оказался Pool из multiprocessing в разбивке с самым большим чанком.
Самым медленным стал modin

In [7]:
df_pool

Unnamed: 0,col1,col2,result
0,1,1000000,2000001
1,2,999999,2000002
2,3,999998,2000005
3,4,999997,2000010
4,5,999996,2000017
...,...,...,...
999995,999996,5,999992000026
999996,999997,4,999994000017
999997,999998,3,999996000010
999998,999999,2,999998000005


In [8]:
start_time = time.time()
df_res = df.apply(process_row, axis=1)
print(f'total time with just pandas apply: {(time.time() - start_time):.2f}')

total time with just pandas apply: 10.32
