<a href="https://colab.research.google.com/github/cuzmyk/data_analysis/blob/main/DA_1pr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![Spark Image](https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1200px-Apache_Spark_logo.svg.png)

# Обработка данных с использованием RDD

##### Установка зависимостей

In [1]:
!git clone --recursive https://github.com/tester170/Other.git

Cloning into 'Other'...
remote: Enumerating objects: 56, done.[K
remote: Counting objects: 100% (56/56), done.[K
remote: Compressing objects: 100% (54/54), done.[K
remote: Total 56 (delta 13), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (56/56), 32.77 MiB | 10.31 MiB/s, done.
Resolving deltas: 100% (13/13), done.
Updating files: 100% (26/26), done.


In [2]:
!ls Other/

archive.zip  beauty.csv  columns.csv  email.csv  images		 README.md	xl
bank.csv     births.csv  data.zip     excel.py	 passengers.csv  responses.csv	сollege_data.csv


In [3]:
!unzip "/content/Other/data.zip" -d "/content/"

Archive:  /content/Other/data.zip
   creating: /content/data/
  inflating: /content/__MACOSX/._data  
  inflating: /content/data/new_customers.csv  
  inflating: /content/__MACOSX/data/._new_customers.csv  
   creating: /content/data/ml-1m/
  inflating: /content/__MACOSX/data/._ml-1m  
  inflating: /content/data/Meal_Info.csv  
  inflating: /content/__MACOSX/data/._Meal_Info.csv  
  inflating: /content/data/fakefriends.csv  
  inflating: /content/__MACOSX/data/._fakefriends.csv  
  inflating: /content/data/sample_linear_regression_data.txt  
  inflating: /content/__MACOSX/data/._sample_linear_regression_data.txt  
  inflating: /content/data/College.csv  
  inflating: /content/__MACOSX/data/._College.csv  
  inflating: /content/data/fake_customers.csv  
  inflating: /content/__MACOSX/data/._fake_customers.csv  
  inflating: /content/data/movielens_ratings.csv  
  inflating: /content/__MACOSX/data/._movielens_ratings.csv  
  inflating: /content/data/Book.txt  
  inflating: /content/__MAC

## Запуск сеанса Spark

### Используем PySpark в среде Jupyter Notebook, обеспечивая правильную настройку путей и переменных среды

1. **`import findspark`**: Импортируем модуль `findspark`. Модуль `findspark` предназначен для облегчения настройки и использования Apache Spark в среде Python. Он помогает найти путь к установленной версии Spark и добавить его в переменную среды `PYTHONPATH`.

2. **`findspark.init()`**: Этот вызов инициализирует модуль `findspark`. Он выполняет следующие действия:
    - Ищет установленный Spark.
    - Добавляет путь к Spark в переменную среды `PYTHONPATH`.
    - Позволяет использовать PySpark без необходимости явно указывать путь к Spark.

In [4]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import findspark
findspark.init()

### Инициализируем Spark-контекст, для начала работы с данными в Spark

Импортируем два класса из библиотеки PySpark:
    - `SparkConf`: Этот класс предоставляет возможность настройки параметров конфигурации для вашего Spark-приложения. Вы можете установить различные параметры, такие как название вашего приложения (`setAppName`) и режим запуска (`setMaster`).
    - `SparkContext`: Этот класс является основной точкой входа для взаимодействия с кластером Spark. Он устанавливает связь между вашим Spark-приложением и ресурсным менеджером (например, YARN или локальным режимом):

In [6]:
from pyspark import SparkConf, SparkContext


Создаем объект `SparkConf` и устанавливаем два параметра:
    - `setMaster("local")`: Этот параметр указывает, что мы запускаем Spark в локальном режиме (на одной машине). В реальном кластере вы бы указали адрес ресурсного менеджера.
    - `setAppName("Data Analysis")`: Этот параметр задает имя вашего Spark-приложения.

После чего, мы создаем объект `SparkContext`, передавая ему нашу конфигурацию (`conf`). Это позволяет приложению взаимодействовать с кластером Spark.

In [7]:
conf = SparkConf().setMaster("local").setAppName("Data Analysis")
sc = SparkContext(conf = conf)

Объект `sc` (или **SparkContext**) представляет собой основной интерфейс для взаимодействия с кластером Apache Spark.

Давайте рассмотрим, что означает каждая из строк в выводе:

1. **Version (Версия)**:  Установленная версия Spark. В данном случае это **v3.0.0**.

2. **Master (Мастер)**: Указывает, какой ресурсный менеджер используется для запуска Spark. В нашем случае это **local**, что означает, что Spark работает в локальном режиме. В реальных кластерах здесь может быть адрес ресурсного менеджера, такого как YARN или Mesos.

3. **AppName (Имя приложения)**: Это имя, созданного Spark-приложения. В данном случае оно называется **Data Analysis**.

**Spark UI** предоставляет информацию о состоянии Spark-приложения, выполненных задачах, использовании ресурсов и других аспектах. Вы можете использовать его для мониторинга и отладки вашего Spark-кода.

In [8]:
sc

> *Используем набор данных MovieLens 1M, который можно получить на веб-сайте [Grouplens](https://grouplens.org/datasets/movielens/).*

In [9]:
ls data/ml-1m

movies.dat  ratings.dat  README  users.dat


*Давайте прочитаем файл Ratings.dat и создадим RDD рейтингов*

In [10]:
ratingsRDD = sc.textFile("data/ml-1m/ratings.dat")

In [11]:
ratingsRDD.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

* Мы прочитали текстовый файл и вывели первые 5 строк, используя действие 'take'.

*Теперь, если мы проверим файл readme, указанный в наборе данных, то увидим там следующие подписи столбцов данных:*

>*UserID::MovieID::Rating::Timestamp*

*Давайте проверим подсчет каждого заданного рейтинга. Но сначала нам нужно разделить наши данные, и для этого нам нужно использовать трансформирование (преобразование).*

1. **`ratingsRDD`**: RDD (Resilient Distributed Dataset), содержащий информацию о рейтингах.

2. **`map(lambda x: x.split('::')[2])`**:
    - `map`: Это операция трансформирования, которая применяет заданную функцию ко всем элементам RDD.
    - `lambda x: x.split('::')[2]`: Это анонимная функция (лямбда-функция), которая принимает один аргумент `x`. В данном случае, `x` представляет собой строку (с рейтингами фильмов).
    - `x.split('::')`: Это разделение строки `x` по разделителю `'::'`. Например, если `x` была строкой `"123::4::5"`, то `x.split('::')` вернет список `["123", "4", "5"]`.
    - `[2]`: Это индекс, который выбирает третий элемент из списка (индексация начинается с 0). В данном случае, это будет третий элемент после разделения строки, то есть рейтинг фильма.

In [12]:
ratings = ratingsRDD.map(lambda x: x.split('::')[2])

In [13]:
ratings.take(5)

['5', '3', '3', '4', '5']

- **`countByValue()`**: Этот метод (действие) возвращает словарь, где ключами являются уникальные значения из RDD, а значениями - количество раз, которое каждое уникальное значение встречается в RDD.

In [14]:
result = ratings.countByValue()

In [15]:
type(result)

collections.defaultdict

In [16]:
result

defaultdict(int,
            {'5': 226310, '3': 261197, '4': 348971, '2': 107557, '1': 56174})

*Итак, вы можете видеть, насколько легко было сформировать счетчик рейтингов. Поскольку он вернул словарь, можно отсортировать и распечатать результаты.*

In [17]:
import collections
sortedResults = collections.OrderedDict(sorted(result.items()))
print(f"{'Ratings':10}{'Count'}\n")
for key, value in sortedResults.items():
    print(f"{'★'* int(key):{10}}{value}")

Ratings   Count

★         56174
★★        107557
★★★       261197
★★★★      348971
★★★★★     226310


*Давайте рассмотрим другой пример и проверим, какие фильмы имеют самый высокий рейтинг.*

In [18]:
def loadMovieNames():
    movieNames = {}
    with open("data/ml-1m/movies.dat", encoding= 'ISO-8859-1') as f:
        for line in f:
            fields = line.split('::')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

### Распределим словарь с названиями фильмов на все узлы кластера, чтобы использовать его в наших Spark-задачах

Далее мы используем метод `broadcast` объекта `sc` (SparkContext), чтобы создать **распределенную переменную** (broadcast variable) на всех узлах кластера. Давайте разберемся, что это означает:

- **Broadcast Variables**: Это **только для чтения** общие переменные, которые кэшируются и доступны на всех узлах в кластере. Они используются для распределения данных, которые не меняются, между задачами (tasks) на разных узлах. Вместо отправки этих данных с каждой задачей, Spark распределяет broadcast переменные на рабочие узлы с помощью эффективных алгоритмов передачи данных, чтобы уменьшить затраты на коммуникацию.

- **Зачем использовать broadcast переменные?**: Представьте, что у вас есть большой словарь с названиями фильмов, и вы хотите использовать его во всех задачах на разных узлах. Вместо того чтобы отправлять этот словарь с каждой задачей, вы можете создать broadcast переменную, которая будет кэширована на каждом узле. Задачи будут использовать этот кэшированный словарь при выполнении преобразований.

- **Создание Broadcast переменной**: В данном случае мы создаем broadcast переменную, используя метод `broadcast` объекта `sc`. Мы передаем в него результат вызова функции `loadMovieNames()`, которая возвращает словарь с названиями фильмов.

- **Использование Broadcast переменной**: После создания broadcast переменной, вы можете получить доступ к ее значению с помощью `.value`. Например, `nameDict.value` даст вам доступ к загруженному словарю с названиями фильмов.

In [19]:
nameDict = sc.broadcast(loadMovieNames())

In [20]:
movies = ratingsRDD.map(lambda x: (int(x.split("::")[1]), 1))

In [21]:
movies.take(5)

[(1193, 1), (661, 1), (914, 1), (3408, 1), (2355, 1)]

### Подсчитаем общее количества просмотров для каждого фильма

1. **`movieCounts = movies.reduceByKey(lambda x, y: x + y)`**:
    - В данной строке мы используем метод `reduceByKey` на RDD.

2. **`reduceByKey`**:
    - Это **трансформация** (transformation) в **Apache Spark**, применяемая к **key-value RDD**.
    - Она группирует значения, соответствующие каждому ключу, а затем применяет функцию сокращения (reduction function) к значениям каждой группы.
    - В данном случае, функция сокращения - это **лямбда-функция** `lambda x, y: x + y`, которая складывает значения `x` и `y`.

3. **Что происходит дальше?**:
    - После выполнения этой строки кода, у нас будет новый RDD `movieCounts`, где каждый ключ ассоциирован с одним сокращенным значением (в данном случае, суммой всех значений для каждого ключа).

4. **Пример**:
    - Предположим, у нас есть RDD `movies`, где ключами являются идентификаторы фильмов, а значениями - количество просмотров каждого фильма.
    - `reduceByKey` объединяет значения для каждого ключа и выполняет суммирование.
    - Например, если у нас есть `(movie_id, views)` пары:
        ```
        (1, 10)
        (2, 15)
        (1, 5)
        (3, 8)
        ```
    - После применения `reduceByKey`, мы получим:
        ```
        (1, 15)  # 10 + 5
        (2, 15)
        (3, 8)
        ```

In [22]:
movieCounts = movies.reduceByKey(lambda x, y: x + y)

In [23]:
movieCounts.take(5)

[(1193, 1725), (661, 525), (914, 636), (3408, 1315), (2355, 1703)]

### Поменяем местами ключи и значения

1. **`flipped = movieCounts.map(lambda x: (x[1], x[0]))`**:
    - В этой строке мы используем метод `map` на RDD (Resilient Distributed Dataset) с названием `movieCounts`.
    - Мы применяем анонимную функцию (лямбда-функцию) к каждому элементу RDD.
    - Цель - поменять местами ключи и значения в каждой паре (K, V) в RDD.
    - В данном случае, мы меняем местами количество просмотров фильма (значение) и идентификатор фильма (ключ).

2. **`sortedMovies = flipped.sortByKey(ascending=False)`**:
    - Затем, мы используем метод `sortByKey` на RDD `flipped`.
    - Мы сортируем элементы RDD по ключу (количество просмотров фильма) в убывающем порядке (по умолчанию).
    - В результате получается новый RDD `sortedMovies`, где фильмы отсортированы по количеству просмотров.

В итоге, `sortedMovies` будет содержать фильмы, отсортированные по убыванию количества просмотров. Например, фильм с наибольшим количеством просмотров будет первым в списке.

In [24]:
flipped = movieCounts.map( lambda x : (x[1], x[0]))
sortedMovies = flipped.sortByKey(ascending=False)

In [25]:
sortedMovies.take(5)

[(3428, 2858), (2991, 260), (2990, 1196), (2883, 1210), (2672, 480)]

### Сформируем новый RDD, с ключами, соответствующими названиям фильмов

1. **`sortedMoviesWithNames = sortedMovies.map(lambda countMovie: (nameDict.value[countMovie[1]], countMovie[0]))`**:
    - В данной строке мы используем метод `map` на RDD (Resilient Distributed Dataset) с названием `sortedMovies`.
    - Мы применяем анонимную функцию (лямбда-функцию) к каждому элементу RDD.
    - Цель - создать новый RDD, где ключами будут названия фильмов (полученные из `nameDict`) и значениями - количество просмотров фильма.
    - В данном случае, мы опять меняем местами ключи и значения.

2. **Пример**:
    - Предположим, у нас есть RDD `sortedMovies`, где ключами являются идентификаторы фильмов, а значениями - количество просмотров каждого фильма.
    - `nameDict` - это broadcast переменная, содержащая соответствие между идентификаторами фильмов и их названиями.
    - `sortedMoviesWithNames` будет содержать новый RDD, где ключами будут названия фильмов, а значениями - количество просмотров.

3. **Пример вывода**:
    ```
    [('Shawshank Redemption, The (1994)', 12345),
     ('Pulp Fiction (1994)', 9876),
     ('Forrest Gump (1994)', 5678),
     ...]
    ```

    - В данном примере, фильм "Shawshank Redemption, The (1994)" имеет 12345 просмотров, "Pulp Fiction (1994)" - 9876 просмотров, и так далее.

In [26]:
sortedMoviesWithNames = sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

*Топ-10 самых рейтинговых фильмов.*

In [27]:
sortedMoviesWithNames.take(10)

[('American Beauty (1999)', 3428),
 ('Star Wars: Episode IV - A New Hope (1977)', 2991),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 2990),
 ('Star Wars: Episode VI - Return of the Jedi (1983)', 2883),
 ('Jurassic Park (1993)', 2672),
 ('Saving Private Ryan (1998)', 2653),
 ('Terminator 2: Judgment Day (1991)', 2649),
 ('Matrix, The (1999)', 2590),
 ('Back to the Future (1985)', 2583),
 ('Silence of the Lambs, The (1991)', 2578)]

*Посмотрим на фильмы с наибольшим рейтингом 5 звезд*

Давайте разберем этот код по частям:

1. **`def filter_five_star(line):`**:Определим функцию с именем `filter_five_star`.

    - `line`: Это аргумент функции - строка, которую мы хотим проверить.

2. **`splited_line = line.split("::")`**: Далее, мы разделяем строку `line` на части, используя разделитель `"::"`. Результатом будет список, содержащий разделенные части.

3. **`if splited_line[2] == '5':`**: Здесь мы проверяем, является ли третий элемент списка равным строке `'5'`.

4. **`return line`**: Если условие выполняется (третий элемент равен `'5'`), функция возвращает исходную строку `line`.

In [28]:
def filter_five_star(line):
    splited_line= line.split("::")
    if splited_line[2] == '5':
        return line

- **`five_start_ratingsRDD = ratingsRDD.filter(lambda x: filter_five_star(x))`**: В этой строке мы используем метод `filter` на RDD `ratingsRDD`. Давайте разберемся, что это означает:

    - `filter`: Это трансформация, которая применяет заданную функцию к каждому элементу RDD и возвращает новый RDD, содержащий только те элементы, для которых функция возвращает `True`.
    - `lambda x: filter_five_star(x)`: Это анонимная функция (лямбда-функция), которая принимает один аргумент `x` (элемент RDD) и вызывает функцию `filter_five_star(x)`.

- **`five_start_ratingsRDD.take(5)`**: Здесь мы используем метод `take(5)` на новом RDD `five_start_ratingsRDD`, чтобы получить первые пять элементов.

*В результате выполнения этого кода, у нас будет новый RDD `five_start_ratingsRDD`, содержащий только строки с рейтингами, равными `'5'`:*

In [29]:
five_start_rattingsRDD= ratingsRDD.filter(lambda x: filter_five_star(x))
five_start_rattingsRDD.take(5)

['1::1193::5::978300760',
 '1::2355::5::978824291',
 '1::1287::5::978302039',
 '1::2804::5::978300719',
 '1::595::5::978824268']

In [30]:
five_start_movies = five_start_rattingsRDD.map(lambda x: (int(x.split("::")[1]), 1))

five_start_movieCounts = five_start_movies.reduceByKey(lambda x, y: x + y)

flipped = five_start_movieCounts.map( lambda x : (x[1], x[0]))

five_start_sortedMovies = flipped.sortByKey(ascending=False)

five_start_sortedMoviesWithNames = five_start_sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

In [31]:
five_start_sortedMoviesWithNames.take(10)

[('American Beauty (1999)', 1963),
 ('Star Wars: Episode IV - A New Hope (1977)', 1826),
 ('Raiders of the Lost Ark (1981)', 1500),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 1483),
 ("Schindler's List (1993)", 1475),
 ('Godfather, The (1972)', 1475),
 ('Shawshank Redemption, The (1994)', 1457),
 ('Matrix, The (1999)', 1430),
 ('Saving Private Ryan (1998)', 1405),
 ('Sixth Sense, The (1999)', 1385)]

### Задание №1. Выведите общее количество фильмов по годам (применяя алгоритм действий по аналогии с примерами выше)

1. Загрузите данные о фильмах с помощью метода `sc.textFile("data/ml-1m/movies.dat")` и сохраните результат в переменную `moviesRDD`:


In [32]:
moviesRDD = sc.textFile("data/ml-1m/movies.dat")

2. Выведите первые 5 записей из `moviesRDD`:


In [33]:
moviesRDD.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

3. Извлеките год из строки 'Toy Story (1995)' с помощью среза:

take(кол-во строк) - вывести строку (кол-во)

[0] - индекс (строки)

.split('::'): Это разделение строки x по разделителю '::'.

[№ элемента по счету]: Это индекс, который выбирает № элемента из списка

.split('(') - разбиение строки

[:-1] удаляет последний символ

In [34]:
year = moviesRDD.take(1)[0].split('::')[1].split('(')[1][:-1]
print(year)

1995


4. Импортируйте модуль `re` с помощью команды `import re`. Найдите год выпуска фильма 'Grumpier Old Men (1995)' с помощью регулярного выражения `r'\([0-9]{4}\)$' и метода `re.search()`:


In [35]:
import re

In [36]:
year_gom = re.search(r'\([0-9]{4}\)$', moviesRDD.take(3)[2].split('::')[1]).group()
print(year_gom)

(1995)


6. Определите функцию `get_year(line)`, которая принимает строку, разделяет её по символу '::', извлекает год выпуска фильма и возвращает кортеж `(year, 1)`:


In [37]:
def get_year(line):
  line = line.split('::')
  year = re.search(r'\([0-9]{4}\)$', line[1]).group()
  return (year, 1)

7. Примените функцию `get_year(x)` к каждому элементу `moviesRDD` с помощью метода `map()`, результат сохраните в переменную `year_RDD`:

In [38]:
year_RDD = moviesRDD.map(lambda x: get_year(x))

8. Выведите первые 5 записей из `year_RDD`:

In [39]:
year_RDD.take(5)

[('(1995)', 1), ('(1995)', 1), ('(1995)', 1), ('(1995)', 1), ('(1995)', 1)]

9. Сгруппируйте данные в `year_RDD` по годам и подсчитайте количество фильмов для каждого года с помощью метода `reduceByKey()`, результат сохраните в переменную `yearCounts`:

In [40]:
yearCounts = year_RDD.reduceByKey(lambda x, y: x + y)

10. Выведите первые 5 записей из `yearCounts`:

In [41]:
yearCounts.take(5)

[('(1995)', 342),
 ('(1994)', 257),
 ('(1996)', 345),
 ('(1976)', 21),
 ('(1993)', 165)]

11. Отсортируйте `yearCounts` по годам в возрастающем порядке с помощью метода `sortByKey()`, результат сохраните в переменную `ascending_sorted_yearCounts`:

In [42]:
ascending_sorted_yearCounts = yearCounts.sortByKey()

12. Выведите первые 5 записей из `ascending_sorted_yearCounts`:


In [43]:
ascending_sorted_yearCounts.take(5)

[('(1919)', 3), ('(1920)', 2), ('(1921)', 1), ('(1922)', 2), ('(1923)', 3)]

13. Отсортируйте `yearCounts` по годам в убывающем порядке с помощью метода `sortByKey`, результат сохраните в переменную `descending_sorted_yearCounts`:

In [44]:
descending_sorted_yearCounts = yearCounts.sortByKey(ascending=False)

14. Выведите первые 5 записей из `descending_sorted_yearCounts`:

In [45]:
descending_sorted_yearCounts.take(5)

[('(2000)', 156),
 ('(1999)', 283),
 ('(1998)', 337),
 ('(1997)', 315),
 ('(1996)', 345)]

15. Переставьте местами ключи и значения в `yearCounts` с помощью метода `map()`, где каждый элемент преобразуется в кортеж `(x[1], x[0])`. Результат сохраните в переменную `flipped`.

In [46]:
flipped = yearCounts.map( lambda x : (x[1], x[0]))

16. Отсортируйте `flipped` по количеству фильмов в убывающем порядке с помощью метода `sortByKey(ascending= False)`. Результат сохраните в переменную `descending_sorted_yearCounts`.

In [47]:
descending_sorted_yearCounts = flipped.sortByKey(ascending=False)

17. Выведите первые 10 записей из `descending_sorted_yearCounts`:

In [48]:
descending_sorted_yearCounts.take(10)

[(345, '(1996)'),
 (342, '(1995)'),
 (337, '(1998)'),
 (315, '(1997)'),
 (283, '(1999)'),
 (257, '(1994)'),
 (165, '(1993)'),
 (156, '(2000)'),
 (104, '(1986)'),
 (102, '(1992)')]

### Задание №2. Определите наиболее активную возрастную группу пользователей

1. Выведите первые 5 записей из `ratingsRDD`:


In [49]:
ratingsRDD.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

2. Определите функцию `load_age_group()`, которая создает словарь `age_group` с возрастными группами и словарь `user_ageGroup`, где каждому пользователю сопоставляется его возрастная группа. Данные для `user_ageGroup` загружаются из файла "data/ml-1m/users.dat".


- Пример возрастных групп в словаре:


```python
age_group = {
        '1': 'Under 18',
        '18': '18-24',
        '25': '25-34',
        '35': '35-44',
        '45': '45-49',
        '50': '50-55',
        '56': '56+'
    }
```



In [50]:
def load_age_group():
    age_group = {
        '1': 'Under 18',
        '18': '18-24',
        '25': '25-34',
        '35': '35-44',
        '45': '45-49',
        '50': '50-55',
        '56': '56+'
    }

    user_ageGroup = {}
    with open('data/ml-1m/users.dat', 'r') as f:
        for line in f:
            user_id, gender, age, occupation, zip_code = line.strip().split('::')
            user_ageGroup[user_id] = age_group.get(age, 'Unknown')

    return user_ageGroup

3. Вызовите функцию `load_age_group()` и результат распространите по всем узлам с помощью метода `sc.broadcast()`. Результат сохраните в переменную `ageGroupDict`:

In [51]:
ageGroupDict = sc.broadcast(load_age_group())

4. Преобразуйте `ratingsRDD` в новый RDD `users_ratings`, где каждый элемент - это кортеж `(userID, 1)`.


In [52]:
users_ratings = ratingsRDD.map(lambda x: (x.split('::')[0], 1))

5. Сгруппируйте `users_ratings` по пользователям и подсчитайте количество оценок для каждого пользователя с помощью метода `reduceByKey()`. Результат сохраните в переменную `count_user_ratings`.

In [53]:
count_user_ratings = users_ratings.reduceByKey(lambda x, y: x + y)

6. Выведите первые 5 записей из `count_user_ratings` с помощью метода `take(5)`:

In [54]:
count_user_ratings.take(5)

[('1', 53), ('2', 129), ('3', 51), ('4', 21), ('5', 198)]

7. Переставьте местами ключи и значения в `count_user_ratings` с помощью метода `map()`, где каждый элемент преобразуется в кортеж `(x[1], x[0])`. Результат сохраните в переменную `flipped`:

In [55]:
flipped = count_user_ratings.map(lambda x : (x[1], x[0]))

8. Преобразуйте RDD `flipped`, в котором каждый элемент имеет вид `(count, userID)`, в новый RDD `age_group_count`. Для этого для каждого элемента:
  - Извлеките идентификатор пользователя `userID` (вторая часть кортежа);
  - С помощью broadcast-словаря (`ageGroupDict`) найдите соответствующую возрастную группу;
  - Сформируйте новый кортеж вида `(ageGroup, count)`, где `count` — это сохранённое ранее количество оценок.

In [56]:
age_group_count = flipped.map(lambda x : (ageGroupDict.value.get(x[1]), x[0]))

9. Сгруппируйте `age_group_count` по возрастным группам и подсчитайте общее количество оценок для каждой группы с помощью метода `reduceByKey()`. Результат сохраните в переменную `age_group_counts`:

In [57]:
age_group_counts = age_group_count.reduceByKey(lambda x, y: x+y)

10. Выведите все записи из `age_group_counts` с помощью метода `collect()`:

In [58]:
age_group_counts.collect()

[('Under 18', 27211),
 ('56+', 38780),
 ('25-34', 395556),
 ('45-49', 83633),
 ('50-55', 72490),
 ('35-44', 199003),
 ('18-24', 183536)]

11. Отсортируйте `age_group_counts` по количеству оценок в убывающем порядке и переставьте местами ключи и значения. Результат выведите с помощью метода `collect()`:

In [59]:
age_group_counts_f =  age_group_counts.map(lambda x : (x[1], x[0]))
age_group_counts_f.collect()

[(27211, 'Under 18'),
 (38780, '56+'),
 (395556, '25-34'),
 (83633, '45-49'),
 (72490, '50-55'),
 (199003, '35-44'),
 (183536, '18-24')]

### Задание №3. Определите среднее количество друзей по возрастам



1. Загрузите набор данных фейковых пользователей социальной сети:

In [60]:
fakefriends_RDD = sc.textFile('/content/data/fakefriends.csv')

2. Выведите первые 5 строк из набора данных

In [61]:
fakefriends_RDD.take(5)

['0,Will,33,385',
 '1,Jean-Luc,26,2',
 '2,Hugh,55,221',
 '3,Deanna,40,465',
 '4,Quark,68,21']

3. Посчитайте общее количество строк

In [62]:
fakefriends_RDD.count()

500

4. Определите функцию для извлечения возраста и количества друзей

In [63]:
def age_friends (line):
    line_split = line.split(',') #разделитель
    age = int(line_split[2])
    friends = int(line_split[3])
    return (age, friends)

5. Примените функцию к данным

In [64]:
age_friends_RDD = fakefriends_RDD.map(age_friends)
age_friends_RDD.take(5)

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

6. Сгруппируйте данные по возрасту: суммарное количество друзей и число пользователей для данного возраста

In [65]:
age_friends_count = age_friends_RDD.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
age_friends_count.take(5)

[(33, (3904, 12)),
 (26, (4115, 17)),
 (55, (3842, 13)),
 (40, (4264, 17)),
 (68, (2696, 10))]

7. Вычислите среднее количество друзей для каждого возраста:

In [66]:
age_friends_count_mean = age_friends_count.mapValues(lambda x: x[0] / x[1])
age_friends_count_mean.take(5)

[(33, 325.3333333333333),
 (26, 242.05882352941177),
 (55, 295.53846153846155),
 (40, 250.8235294117647),
 (68, 269.6)]

8. Выведите отсортированный по возрасту результат

In [67]:
age_friends_count_mean_sort = age_friends_count_mean.sortByKey()
age_friends_count_mean_sort.take(5)

[(18, 343.375),
 (19, 213.27272727272728),
 (20, 165.0),
 (21, 350.875),
 (22, 206.42857142857142)]

### **Дополнительные примеры:**

#### **1. Поиск метеостанции с минимальной температурой**

*Загрузим другой набор данных*

In [68]:
temp = sc.textFile("data/1800.csv")

In [69]:
temp.take(5)

['ITE00100554,18000101,TMAX,-75,,,E,',
 'ITE00100554,18000101,TMIN,-148,,,E,',
 'GM000010962,18000101,PRCP,0,,,E,',
 'EZE00100082,18000101,TMAX,-86,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,']

*Найдем метеостанции с минимальной температурой:*

In [70]:
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # Преобразуем градусы Цельсия в градусы Фарингейты
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

In [71]:
tempRDD = temp.map(parseLine)

In [72]:
tempRDD.take(5)

[('ITE00100554', 'TMAX', 18.5),
 ('ITE00100554', 'TMIN', 5.359999999999999),
 ('GM000010962', 'PRCP', 32.0),
 ('EZE00100082', 'TMAX', 16.52),
 ('EZE00100082', 'TMIN', 7.699999999999999)]

In [73]:
# Отбираем записи с типом 'TMIN'
minTemps = tempRDD.filter(lambda x: "TMIN" in x[1])

In [74]:
minTemps.take(5)

[('ITE00100554', 'TMIN', 5.359999999999999),
 ('EZE00100082', 'TMIN', 7.699999999999999),
 ('ITE00100554', 'TMIN', 9.5),
 ('EZE00100082', 'TMIN', 8.599999999999998),
 ('ITE00100554', 'TMIN', 23.72)]

In [75]:
# Формируем пары (stationID, temperature)
stationTemps = minTemps.map(lambda x: (x[0], x[2]))

In [76]:
# Сводим по ключу – по каждой станции выбираем минимальную температуру
minTemps = stationTemps.reduceByKey(lambda x, y: round(min(x,y), 2))

In [77]:
print("Минимальные температуры по станциям:")
minTemps.collect()

Минимальные температуры по станциям:


[('ITE00100554', 5.36), ('EZE00100082', 7.7)]

#### **2. Подсчет частоты упоминания слов в тексте**

*Загрузим новый датасет и посчитаем количество упоминаний каждого слова в файле:*

In [78]:
book = sc.textFile("data/Book.txt")

In [79]:
book.take(2)

['Self-Employment: Building an Internet Business of One',
 'Achieving Financial and Personal Freedom through a Lifestyle Technology Business']

Определим функцию normalizeWords, которая принимает текст, преобразует его в нижний регистр и разделяет на слова. Разделение происходит по любому символу, который не является буквой или цифрой (\W+):

In [80]:
import re

# Функция для нормализации слов: приведение к нижнему регистру и разбиение строки на слова
def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

Применяем функцию normalizeWords к каждой строке в RDD book и объединим результаты в один RDD - words:

In [81]:
# Разбиваем строки на слова
words = book.flatMap(normalizeWords)

Преобразуем каждое слово в пару (word, 1), а затем подсчитаем количество каждого слова, используя reduceByKey():

In [82]:
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [83]:
print("Первые 5 пар (слово, количество):")
print(wordCounts.take(5))

Первые 5 пар (слово, количество):
[('self', 111), ('employment', 75), ('building', 33), ('an', 178), ('internet', 26)]


Меняем местами каждую пару (слово, количество) в wordCounts, делая количество ключом, и затем сортируем результаты по ключу (количеству) в порядке убывания:

In [84]:
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

Отобразим десять самых часто встречающихся слов:

In [85]:
print("Топ-10 наиболее часто встречающихся слов:")
print(wordCountsSorted.take(10))

Топ-10 наиболее часто встречающихся слов:
[(1878, 'you'), (1828, 'to'), (1420, 'your'), (1292, 'the'), (1191, 'a'), (970, 'of'), (934, 'and'), (772, ''), (747, 'that'), (649, 'it')]


----