<a href="https://colab.research.google.com/github/TarasMuzychuk/Practise_Task-3/blob/main/Task_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5bc62682fc34f11b743a807e0e097b7bf92809421b799ef4b0fa8c1d035d6e2d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Робота з Spark RDD

## Найпопулярніші пари продуктів

В якості датасету для завдання необхідно використати [Amazon Reviews](https://www.kaggle.com/snap/amazon-fine-food-reviews).

> Для більш зручної розробки логіки в додатково в Класрумі є скорочений файл схожої структури `sample.csv`.

Датасет має наступну структуру. Друга колонка - ідентифікатор продукту, третя - ідентифікатор юзера:

```
Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,HelpfulnessDenominator,Score,Time,Summary,Text
```

Наприклад:

| Id | ProductId | UserId | ProfileName | ...інші колонки |
|----|-----------|--------|-------------|-----------------|
| 1  | B1        | A2     | Patron      | ...             |

### Опис завдання

1. Зчитати скорочений або повний датасет як RDD. Після читання вирізати лише потрібні колонки: `UserId` та `ProductId`.
2. Створити `RDD`, що містить пари (`tuple`) `UserId` та список всіх `ProductId` для всіх продуктів, які придбав цей юзер. В списку повинні бути лише **унікальні** продукти (`ProductId` для одного юзера не повинні повторюватись). Наприклад:

```python
("A1", ["B1", "B2", "B5"])
("A2", ["B1", "B3", "B5"])
...
```

3. Взявши списки продуктів для кожного юзера, отримати всі пари продуктів які він міг купувати разом. Для кожної такої пари створити `tuple` де першим елементом є пара, другим число `1`. Наприклад для попереднього списку:
```python
("B1,B2", 1)
("B1,B5", 1)
("B2,B5", 1)
("B1,B3", 1)
("B1,B5", 1)
("B3,B5", 1)
...
```

> Два продукти вважаються придбаними разом, якщо вони обидва з’являються у списку, який ви отримали на попередньому кроці.

4. Підрахувати кількість всіх пар продуктів, відсортувати їх за кількістю від найбільшої до найменшої.
5. Взяти лише перші `10` пар продуктів та їх кількість. Наприклад:
```python
("B1,B5", 23495)
("B2,B5", 3340)
("B3,B5", 217)
...
```
6. Зберегти результат в текстовий файл.

## Конфігурація

- `number_cores`: Кількість ядер, виділених під Spark
- `memory_gb`: Обʼєм оперативної памʼяті, виділеної під Spark (в Гб)
- `is_full_dataset`: Читати повний чи скорочений датасет.

In [28]:
number_cores = 2
memory_gb = 4
is_full_dataset =False

In [29]:
from pyspark import SparkConf, SparkContext
import os

number_cores = 2
memory_gb = 4
conf = (
    SparkConf()
        .setAppName("Spark Rdd Task")
        .setMaster(f'local[{number_cores}]')
        .set('spark.driver.memory', f'{memory_gb}g')
)

sc = SparkContext(conf=conf)

## Рішення


In [30]:
if is_full_dataset:
    if not os.path.exists('Reviews.csv'):
        sc.stop()
        raise Exception("""
            Download the 'Reviews.csv' file from https://www.kaggle.com/datasets/snap/amazon-fine-food-reviews
            and put it in 'input' folder
        """)
    else:
        inputRdd = sc.textFile("Reviews.csv")
else:
    inputRdd = sc.textFile("sample.csv")

In [31]:
# Видалення рядку заголовку
filteredInput = inputRdd.filter(lambda line: line.startswith("Id,") == False)

In [33]:
# Ваш код починається тут
# працювати треба з RDD `filteredInput`
 # Розділяємо вхідний потік на окремі частини
separate_parts = filteredInput.map(lambda row: row.split(','))

# Створюємо пари (`Клієнт` та `Товар`)
client_product_pairs = separate_parts.map(lambda item: (item[2], item[1]))

# Групуємо за клієнтом та об'єднуємо всі товари у список унікальних продуктів
unique_products_per_client = client_product_pairs.groupByKey().mapValues(lambda product: list(set(product)))
unique_products_per_client.take(10)

# Завантажуємо необхідні модулі
from itertools import permutations
from collections import defaultdict

# Функція для створення пар товарів та підрахунку їх кількості
def get_product_pairs_count(products):
    product_pairs_count = defaultdict(int)
    for pair in permutations(products, 2):
        product_pairs_count[pair] += 1
    return product_pairs_count.items()

# Створюємо всі можливі пари товарів для кожного клієнта
product_pairs_count_per_client = unique_products_per_client.flatMapValues(get_product_pairs_count)

# Підсумовуємо кількість кожної пари товарів
total_product_pairs_count = product_pairs_count_per_client.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)

# Сортуємо пари товарів за кількістю від найбільшої до найменшої
sorted_product_pairs_count = total_product_pairs_count.sortBy(lambda x: x[1], ascending=False)

# Беремо лише перші 10 пар товарів та їх кількість
product_pairs_count = sorted_product_pairs_count.take(10)

# Виводимо результат
product_pairs_count

# Відкриваємо файл для запису
with open('roduct_pairs_count_v2.txt', 'w') as file:
    # Записуємо заголовок
    file.write(" 10 пар продуктів та їх кількість:\n")
    # Записуємо кожну пару та її кількість
    for pair, count in product_pairs_count:
        file.write(f"{pair}: {count}\n")


## Зупинка Spark

In [34]:
sc.stop()