 # Подготовка окружения

In [1]:
import opendatasets as od
import pandas as pd
import time
from pathlib import Path

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


## Скачиваем датасет

In [2]:
dataset_path = Path('homework/hw1/imdb-user-reviews', 'imdb-user-reviews.csv')
if not dataset_path.is_file():
    od.download('https://www.kaggle.com/datasets/sadmadlad/imdb-user-reviews')

Skipping, found downloaded files in ".\imdb-user-reviews" (use force=True to force download)


Сделайте mapper и reducer, чтобы посчитать среднее и дисперсию оценок за фильм.

Реализация через цикл (предпогаем, что мы не знаем сколько у нас фильмов):

In [3]:
import json

n, mean, M2 = 0, 0.0, 0
# проходим циклом по содержимому загруженной папки
for path in Path('imdb-user-reviews').glob('**/*'):
    # если находим файл и этот файл .json:
    if path.is_file() and path.suffix == '.json':
        # открываем файл для чтения с кодировкой utf8, дав ему алиас f:
        with open(path, 'r', encoding='utf8') as f:
            # сохраняем содержимое из json в виде словаря в info
            info = json.load(f)
        # сохраняем оценку, обращаясь к ней по соответствующему ключу
        score = float(info['movieIMDbRating'])
        # увеличиваем количество фильмов на 1
        n += 1
        # разница между оценкой и средним
        delta = score - mean
        # корректируем среднее, учитывая delta и количество фильмов
        mean += delta / n
        # корректируем среднее квадратичное
        M2 += delta * (score - mean)

print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865


На основе этого кода соберите mapper и reducer:

In [4]:
# функция, которая будет применяться в map
def mapper(path):
    # если находим файл и этот файл .json:
    if path.is_file() and path.suffix == '.json':
        # открываем файл для чтения с кодировкой utf8, дав ему алиас f:
        with open(path, 'r', encoding='utf8') as f:
            # сохраняем содержимое из json в виде словаря в info
            info = json.load(f)
        # возвращаем кортеж с оценкой, обращаясь к ней по соответствующему ключу
        return (float(info['movieIMDbRating']), )


# функция, которая будет применяться в reduce к элементам последовательности
# на вход принимает 2 обязательных параметра
# (первый - аккумулированное ранее значение, второй - следующий элемент последовательности)
def reducer(score_data1, score_data2):
    # если на вход пришли пустые значения обоих параметров:
    if score_data1 is None and score_data2 is None:
        # верннем None
        return None
    # если пустым на входе является только значение первого параметра
    elif score_data1 is None:
        # вернем значение второго параметра
        return score_data2
    # если пустым на входе является только значение второго параметра
    elif score_data2 is None:
        # вернем значение второго параметра
        return score_data1
    # если на входе оба значения не пустые:
    else:
        # инициализируем список оценок, пока он пуст
        scores = []
        # если размерность первого параметра reducer равна 1, мы на первой итерации reduce:
        if len(score_data1) == 1:
            # инициализируем искомые переменные, их значения пока не знаем, поэтому 0
            n, mean, M2 = 0, 0.0, 0
            # добавляем в список оценок первую оценку (нулевой элемент кортежа из mapper)
            scores.append(score_data1[0])
        # если размерность первого параметра reducer отлична от 1:
        else:
            # в искомые переменные записываем значения из предыдущей итерации reduce
            n, mean, M2 = score_data1
        # в список оценок добавляем очередное значения второго параметра reducer
        scores.append(score_data2[0])
        # проходим циклом по списку оценок:
        for score in scores:
            # корректируем количество
            n += 1
            # считаем разницу между оценкой и средним
            delta = score - mean
            # корректируем среднее, учитывая delta и количество фильмов
            mean += delta / n
            # корректируем среднее квадратичное
            M2 += delta * (score - mean)
        # возвращаем искомые значения n, mean, M2
        return n, mean, M2

In [5]:
from functools import reduce

In [6]:
%%time
n, mean, M2 = reduce( # кумулятивно применяет функцию reducer к элементам итерируемой последовательности map, сводя её к единственному значению
                reducer, # вернет искомые значения n, mean, M2
                map( # вернет итерируемую последовательность кортежей с оценками фильмов для reduce
                    mapper, # вернет кортеж с оценкой фильма из .json
                    Path('imdb-user-reviews').glob('**/*') # итерируемое содержимое анализируемой папки (вложенные папки и файлы)
                )
            )
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: total: 15.6 ms
Wall time: 8 ms


In [7]:
from joblib import Parallel, delayed

In [8]:
%%time
n, mean, M2 = reduce(
                reducer,
                Parallel(n_jobs=2)( # параллельный запуск функции mapper с входным параметром path. Количество параллельных потоков = 2
                    # количество итераций равно числу вложенных в целевую директорию папок (пропускаем) и файлов (обрабатываем .json)
                    delayed(mapper)(path) for path in Path('imdb-user-reviews').glob('**/*')
                )
            )
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: total: 93.8 ms
Wall time: 696 ms


Видим, что в данном случае распараллеливание задач не дало нам положительного эффекта.

Времени на обработку было затрачено заметно больше.

Мы имеем дело с небольшим набором данных, для обработки которого распараллеливание будет лишним

и использование дополнительных функций приводит к увеличению времени обработки задач.

Однако, попробуем заметно увеличить объем данных для обработки - в 100_000 раз:

In [9]:
%%time
n, mean, M2 = reduce(
                reducer,
                map(
                    mapper,
                    # увеличим наш список в 100_000 раз
                    list(Path('imdb-user-reviews').glob('**/*')) * 100_000
                )
            )
print(mean, (M2 / n) ** (1/2))

8.03000000000032 1.0517128885774973
CPU times: total: 4min 32s
Wall time: 5min 25s


In [10]:
%%time
n, mean, M2 = reduce(
                reducer,
                Parallel(n_jobs=2)(
                    delayed(mapper)(path) for path in list(Path('imdb-user-reviews').glob('**/*')) * 100_000
                )
            )
print(mean, (M2 / n) ** (1/2))

8.03000000000032 1.0517128885774973
CPU times: total: 24.2 s
Wall time: 3min 9s


Теперь мы видим, что в результате распараллеливания задач мы заметно сократили время их обработки.

Таким образом, при работе с большим набором данных будет оптимальным использование параллелизации.

Попробуем увеличить количество параллельных потоков с 2 до 6:

In [11]:
%%time
n, mean, M2 = reduce(
                reducer,
                # Увеличим количество параллельных заданий до 6
                Parallel(n_jobs=6)(
                    delayed(mapper)(path) for path in list(Path('imdb-user-reviews').glob('**/*')) * 100_000
                )
            )
print(mean, (M2 / n) ** (1/2))

8.03000000000032 1.0517128885774973
CPU times: total: 37.2 s
Wall time: 1min 36s


В результате увеличения количества параллельных потоков обработки данных время работы еще более существенно сократилось.

Что также подтверждает оптимальность использования параллелизации выполнения задач при работе с большими наборами данных.