1. **Сравнение подходов для синхронизации потоков (1б)**

Напишите программу, которая моделирует доступ к общему ресурсу из пяти потоков. Каждый поток должен:

1. Заблокировать доступ к ресурсу.
2. Печатать сообщение о начале работы.
3. "Использовать" ресурс (симулируйте это с помощью `time.sleep(1)`).
4. Печатать сообщение об окончании работы и освобождать ресурс.

Сравните два подхода к синхронизации:

1. Используя `threading.Lock`.
2. Используя `threading.Semaphore`, где одновременно доступ к ресурсу могут иметь два потока.

**Пример использования:**
```python
import threading
import time

def access_resource_with_lock(lock):
    # Реализуйте доступ к ресурсу с использованием lock

def access_resource_with_semaphore(semaphore):
    # Реализуйте доступ к ресурсу с использованием semaphore

# Запустите пять потоков с lock и пять потоков с semaphore
```

**Один из возможных выводов (недетерминированный результат):**
```
С lock:
Поток 0 начал работу.
Поток 0 завершил работу.
Поток 1 начал работу.
Поток 1 завершил работу.
Поток 2 начал работу.
Поток 2 завершил работу.
Поток 3 начал работу.
Поток 3 завершил работу.
Поток 4 начал работу.
Поток 4 завершил работу.

С semaphore:
Поток 0 начал работу.
Поток 1 начал работу.
Поток 0 завершил работу.
Поток 2 начал работу.
Поток 1 завершил работу.
Поток 3 начал работу.
Поток 2 завершил работу.
Поток 4 начал работу.
Поток 3 завершил работу.
Поток 4 завершил работу.
```


2. **Очередь задач с задержкой на потоках (2б)**

Реализуйте систему обработки задач с очередью `queue.PriorityQueue`. Программа должна состоять из:

1. **Производителя** (`producer`): добавляет задачи с приоритетами от 1 до 5 (1 — самый высокий) в очередь с интервалом 0.5 секунды.
2. **Потребителя** (`consumer`): извлекает задачи из очереди в порядке приоритета и обрабатывает их (задержка 1 секунда на задачу).

Если потребитель не получает новую задачу за 2 секунды, он завершает выполнение.

**Пример использования:**
```python
from queue import PriorityQueue
import threading
import time

def producer(queue):
    for i in range(5, 0, -1): # Добавление задачи с такими приоритетами в очередь в такой последовательности
        # Ваш код

def consumer(queue):
    # Извлекайте задачи в порядке приоритета и обрабатывайте их

# Создайте потоки для producer и consumer
```

**Пример возможного вывода (недетерминированный результат)**
```
Добавлена задача с приоритетом 5
Обработка Задача с приоритетом 5
Добавлена задача с приоритетом 4
Обработка Задача с приоритетом 4Добавлена задача с приоритетом 3

Добавлена задача с приоритетом 2
Обработка Задача с приоритетом 2Добавлена задача с приоритетом 1

Обработка Задача с приоритетом 1
Обработка Задача с приоритетом 3
Очередь пуста, завершение.
```


3. **Ускорение обработки данных с помощью `ProcessPoolExecutor` / `multiprocessing.Pool` / `modin`  (2б)**

Напишите программу, которая читает большой CSV-файл с колонками `col1` и `col2`. Используйте `ProcessPoolExecutor` для выполнения следующих операций:

1. Для каждой строки вычислить результат: $ \text{result} = \text{col1}^2 + 2 \cdot \text{col2} $.
2. Записать результаты обратно в новый CSV-файл.

Добавьте измерение времени выполнения.

**Пример использования::**
```python
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool


def process_row(row):
    # Реализуйте вычисление для одной строки

df = pd.DataFrame({
    'col1': range(1, 1000001),
    'col2': range(1000000, 0, -1)
})

# Реализуйте чтение, обработку и запись данных с использованием ProcessPoolExecutor / multiprocessing.Pool. Удалось ли добиться ускорения с помощью распараллеливания?
# Попробуйте использовать разные размеры чанков – 25000, 200000, 500000,

# А теперь попробуйте использовать библиотеку `modin`. Сравните результаты
```


##1

In [7]:
import multiprocessing
import threading
import time


# Реализуйте доступ к ресурсу с использованием lock
def access_resource_with_lock(n, lock):
    with lock:
        print(f"Процесс {n} c lock начал выполнение")
        time.sleep(1)
        print(f"Процесс {n} c lock завершил выполнение")


# Реализуйте доступ к ресурсу с использованием semaphore
def access_resource_with_semaphore(n, semaphore):
    with semaphore:
        print(f"Процесс {n} c semaphore начал выполнение")
        time.sleep(1)
        print(f"Процесс {n} c semaphore завершил выполнение")


# Запустите пять потоков с lock и пять потоков с semaphore
if __name__ == "__main__":
    lock = multiprocessing.Lock()
    treads_with_lock = [threading.Thread (target=access_resource_with_lock, args=(i, lock)) for i in range(5)]
    start=time.time()
    for tread in treads_with_lock:
        tread.start()
    for tread in treads_with_lock:
        tread.join()
    print(time.time() - start)
    print()
    semaphore = multiprocessing.Semaphore(2)
    treads_with_semaphore = [threading.Thread (target=access_resource_with_semaphore, args=(i, semaphore)) for i in range(5)]
    start=time.time()
    for tread in treads_with_semaphore:
        tread.start()
    for tread in treads_with_semaphore:
        tread.join()
    print(time.time() - start)

Процесс 0 c lock начал выполнение
Процесс 0 c lock завершил выполнение
Процесс 1 c lock начал выполнение
Процесс 1 c lock завершил выполнение
Процесс 2 c lock начал выполнение
Процесс 2 c lock завершил выполнение
Процесс 3 c lock начал выполнение
Процесс 3 c lock завершил выполнение
Процесс 4 c lock начал выполнение
Процесс 4 c lock завершил выполнение
5.019611358642578

Процесс 0 c semaphore начал выполнение
Процесс 1 c semaphore начал выполнение
Процесс 0 c semaphore завершил выполнение
Процесс 2 c semaphore начал выполнение
Процесс 1 c semaphore завершил выполнение
Процесс 3 c semaphore начал выполнение
Процесс 2 c semaphore завершил выполнение
Процесс 3 c semaphore завершил выполнение
Процесс 4 c semaphore начал выполнение
Процесс 4 c semaphore завершил выполнение
3.0093400478363037


##2

In [23]:
from queue import PriorityQueue
import threading
import time


que = PriorityQueue()


def producer(que):
    for i in range(5, 0, -1): # Добавление задачи с такими приоритетами в очередь в такой последовательности
        que.put(i)
        print(f"Добавляется задача с приоритетом: {i}")
        time.sleep(0.5)

def consumer(que):
    while not que.empty() or time.sleep(2):
        item = que.get() # Извлекайте задачи в порядке приоритета и обрабатывайте их
        print(f"Обрабатывается задача с приоритетом {item}")
        time.sleep(1)
        que.task_done()


# Создайте потоки для producer и consumer
producer_thread = threading.Thread(target=producer, args=(que, ))
consumer_thread = threading.Thread(target=consumer, args=(que, ))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Добавляется задача с приоритетом: 5
Обрабатывается задача с приоритетом 5
Добавляется задача с приоритетом: 4
Добавляется задача с приоритетом: 3
Обрабатывается задача с приоритетом 3
Добавляется задача с приоритетом: 2
Добавляется задача с приоритетом: 1
Обрабатывается задача с приоритетом 1
Обрабатывается задача с приоритетом 2
Обрабатывается задача с приоритетом 4


##3

In [44]:
import pandas as pd
from concurrent.futures import ProcessPoolExecutor


def process_row(row):
    return row[0]**2+2*row[1] # Реализуйте вычисление для одной строки

df = pd.DataFrame({
    'col1': range(1, 1000001),
    'col2': range(1000000, 0, -1)
})

# Реализуйте чтение, обработку и запись данных с использованием ProcessPoolExecutor
with ProcessPoolExecutor(4) as process_executor:
    start = time.time()
    results = list(process_executor.map(process_row, df.values.tolist()))
    pd.DataFrame(results).to_csv('new.csv', index=False)
    print(time.time() - start)

268.51463651657104


In [45]:
pd.read_csv('/content/new.csv')

Unnamed: 0,0
0,2000001
1,2000002
2,2000005
3,2000010
4,2000017
...,...
999995,999992000026
999996,999994000017
999997,999996000010
999998,999998000005


In [47]:
# Реализуйте чтение, обработку и запись данных с использованием multiprocessing.Pool.
from multiprocessing import Pool

with Pool() as pool:
      start = time.time()
      results = pool.map(process_row, df.values.tolist())
      pd.DataFrame(results).to_csv('new_pool.csv', index=False)
      print(time.time() - start)

2.6061363220214844


In [51]:
#without parallelism
start = time.time()
res = [process_row(row) for row in df.values.tolist()]
pd.DataFrame(results).to_csv('new_without_parallelism.csv', index=False)
print(time.time() - start)

#Удалось ли добиться ускорения с помощью распараллеливания?
#Нет, многопроцессность работает дольше

1.57847261428833


In [52]:
from multiprocessing.dummy import Pool

with Pool(4) as pool:
    start = time.time()
    results = pool.map(process_row, df.values.tolist())
    pd.DataFrame(results).to_csv('new_threads.csv', index=False)
    print(time.time() - start)

8.756702423095703


In [69]:
# Попробуйте использовать разные размеры чанков – 25000, 200000, 500000,
def process_chunk(chunk):
    return pd.DataFrame([process_row(row) for row in chunk.values.tolist()])


def parallel_apply(df, func, chank_size):
    chunks = [df[i-chank_size : i] for i in range(chank_size, len(df), chank_size)]

    with Pool(4) as pool:
        results = pool.map(func, chunks)

    return pd.concat(results)

# chank size 25k
start = time.time()
result_df = parallel_apply(df, process_chunk, 25000)
pd.DataFrame(result_df).to_csv('new_chank_25k.csv', index=False)
print(f'Chank_size = 25k. Execution time: {time.time() - start}')

#chank size 200k

start = time.time()
result_df = parallel_apply(df, process_chunk, 200000)
pd.DataFrame(result_df).to_csv('new_chank_200k.csv', index=False)
print(f'Chank_size = 200k. Execution time: {time.time() - start}')

#chank size 500k

start = time.time()
result_df = parallel_apply(df, process_chunk, 500000)
pd.DataFrame(result_df).to_csv('new_chank_500k.csv', index=False)
print(f'Chank_size = 500k. Execution time: {time.time() - start}')

Chank_size = 25k. Execution time: 1.6118719577789307
Chank_size = 200k. Execution time: 1.28285551071167
Chank_size = 500k. Execution time: 0.7979354858398438


In [68]:
# А теперь попробуйте использовать библиотеку `modin`. Сравните результаты
! pip install "modin[dask]"

Collecting modin[dask]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting distributed>=2.22.0 (from modin[dask])
  Downloading distributed-2024.12.0-py3-none-any.whl.metadata (3.3 kB)
Collecting dask>=2.22.0 (from modin[dask])
  Downloading dask-2024.12.0-py3-none-any.whl.metadata (3.7 kB)
Collecting sortedcontainers>=2.0.5 (from distributed>=2.22.0->modin[dask])
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tblib>=1.6.0 (from distributed>=2.22.0->modin[dask])
  Downloading tblib-3.0.0-py3-none-any.whl.metadata (25 kB)
Collecting zict>=3.0.0 (from distributed>=2.22.0->modin[dask])
  Downloading zict-3.0.0-py2.py3-none-any.whl.metadata (899 bytes)
Downloading distributed-2024.12.0-py3-none-any.whl (1.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dask-2024.12.0-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [70]:
import modin.pandas as mpd

# Преобразование pandas DataFrame в modin DataFrame
modin_df = mpd.DataFrame(df)

# Выполнение вычислений
start=time.time()
res = modin_df['col1']**2 + 2*modin_df['col2']
pd.DataFrame(res).to_csv('new_modin.csv', index=False)
print(f'Modin execution time: {time.time() - start}')

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:43507
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34703'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40679'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:38305 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38305
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:52354
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:38585 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38585
INFO:distributed.core:Starting established connection to tcp://127

Modin execution time: 1.8624153137207031
