# Параллельные вычисления

Материалы:
* Макрушин С.В. Лекция 10: Параллельные вычисления
* https://docs.python.org/3/library/multiprocessing.html

## Задачи для совместного разбора

1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле `Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt` и в файле `Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`.

In [4]:
import csv
%%file count_letters.py
from collections import Counter

def count_letters(file):
	with open(file) as fp:
		text = fp.read().lower()
	return Counter(text)

Writing count_letters.py


In [2]:
count_letters(r"F:\Programming\Python\BigDataPT-BDPT-\Lesson10/10_multiprocessing_data\10_multiprocessing_data\Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt")

Counter({'с': 50084,
         'п': 25652,
         'а': 73555,
         'и': 62030,
         'б': 16016,
         'о': 106740,
         ',': 26973,
         ' ': 182305,
         'ч': 16492,
         'т': 59813,
         'к': 30802,
         'л': 42328,
         'н': 60920,
         'г': 16174,
         'у': 27309,
         'в': 43700,
         'е': 80972,
         'й': 9747,
         'э': 3203,
         'р': 39784,
         'b': 25,
         'o': 104,
         'k': 16,
         's': 96,
         'c': 42,
         'a': 98,
         'f': 23,
         'e': 162,
         '.': 9864,
         'n': 114,
         't': 98,
         ':': 984,
         'h': 48,
         'p': 29,
         '/': 22,
         '\n': 8583,
         'u': 86,
         'r': 76,
         'd': 38,
         'v': 65,
         'i': 235,
         'y': 5,
         '_': 8,
         '-': 3558,
         '1': 384,
         '0': 110,
         '9': 100,
         '6': 271,
         'm': 54,
         'l': 46,
         'ж': 10552,
     

2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс.

In [9]:
import multiprocessing as mp
from pprint import pprint as pp

In [None]:
import count_letters

In [13]:
files = [r"F:\Programming\Python\BigDataPT-BDPT-\Lesson10\10_multiprocessing_data\10_multiprocessing_data\Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt",
         r"F:\Programming\Python\BigDataPT-BDPT-\Lesson10/10_multiprocessing_data\10_multiprocessing_data\Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt"]

pool = mp.Pool(processes=4)
results = [pool.apply(count_letters.count_letters, args=[f]) for f in files]
pp(results)

[Counter({' ': 45076,
          'о': 23130,
          'е': 20054,
          'а': 18236,
          'т': 14245,
          'н': 14240,
          'и': 13587,
          'с': 11507,
          'л': 9961,
          'р': 9482,
          'в': 9398,
          'м': 7106,
          'к': 6744,
          'д': 6681,
          ',': 6372,
          'у': 6044,
          'п': 5489,
          'я': 5458,
          'ь': 4857,
          'ч': 4113,
          'б': 3980,
          'г': 3948,
          'ы': 3869,
          'з': 3355,
          '.': 2954,
          '\n': 2734,
          'ж': 2297,
          'й': 2028,
          'ш': 1943,
          '—': 1726,
          'х': 1535,
          '\xa0': 1472,
          'ю': 1323,
          'e': 1200,
          '-': 900,
          'э': 836,
          'ц': 817,
          '!': 718,
          'ф': 634,
          'a': 590,
          'щ': 587,
          'l': 571,
          '?': 571,
          'n': 459,
          's': 429,
          ';': 406,
          'm': 401,
          'o':

## Лабораторная работа 10

1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

Фрагмент одного из файлов, которые должны получиться в результате:
```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```


In [1]:
import multiprocessing as mp
import pandas as pd
import csv

from pprint import pprint as pp

In [2]:
file_names = [f"id_tag_nsteps_{i}.csv" for i in range(1, 9)]
file_names

['id_tag_nsteps_1.csv',
 'id_tag_nsteps_2.csv',
 'id_tag_nsteps_3.csv',
 'id_tag_nsteps_4.csv',
 'id_tag_nsteps_5.csv',
 'id_tag_nsteps_6.csv',
 'id_tag_nsteps_7.csv',
 'id_tag_nsteps_8.csv']

In [137]:
import ast

with open("10_multiprocessing_data/10_multiprocessing_data/recipes_full.csv", 'r', newline='', encoding='utf-8') as readfile:
	reader = csv.reader(readfile, delimiter=',')
	i = 0
	header = next(reader)

	for name_id in range(len(file_names)):
		with open(f"out/{file_names[name_id]}", "w", newline='') as writefile:
			writer = csv.writer(writefile, delimiter=';')
			writer.writerows([["id", "tag", "n_steps"]])
			for row in reader:
				if i == (name_id+1) * 300000:
					break
				i += 1
				for tag in ast.literal_eval(row[5]):
					writer.writerows([[row[1], tag, row[6]]])


2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.

In [11]:
%%file avgNumOfSteps.py
import pandas as pd

def avg_num_of_steps(file_name: str) -> dict:
	df = pd.read_csv(file_name, delimiter=";")
	grouped_df = df.groupby("tag")
	sum_of_steps = grouped_df.sum()["n_steps"]
	count_of_steps = grouped_df.count()["n_steps"]
	avg = sum_of_steps/count_of_steps
	return avg.to_dict()

Overwriting avgNumOfSteps.py


In [170]:
avg_num_of_steps("out/id_tag_nsteps_1.csv")

{'1-day-or-more': 4.585138162808065,
 '15-minutes-or-less': 4.978743845322445,
 '3-steps-or-less': 4.688845401174168,
 '30-minutes-or-less': 7.5650156360334915,
 '4-hours-or-less': 10.122780092085069,
 '5-ingredients-or-less': 5.365439868655082,
 '60-minutes-or-less': 9.31799336650083,
 'Throw the ultimate fiesta with this sopaipillas recipe from Food.com.': 3.4979591836734696,
 'a1-sauce': 3.493458708094849,
 'african': 4.281183932346723,
 'american': 7.555473591029802,
 'amish-mennonite': 3.5908726981585266,
 'angolan': 3.4979773462783172,
 'appetizers': 6.243116578793204,
 'apples': 4.766677306096393,
 'april-fools-day': 3.553635998392929,
 'argentine': 3.5873949579831934,
 'artichoke': 3.5382139983909897,
 'asian': 6.42365233911185,
 'asparagus': 4.019281663516068,
 'australian': 4.244301994301995,
 'austrian': 3.566666666666667,
 'avocado': 3.4665022605836415,
 'bacon': 4.074665676077266,
 'baja': 3.495065789473684,
 'baked-beans': 3.4942291838417145,
 'baking': 3.570494483040458,

3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.

In [49]:
def avg_steps_in_all_files(file_names_list) -> dict:
	from functools import reduce

	sum_all = reduce(lambda x,y: x+y, [pd.DataFrame.from_dict(avg_num_of_steps("out/" + i), orient="index") for i in file_names_list])
	out = (sum_all / len(file_names))
	return out[0].to_dict()

In [50]:
avg_steps_in_all_files(file_names)

{'1-day-or-more': 4.459966880692864,
 '15-minutes-or-less': 4.9908400335992935,
 '3-steps-or-less': 4.731775636152013,
 '30-minutes-or-less': 7.608229951710424,
 '4-hours-or-less': 10.078432046410873,
 '5-ingredients-or-less': 5.3538637919625325,
 '60-minutes-or-less': 9.416607077638588,
 'Throw the ultimate fiesta with this sopaipillas recipe from Food.com.': 3.5167459786849373,
 'a1-sauce': 3.5306930533445002,
 'african': 4.369745244254025,
 'american': 7.586744931907737,
 'amish-mennonite': 3.5674631998442448,
 'angolan': 3.4937118163962064,
 'appetizers': 6.226964849131035,
 'apples': 4.863346112891727,
 'april-fools-day': 3.5132917106592103,
 'argentine': 3.5672403423588306,
 'artichoke': 3.4949008832841857,
 'asian': 6.453290714944966,
 'asparagus': 4.049990165155667,
 'australian': 4.222064798196951,
 'austrian': 3.5811518342049142,
 'avocado': 3.5251769300832243,
 'bacon': 4.1087102876039125,
 'baja': 3.5394751522539223,
 'baked-beans': 3.4906156754334616,
 'baking': 3.63817278

In [102]:
%timeit avg_steps_in_all_files(file_names)

3.29 s ± 50.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [53]:
%%time
avg_steps_in_all_files(file_names)

Wall time: 3.42 s


{'1-day-or-more': 4.459966880692864,
 '15-minutes-or-less': 4.9908400335992935,
 '3-steps-or-less': 4.731775636152013,
 '30-minutes-or-less': 7.608229951710424,
 '4-hours-or-less': 10.078432046410873,
 '5-ingredients-or-less': 5.3538637919625325,
 '60-minutes-or-less': 9.416607077638588,
 'Throw the ultimate fiesta with this sopaipillas recipe from Food.com.': 3.5167459786849373,
 'a1-sauce': 3.5306930533445002,
 'african': 4.369745244254025,
 'american': 7.586744931907737,
 'amish-mennonite': 3.5674631998442448,
 'angolan': 3.4937118163962064,
 'appetizers': 6.226964849131035,
 'apples': 4.863346112891727,
 'april-fools-day': 3.5132917106592103,
 'argentine': 3.5672403423588306,
 'artichoke': 3.4949008832841857,
 'asian': 6.453290714944966,
 'asparagus': 4.049990165155667,
 'australian': 4.222064798196951,
 'austrian': 3.5811518342049142,
 'avocado': 3.5251769300832243,
 'bacon': 4.1087102876039125,
 'baja': 3.5394751522539223,
 'baked-beans': 3.4906156754334616,
 'baking': 3.63817278

4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.

In [15]:
%%file fdtd.py
import pandas as pd

from avgNumOfSteps import avg_num_of_steps

def from_dict_to_dfs(file_name):
	return pd.DataFrame.from_dict(avg_num_of_steps("out/" + file_name), orient='index')

Overwriting fdtd.py


In [101]:
from functools import reduce
# from fdtd import from_dict_to_dfs

def from_dict_to_dfs123(file_name):
	print(111)
	return pd.DataFrame.from_dict(avg_num_of_steps("out/" + file_name), orient='index')

# if __name__ == '__main__':
process = mp.Process(target=from_dict_to_dfs123, args=(file_names[0], ))
process.start()

In [None]:
from functools import reduce
from fdtd import from_dict_to_dfs

# processes = [mp.Process(target=from_dict_to_dfs, args=(x, )) for x in file_names]
# for p in processes:
# 	p.start()
#
# for p in processes:
# 	p.join()



results = [pool.apply(from_dict_to_dfs, args=(x, )) for x in file_names]

sum_all = reduce(lambda x,y: x+y, results)
out = (sum_all / len(file_names))
print(out[0].to_dict())

In [98]:
def parallel(file_names):
	from functools import reduce
	from fdtd import from_dict_to_dfs

	pool = mp.Pool()
	results = [pool.apply_async(from_dict_to_dfs, args=(x, )) for x in file_names]

	sum_all = reduce(lambda x,y: x+y, [i.get() for i in results])
	out = (sum_all / len(file_names))
	return out[0].to_dict()

In [31]:
%timeit parallel(file_names)

988 ms ± 24.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [83]:
%%time
print(parallel(file_names))

{'1-day-or-more': 4.459966880692864, '15-minutes-or-less': 4.9908400335992935, '3-steps-or-less': 4.731775636152013, '30-minutes-or-less': 7.608229951710424, '4-hours-or-less': 10.078432046410873, '5-ingredients-or-less': 5.3538637919625325, '60-minutes-or-less': 9.416607077638588, 'Throw the ultimate fiesta with this sopaipillas recipe from Food.com.': 3.5167459786849373, 'a1-sauce': 3.5306930533445002, 'african': 4.369745244254025, 'american': 7.586744931907737, 'amish-mennonite': 3.5674631998442448, 'angolan': 3.4937118163962064, 'appetizers': 6.226964849131035, 'apples': 4.863346112891727, 'april-fools-day': 3.5132917106592103, 'argentine': 3.5672403423588306, 'artichoke': 3.4949008832841857, 'asian': 6.453290714944966, 'asparagus': 4.049990165155667, 'australian': 4.222064798196951, 'austrian': 3.5811518342049142, 'avocado': 3.5251769300832243, 'bacon': 4.1087102876039125, 'baja': 3.5394751522539223, 'baked-beans': 3.4906156754334616, 'baking': 3.6381727872519374, 'bananas': 4.009

5. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди `multiprocessing.queue` передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы.

In [1]:
%%file fdtd2.py
from avgNumOfSteps import avg_num_of_steps
import pandas as pd


def from_dict_to_dfs2(file_name, output):
	return output.put(pd.DataFrame.from_dict(avg_num_of_steps("out/" + file_name), orient='index'))

Overwriting fdtd2.py


In [1]:
import multiprocessing as mp
import pandas as pd
import csv

from pprint import pprint as pp

In [14]:
from fdtd2 import from_dict_to_dfs2

# def test():
NUMBER_OF_PROCESSES = 8
task_queue = mp.Queue()
done_queue = mp.Queue()

for file in file_names:
    task_queue.put(file)

pool = mp.Pool(processes=NUMBER_OF_PROCESSES)
# pool.apply_async(from_dict_to_dfs2, args=(task_queue.get(), done_queue))
# mp.Process(target=from_dict_to_dfs2, args=(task_queue, done_queue)).start()

# print(task_queue.get())
# print(task_queue.get())
print(done_queue.qsize())

id_tag_nsteps_1.csv
id_tag_nsteps_2.csv
0


In [2]:
file_names = [f"id_tag_nsteps_{i}.csv" for i in range(1, 9)]
file_names

['id_tag_nsteps_1.csv',
 'id_tag_nsteps_2.csv',
 'id_tag_nsteps_3.csv',
 'id_tag_nsteps_4.csv',
 'id_tag_nsteps_5.csv',
 'id_tag_nsteps_6.csv',
 'id_tag_nsteps_7.csv',
 'id_tag_nsteps_8.csv']

In [1]:
import multiprocessing as mp
import pandas as pd
import csv

from pprint import pprint as pp

file_names = [f"id_tag_nsteps_{i}.csv" for i in range(1, 9)]
# file_names

def test():
	from fdtd2 import from_dict_to_dfs2
	from functools import reduce

	NUMBER_OF_PROCESSES = 8
	task_queue = mp.Queue()
	done_queue = mp.Queue()

	for file in file_names:
	    task_queue.put(file)

	processes = [mp.Process(target=from_dict_to_dfs2, args=(task_queue.get(), done_queue)) for _ in range(task_queue.qsize())]

	for p in processes:
		p.start()
		p.join()

	# results = [done_queue.get() for _ in processes]
	# for i in range(NUMBER_OF_PROCESSES):
	#     mp.Process(target=from_dict_to_dfs2, args=(task_queue.get(), done_queue)).start()

	# print(task_queue.qsize())

	# sum_all = reduce(lambda x,y: x+y, [done_queue.get() for _ in range(NUMBER_OF_PROCESSES)])
	# out = (sum_all / len(file_names))
	# return out[0].to_dict()

In [None]:
test()

In [None]:
from fdtd2 import from_dict_to_dfs2

output = mp.Queue()
processes = [mp.Process(target=from_dict_to_dfs2, args=(x, output)) for x in file_names]

for p in processes:
	p.start()

for p in processes:
	p.join()

results = [output.get() for p in processes]

In [4]:
import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

if __name__ == '__main__':
    # multiprocessing.set_start_method('spawn')
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

In [1]:
import multiprocessing

def worker(read, write):
    while not read.empty():
        name_proc = multiprocessing.current_process().name
        x = read.get()
        res = x*x
        print(name_proc, res)
        write.put(res)
    else:
        read.close()
        write.close()

write = multiprocessing.Queue()
read = multiprocessing.Queue()
[read.put(x) for x in range(3, 7)]

NUM_CORE = 2
procs = []
for i in range(NUM_CORE):
    p = multiprocessing.Process(target=worker, args=(read, write,))
    procs.append(p)
    p.start()

[proc.join() for proc in procs]
print([write.get() for _ in range(write.qsize())])

[]


In [17]:
import multiprocessing
from multiprocessing import Process
import ololo

# if __name__ == '__main__':
p1 = Process(target=ololo.foozz)
p2 = Process(target=ololo.foozz2)

p1.start()
p2.start()

p1.join()
p2.join()

In [8]:
import multiprocessing
def cube(num):
	print("Cube: {}".format(num * num * num))
# if __name__ == "__main__":
p1 = multiprocessing.Process(target=cube, args=(5,))
p1.start()
p1.join()