# Параллельные вычисления

Материалы:
* Макрушин С.В. Лекция 10: Параллельные вычисления
* https://docs.python.org/3/library/multiprocessing.html

## Задачи для совместного разбора

[Dostoevskiy Fedor. Igrok - BooksCafe.Net](https://drive.google.com/file/d/1FjleuV-3PdnEXrRVYCLQcneoH0Nt80Xq/view?usp=sharing)

[Prestuplenie i nakazanie](https://drive.google.com/file/d/1vpSLx196TAdHTcY7ID7rWTrLKB_zX3Gd/view?usp=sharing)

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
path_igrok = "/content/sample_data/Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt"
path_prestuplenie = "/content/sample_data/Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt"

1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле `Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt` и в файле `Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`. 

In [None]:
from collections import Counter

In [None]:
f = open(path_prestuplenie, 'r', encoding="windows-1251")

data = f.readlines()
counter = Counter()
for i in range(len(data)):
  counter += Counter(data[i].lower())
counter.most_common(10)

2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс. 

In [None]:
import multiprocessing

In [None]:
def func(file_name: str):
  with open(file_name, 'r', encoding="windows-1251") as f:
    data = f.readlines()
    counter = Counter()
    for i in range(len(data)):
      counter += Counter(data[i].lower())
    print(counter.most_common(10))

pr = (multiprocessing.Process(target=func, args=(path_prestuplenie,)), multiprocessing.Process(target=func, args=(path_igrok,)) )
for i in pr:
  i.start()
  i.join()

3.Найти все целые числа от 1 до 400_00, для которых синус от этого числа будет иметь не менее 9 одинаковых цифр (например, sin(139198) =0.30763333543133703 - 9 троек)\
Ответ: список пар (целое число, значение sin для него), для которых значение синуса удовлетворяет условию.

In [None]:
from math import sin

In [None]:
def func(a, b):
  for i in range(a, b):
    counter = Counter()
    sin_i = str(sin(i))
    counter += Counter(sin_i)
    if counter.most_common(1)[0][1] >= 9:
      q.put((i, sin_i))

q = multiprocessing.Queue()
pr = (multiprocessing.Process(target=func, args=(i * 50_000, (i + 1) * 50_000)) for i in range(9))
for i in pr:
  i.start()
  i.join()

while not q.empty():
  print(q.get())

In [None]:
def func(a, b):
  for i in range(a, b):
    counter = Counter()
    sin_i = str(sin(i))
    counter += Counter(sin_i)
    if counter.most_common(1)[0][1] >= 9:
      q.put((i, sin_i))

q = multiprocessing.Queue()
pr = (multiprocessing.Process(target=func, args=(i * 5_000, (i + 1) * 5_001)) for i in range(9))
for i in pr:
  i.start()
  i.join()

while not q.empty():
  print(q.get())

## Лабораторная работа 10

In [16]:
import pandas as pd
import numpy as np
import multiprocessing as mp
import threading as th

[recipes_full.csv](https://drive.google.com/file/d/1uvhT0Uy-xOKUq2TSd1CAP9D2v1ltaalM/view?usp=sharing)

1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```


In [138]:
import csv
import os
import math

# Определение количества файлов, на которые нужно разделить исходный файл.
num_files = 8

# Определение имени исходного файла.
input_file_name = '/content/drive/MyDrive/recipes_full.csv'

# Определение имени выходной папки.
output_folder_name = 'output'

# Создание выходной папки, если она не существует.
if not os.path.exists(output_folder_name):
    os.makedirs(output_folder_name)

# Открытие исходного файла для чтения.
with open(input_file_name, 'r', encoding='utf-8') as input_file:

    # Чтение заголовка исходного файла.
    header = next(input_file)

    # Создание списка для каждого выходного файла.
    output_files = []
    for i in range(num_files):
        output_file_name = f'{output_folder_name}/id_tag_nsteps_{i}.csv'
        output_file = open(output_file_name, 'w', newline='', encoding='utf-8')
        output_files.append(output_file)

    # Создание объектов csv.writer для каждого выходного файла.
    writers = [csv.writer(output_file, delimiter=';') for output_file in output_files]

    # Запись заголовка в каждый выходной файл.
    for writer in writers:
        writer.writerow(['id', 'tag', 'n_steps'])

    # Чтение строк исходного файла и запись их в соответствующие выходные файлы.
    input_reader = csv.reader(input_file, delimiter=',', quotechar='"')
    for row in input_reader:
        id = row[1]
        tag = row[5]
        n_steps = row[6]
        # Определение номера выходного файла, в который будет записана строка.
        output_file_index = math.floor(int(id) / (math.ceil(2300000 / num_files)))
        writers[output_file_index].writerow([id, tag, n_steps])

    # Закрытие всех выходных файлов.
    for output_file in output_files:
        output_file.close()


KeyboardInterrupt: ignored

In [None]:
d=pd.read_csv("/content/output/id_tag_nsteps_3.csv",sep=';')
# Удаляем символы '[ ] ' из строк с тэгами
d['tag'] = d['tag'].str.strip("[ ']+").apply(lambda x: x.replace("'", ""))
    
    # Преобразуем строки в списки тэгов
d['tag'] = d['tag'].str.split(',')
d.explode('tag')

2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.

In [None]:
import pandas as pd

def calculate_avg_nsteps_by_tag(file_name):
    # Считываем файл в DataFrame
    df = pd.read_csv(file_name, delimiter=';', usecols=['id', 'tag', 'n_steps'])
    # Удаляем символы '[ ] ' и '' из строк с тэгами
    df['tag'] = df['tag'].str.strip("[ ']+").apply(lambda x: x.replace("'", ""))

    # Преобразуем строки в списки тэгов
    df['tag'] = df['tag'].str.split(',')
    # Удаляем пробелы в начале и конце тэгов
    df['tag'] = df['tag'].apply(lambda x: [t.strip() for t in x])
    # Разбиваем список тэгов на отдельные тэги и создаем отдельную строку для каждого тэга
    df = df.explode('tag')
    
    # Группируем строки по тэгу и вычисляем среднее значение количества шагов
    avg_nsteps_by_tag = df.groupby('tag')['n_steps'].mean().to_dict()

    # Удаляем пустой ключ и ключи с пробелами
    avg_nsteps_by_tag.pop('', None)
    avg_nsteps_by_tag.pop(' ', None)
    return avg_nsteps_by_tag


In [None]:
print(calculate_avg_nsteps_by_tag("/content/output/id_tag_nsteps_0.csv"))

3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы иметь возможность получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.


In [None]:
import time
def merge_results(results_list):
    merged_results = {}
    for result in results_list:
        for tag, avg_nsteps in result.items():
            if tag in merged_results:
                merged_results[tag] += avg_nsteps
            else:
                merged_results[tag] = avg_nsteps
    # Вычисляем среднее значение по каждому тэгу
    num_files = len(results_list)
    for tag in merged_results:
        merged_results[tag] /= num_files
    return merged_results

def calculate_avg_nsteps_by_tag_all_files(folder_path):
    start_time = time.time()
    results_list = []
    for filename in os.listdir(folder_path):
        if filename.endswith(".csv"):
            file_path = os.path.join(folder_path, filename)
            results_list.append(calculate_avg_nsteps_by_tag(file_path))
    avg_nsteps_by_tag_all_files = merge_results(results_list)
    end_time = time.time()
    print(f"Задача выполнена за {end_time - start_time} секунд")
    return avg_nsteps_by_tag_all_files

In [None]:
calculate_avg_nsteps_by_tag_all_files("/content/output")

4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.

In [None]:

import os
import time
import multiprocessing

if __name__ == '__main__':
    start_time = time.time()
    pool = multiprocessing.Pool(processes=os.cpu_count())
    results = []
    for filename in os.listdir('/content/output/'):
        if filename.endswith('.csv'):
            result = pool.apply_async(calculate_avg_nsteps_by_tag, args=('/content/output/' + filename,))
            results.append(result)
    pool.close()
    pool.join()

    final_result = {}
    for result in results:
        result_dict = result.get()
        for tag, value in result_dict.items():
            if tag not in final_result:
                final_result[tag] = [value]
            else:
                final_result[tag].append(value)

    for tag, values in final_result.items():
        average = sum(values) / len(values)
        final_result[tag] = average

    print(final_result)
    print(f'Time taken: {time.time() - start_time} seconds')


5. (*) Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы. 

In [None]:
import multiprocessing as mp
from collections import defaultdict

if __name__ == '__main__':
    start_time = time.time()
    file_names = os.listdir('/content/output/')
    num_processes = mp.cpu_count() // 2
    pool = mp.Pool(num_processes)
    # Создание очереди и передача имен файлов на обработку процессам
    file_queue = mp.Manager().Queue()
    for file_name in file_names:
        file_queue.put('/content/output/'+file_name)
    # Создание очереди для получения результатов
    result_queue = mp.Manager().Queue()
    
    # Создание функции для обработки очереди файлов
    def process_file_queue():
        while not file_queue.empty():
            file_name = file_queue.get()
            result = calculate_avg_nsteps_by_tag(file_name)
            result_queue.put(result)
    
    # Создание процессов и запуск их на обработку
    processes = []
    for i in range(num_processes):
        p = mp.Process(target=process_file_queue)
        processes.append(p)
        p.start()
    
    # Ожидание завершения всех процессов
    for p in processes:
        p.join()

    
    result_queue.put(None)
    
    # Вывод результата
    print(merge_results(list(iter(result_queue.get, None))))
    print(f'Time taken: {time.time() - start_time} seconds')