# Горбунов Иван

## HW1 Вариант 2:

Рассчитайте средний рейтинг товаров из набора данных.
Сопоставьте полученные данные из предыдущего пункта с наименованием товаров.
Сформируйте RDD товаров с рейтингом меньшим 3. Выведите топ-10 товаров с наименьшим рейтингом.
Сохраните результат в постоянное хранилище.

## Инициализация и Скачивание данных

In [2]:
import os

java17_home = "/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home"

if not os.path.exists(java17_home):
    java17_home = "/Library/Java/JavaVirtualMachines/openjdk-17.jdk/Contents/Home"

os.environ["JAVA_HOME"] = java17_home

print(f"JAVA_HOME установлен: {os.environ['JAVA_HOME']}")

import shutil
import json
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("HW1_Task2_Amazon").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)

print("Spark Context запущен!")

# cкачивание данных
files = {
    "reviews_Electronics_5.json.gz": "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Electronics_5.json.gz",
    "meta_Electronics.json.gz": "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Electronics.json.gz"
}

for filename, url in files.items():
    if not os.path.exists(filename):
        print(f"Скачиваю {filename}...")
        result = os.system(f"curl -O -L {url}")
        if result == 0:
            print(f"-> {filename} успешно скачан.")
        else:
            print(f"-> Ошибка при скачивании {filename}.")
    else:
        print(f"Файл {filename} уже существует.")

JAVA_HOME установлен: /Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home


26/01/11 14:49:27 WARN Utils: Your hostname, MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.31.202 instead (on interface en0)
26/01/11 14:49:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/11 14:49:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Context запущен!
Файл reviews_Electronics_5.json.gz уже существует.
Файл meta_Electronics.json.gz уже существует.


26/01/11 14:49:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Загрузка и Парсинг

In [3]:
import os
import json

# Это разрешает Spark создавать процессы на маке без ошибки безопасности
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"

from pyspark import SparkContext, SparkConf

try:
    sc.stop()
except:
    pass

conf = SparkConf().setAppName("HW1_Task2_Amazon").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR") # Чтобы убрать лишний красный шум в логах

print("Spark Context перезапущен с фиксом для macOS!")

# загрузка отзывов 
reviews_raw = sc.textFile("reviews_Electronics_5.json.gz")
reviews_rdd = reviews_raw.map(json.loads) \
                         .map(lambda x: (x['asin'], float(x['overall'])))

# загрузка товаров 
meta_raw = sc.textFile("meta_Electronics.json.gz")

def safe_parse_meta(line):
    try:
        # eval выполняет строку как код питона
        data = eval(line)
        return (data['asin'], data.get('title', 'Unknown Title'))
    except Exception:
        # Если строка битая, возвращаем None (пропускаем её)
        return None

products_rdd = meta_raw.map(safe_parse_meta) \
                       .filter(lambda x: x is not None)

print("Данные загружены.")
print("Пример отзыва:", reviews_rdd.first())
print("Пример товара:", products_rdd.first())

26/01/11 14:49:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark Context перезапущен с фиксом для macOS!
Данные загружены.


                                                                                

Пример отзыва: ('0528881469', 5.0)


[Stage 1:>                                                          (0 + 1) / 1]

Пример товара: ('0132793040', 'Kelby Training DVD: Mastering Blend Modes in Adobe Photoshop CS5 By Corey Barker')


                                                                                

## Расчет среднего рейтинга

In [4]:
# начальное значение аккумулятора: (0.0 сумма, 0 количество)
start_acc = (0.0, 0)

# Функция внутри одной партиции: прибавляем рейтинг к сумме, +1 к счетчику
seq_op = lambda acc, rating: (acc[0] + rating, acc[1] + 1)

# Функция объединения партиций: складываем суммы и счетчики
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

# Считаем
agg_rdd = reviews_rdd.aggregateByKey(start_acc, seq_op, comb_op)

# Вычисляем среднее: сумма / количество
# Результат
avg_ratings = agg_rdd.mapValues(lambda x: x[0] / x[1])

print("Средние рейтинги рассчитаны. Пример:", avg_ratings.take(5))

[Stage 2:>                                                          (0 + 1) / 1]

Средние рейтинги рассчитаны. Пример: [('0528881469', 2.4), ('0594451647', 4.2), ('0594481813', 4.0), ('0972683275', 4.461187214611872), ('1400501466', 3.953488372093023)]


                                                                                

## Объединение (Join) и Фильтрация

In [None]:
# соединяем рейтинги с названиями товаров
joined_rdd = avg_ratings.join(products_rdd)

# фильтруем: оставляем только те, где рейтинг < 3
low_rated_rdd = joined_rdd.filter(lambda x: x[1][0] < 3.0)

# упрощаем структуру для удобства: (Рейтинг, Название, ASIN)
result_rdd = low_rated_rdd.map(lambda x: (x[1][0], x[1][1], x[0]))

print(f"Найдено товаров с рейтингом < 3: {result_rdd.count()}")



Найдено товаров с рейтингом < 3: 2625


                                                                                

Вывод Топ-10 и Сохранение

In [6]:
# вывод Топ-10
top_10_worst = result_rdd.takeOrdered(10, key=lambda x: x[0])

print("\n" + "="*50)
print("ТОП-10 ТОВАРОВ С НАИМЕНЬШИМ РЕЙТИНГОМ")
print("="*50)
for i, (rating, title, asin) in enumerate(top_10_worst, 1):
    print(f"{i}. [{rating:.2f}] {title} (ID: {asin})")

output_dir = "hw1_task2_output"

if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

# Сохраняем как текстовый файл
result_rdd.sortBy(lambda x: x[0]).saveAsTextFile(output_dir)

print(f"\nРезультат успешно сохранен в папку: {output_dir}")


ТОП-10 ТОВАРОВ С НАИМЕНЬШИМ РЕЙТИНГОМ
1. [1.00] Agfa ePhoto SMILE 0.2MP Digital Camera (ID: B00000JBIA)
2. [1.00] GE 24746 Futura HDTV Ready Antenna (ID: B000F1ORW6)
3. [1.00] ATI TV Wonder 200 PCI Video Card w/PVR Capabilities (ID: B000H13L4Y)
4. [1.00] Dynex-DX-AP100 Adapter Mini DVI to Mini-DIN (ID: B0013WI5SS)
5. [1.00] Maxtor OneTouch 4 - 1 TB USB 2.0 Desktop External Hard Drive STM310005OTA3E1-RK (ID: B001BLN91Y)
6. [1.00] Rapid USB Charger Adapters include: Wall and Car + USB Cable for Samsung Galaxy TAB (P1000) (ID: B004GGRPGG)
7. [1.00] RCA DRC8335 DVD Recorder &amp; VCR Combo With Built-In Tuner (ID: B000NNFS4C)
8. [1.00] StarTech HDMISPL1HH 1 feet Standard HDMI Cable - 1x HDMI (M) to 2x HDMI (F) (Discontinued by Manufacturer) (ID: B00111JODG)
9. [1.00] Zeikos 57-in-1 USB 2.0 Flash Memory Card Reader ZE-CR201 (ID: B001T9N0R6)
10. [1.00] NEEWER&reg; Photographic Barn Door &amp; Honeycomb Grid &amp; Gel Set for Alienbees Alienbee Flash (ID: B003KIQTXG)

Результат успешно сохра