Задача: сделайте mapper и reducer, чтобы посчитать среднее и дисперсию оценок за фильм.

In [1]:
!pip install opendatasets

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [2]:
import opendatasets as od
import pandas as pd
import time
from pathlib import Path

In [3]:
import json
from functools import reduce

In [4]:
# Download the Kaggle dataset only when no review JSON files are present locally.
# The guard used to look for 'song_lyrics.csv'; the saved output shows the
# download call still ran although the data had already been fetched.
dataset_path = Path('imdb-user-reviews')
if not any(dataset_path.glob('**/*.json')):
    od.download('https://www.kaggle.com/datasets/sadmadlad/imdb-user-reviews')

Skipping, found downloaded files in "./imdb-user-reviews" (use force=True to force download)


In [5]:
!cat imdb-user-reviews/

cat: imdb-user-reviews/: Is a directory


In [6]:
def calc_mean_variance_movieScores(root='imdb-user-reviews'):
    """Print the mean and population standard deviation of movie ratings.

    Walks ``root`` recursively, reads ``movieIMDbRating`` from every ``.json``
    file and folds the values into running statistics with Welford's
    single-pass update, so no list of scores is kept in memory.

    Note: despite the function name, the second printed value is the square
    root of the population variance (``M2 / n``), i.e. the standard deviation.

    Args:
        root: Directory to scan recursively. Defaults to the dataset folder
            used throughout the notebook.

    Raises:
        ValueError: If no ``.json`` file is found under ``root``.
    """
    n, mean, M2 = 0, 0.0, 0.0
    for path in Path(root).glob('**/*'):
        if not (path.is_file() and path.suffix == '.json'):
            continue
        # JSON is decoded as UTF-8 explicitly instead of relying on the
        # platform's locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            info = json.load(f)
        score = float(info['movieIMDbRating'])
        # Welford update: M2 accumulates the sum of squared deviations.
        n += 1
        delta = score - mean
        mean += delta / n
        M2 += delta * (score - mean)

    if n == 0:
        # Without this guard the division below fails with ZeroDivisionError.
        raise ValueError(f"no .json files with ratings found under {str(root)!r}")

    print(mean, (M2 / n) ** (1/2))

In [7]:
# Соберем mapper и reducer: 1-й вариант

In [8]:
def mapper(path):
    """Map a filesystem path to a one-element tuple with the movie rating.

    Args:
        path: A ``pathlib.Path`` to inspect.

    Returns:
        ``(score,)`` with ``movieIMDbRating`` converted to float when ``path``
        is an existing file with the ``.json`` suffix; ``None`` otherwise.
    """
    if not (path.is_file() and path.suffix == '.json'):
        return None
    # Decode explicitly as UTF-8 rather than with the platform's locale encoding.
    with open(path, 'r', encoding='utf-8') as f:
        info = json.load(f)
    return (float(info['movieIMDbRating']), )


def _as_state(score_data):
    """Normalize a mapper result ``(score,)`` or a partial state to ``(n, mean, M2)``."""
    if len(score_data) == 1:
        # A single observation: count 1, mean equal to the score, no spread.
        return 1, float(score_data[0]), 0.0
    n, mean, M2 = score_data
    return n, mean, M2


def reducer(score_data1, score_data2):
    """Combine two pieces of rating statistics into one ``(n, mean, M2)`` state.

    Each argument may be ``None`` (path skipped by the mapper), a one-element
    tuple ``(score,)`` as produced by the mapper, or a partial state
    ``(n, mean, M2)`` produced by a previous call. ``M2`` is the running sum
    of squared deviations from the mean, so the population variance is
    ``M2 / n``.

    Either argument may be a partial state, so the function also works when
    intermediate results are merged with each other and not only when single
    scores are folded in from the right.

    Returns:
        ``None`` if both arguments are ``None``; the other argument unchanged
        if exactly one is ``None``; otherwise the merged ``(n, mean, M2)``.
    """
    if score_data1 is None:
        return score_data2  # also covers the case where both are None
    if score_data2 is None:
        return score_data1

    n1, mean1, M2_1 = _as_state(score_data1)
    n2, mean2, M2_2 = _as_state(score_data2)

    n = n1 + n2
    delta = mean2 - mean1
    if n2 == 1:
        # Adding one observation: plain Welford step, the same arithmetic
        # as the sequential single-pass computation.
        mean = mean1 + delta / n
        M2 = M2_1 + delta * (mean2 - mean)
    else:
        # Merging two partial states (pairwise formula of Chan et al.).
        mean = mean1 + delta * n2 / n
        M2 = M2_1 + M2_2 + delta * delta * n1 * n2 / n
    return n, mean, M2

In [9]:
# @title Синхронный подход:

In [10]:
%%time
# Baseline: one sequential pass over the dataset; the function prints the
# mean and the square root of M2 / n (the population standard deviation).
calc_mean_variance_movieScores()

8.03 1.0517128885774865
CPU times: user 6.14 ms, sys: 16.4 ms, total: 22.6 ms
Wall time: 163 ms


In [11]:
# @title Обработка с использованием встроенных map и reduce (выполняется последовательно, без параллелизма):

In [12]:
%%time
n, mean, M2 = reduce(reducer, map(mapper, Path('imdb-user-reviews').glob('**/*')))
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: user 5.7 ms, sys: 16 ms, total: 21.7 ms
Wall time: 139 ms


In [23]:
# @title Параллельная обработка с помощью joblib:

In [13]:
from joblib import Parallel, delayed
import math

In [14]:
%%time
n, mean, M2 = reduce(reducer, Parallel(n_jobs=2)(delayed(mapper)(path) for path in Path ('imdb-user-reviews').glob('**/*')))
print(mean, (M2 / n) ** (1/2))

8.03 1.0517128885774865
CPU times: user 76.9 ms, sys: 45.7 ms, total: 123 ms
Wall time: 603 ms


In [15]:
# Соберем mapper и reducer: 2-й вариант (переопределяет mapper и reducer из 1-го варианта)

In [16]:
def mapper(path):
    """Map a filesystem path to a ``(score, 1)`` pair for sum/count reduction.

    Args:
        path: A ``pathlib.Path`` to inspect.

    Returns:
        ``(score, 1)`` with ``movieIMDbRating`` converted to float when
        ``path`` is an existing file with the ``.json`` suffix; ``None``
        otherwise.
    """
    if not (path.is_file() and path.suffix == '.json'):
        return None
    # Decode explicitly as UTF-8 rather than with the platform's locale encoding.
    with open(path, 'r', encoding='utf-8') as f:
        info = json.load(f)
    score = float(info['movieIMDbRating'])
    return (score, 1)

def reducer(score_data1, score_data2):
    """Add two ``(sum, count)`` pairs component-wise.

    A ``None`` argument stands for a path the mapper skipped: the other
    argument is returned unchanged (``None`` if both are ``None``).
    """
    if score_data1 is None or score_data2 is None:
        return score_data2 if score_data1 is None else score_data1

    (left_sum, left_count), (right_sum, right_count) = score_data1, score_data2
    return (left_sum + right_sum, left_count + right_count)

In [18]:
%%time
# map и reduce для расчета суммы и кол-ва оценок
scores_data = reduce(reducer, map(mapper, Path('imdb-user-reviews').glob('**/*.json')))
sum_scores, total_count = scores_data

# среднее значение набора значений, переданных в функцию:  
avg_ = sum_scores / total_count

# дисперсия совокупности значений в столбце таблицы.
sum_squares = reduce(lambda x, y: x + (y[0] - avg_) ** 2, map(mapper, Path('imdb-user-reviews').glob('**/*.json')), 0)

# cтандартное отклонение()показатель того, насколько разбросаны значения в наборе данных)
std_ = math.sqrt(sum_squares / total_count)

print(mean, std_)

8.030000000000001 1.0517128885774862
CPU times: user 28.7 ms, sys: 7.6 ms, total: 36.3 ms
Wall time: 248 ms
