# Робота з Spark RDD

## Найпопулярніші пари продуктів

В якості датасету для завдання необхідно використати [Amazon Reviews](https://www.kaggle.com/snap/amazon-fine-food-reviews).

> Для більш зручної розробки логіки в додатково в Класрумі є скорочений файл схожої структури `sample.csv`.

Датасет має наступну структуру. Друга колонка - ідентифікатор продукту, третя - ідентифікатор юзера:

```
Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,HelpfulnessDenominator,Score,Time,Summary,Text
```

Наприклад:

| Id | ProductId | UserId | ProfileName | ...інші колонки |
|----|-----------|--------|-------------|-----------------|
| 1  | B1        | A2     | Patron      | ...             |

### Опис завдання

1. Зчитати скорочений або повний датасет як RDD. Після читання вирізати лише потрібні колонки: `UserId` та `ProductId`.
2. Створити `RDD`, що містить пари (`tuple`) `UserId` та список всіх `ProductId` для всіх продуктів, які придбав цей юзер. В списку повинні бути лише **унікальні** продукти (`ProductId` для одного юзера не повинні повторюватись). Наприклад:

```python
("A1", ["B1", "B2", "B5"])
("A2", ["B1", "B3", "B5"])
...
```

3. Взявши списки продуктів для кожного юзера, отримати всі пари продуктів які він міг купувати разом. Для кожної такої пари створити `tuple` де першим елементом є пара, другим число `1`. Наприклад для попереднього списку:
```python
("B1,B2", 1)
("B1,B5", 1)
("B2,B5", 1)
("B1,B3", 1)
("B1,B5", 1)
("B3,B5", 1)
...
```

> Два продукти вважаються придбаними разом, якщо вони обидва з’являються у списку, який ви отримали на попередньому кроці.

4. Підрахувати кількість всіх пар продуктів, відсортувати їх за кількістю від найбільшої до найменшої.
5. Взяти лише перші `10` пар продуктів та їх кількість. Наприклад:
```python
("B1,B5", 23495)
("B2,B5", 3340)
("B3,B5", 217)
...
```
6. Зберегти результат в текстовий файл.

## Конфігурація

- `number_cores`: Кількість ядер, виділених під Spark
- `memory_gb`: Обʼєм оперативної памʼяті, виділеної під Spark (в Гб)
- `is_full_dataset`: Читати повний чи скорочений датасет.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=e633b0a22868a60ab4f39d4fe32052db59b70189faa6096cc12c3f4f2faab3ff
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
number_cores = 2
memory_gb = 4
is_full_dataset = False

In [None]:
from google.colab import files
uploaded = files.upload()

Saving sample.csv to sample.csv


In [None]:
from pyspark import SparkConf, SparkContext
import os

number_cores = 2
memory_gb = 4
conf = (
    SparkConf()
        .setAppName("Spark Rdd Task")
        .setMaster(f'local[{number_cores}]')
        .set('spark.driver.memory', f'{memory_gb}g')
)

sc = SparkContext(conf=conf)

## Рішення

In [None]:
if is_full_dataset:
    if not os.path.exists('Reviews.csv'):
        sc.stop()
        raise Exception("""
            Download the 'Reviews.csv' file from https://www.kaggle.com/datasets/snap/amazon-fine-food-reviews
            and put it in 'input' folder
        """)
    else:
        inputRdd = sc.textFile("Reviews.csv")
else:
    inputRdd = sc.textFile("sample.csv")

In [None]:
# Видалення рядку заголовку
filteredInput = inputRdd.filter(lambda line: line.startswith("Id,") == False)

In [None]:
# Ваш код починається тут
# працювати треба з RDD `filteredInput`
# Розділення коженого рядка по комі і вибирання лише UserId та ProductId
paired_rdd = filteredInput.map(lambda line: line.split(",")).map(lambda x: (x[2], x[1]))

# Групування ProductId за UserId та вибір унікальних значення ProductId для кожного UserId
grouped_rdd = paired_rdd.groupByKey().mapValues(lambda x: list(set(x)))

# Створення пар продуктів, які користувач купував разом
def generate_pairs(product_list):
    pairs = []
    for i in range(len(product_list)):
        for j in range(i + 1, len(product_list)):
            pairs.append((product_list[i], product_list[j]))
    return pairs

# Створення пари продуктів для кожного користувача
product_pairs_rdd = grouped_rdd.flatMapValues(generate_pairs)

# Присвоєння кожній парі значення 1 та підрахунок їх кількість
counted_pairs_rdd = product_pairs_rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

# Відсортування пари за кількістю від найбільшої до найменшої
sorted_pairs_rdd = counted_pairs_rdd.sortBy(lambda x: x[1], ascending=False)

# Взяти перші 10 пар
top_10_pairs = sorted_pairs_rdd.take(10)

# відображення результату перед збереженням
for pair, count in top_10_pairs:
    print(f"{pair}: {count}")

# Збереження результату в текстовий файл
sc.parallelize(top_10_pairs).map(lambda x: ','.join([str(i) for i in x])).coalesce(1).saveAsTextFile("top_product_pairs.txt")


## Зупинка Spark

In [None]:
sc.stop()

NameError: name 'sc' is not defined