In [29]:
%pip install -q mrjob

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:

import os


if files is not None:
    # Шаг 1: загрузка kaggle.json
    if not os.path.exists("kaggle.json") and not os.path.exists("/root/.kaggle/kaggle.json"):
        print("Выберите файл kaggle.json, скачанный из аккаунта Kaggle (Account → Create New API Token)")
        uploaded = files.upload()  # откроет диалог выбора файла

    # Шаг 2: настройка каталога Kaggle
    os.makedirs("/root/.kaggle", exist_ok=True)
    if os.path.exists("kaggle.json"):
        !mv kaggle.json /root/.kaggle/
    !chmod 600 /root/.kaggle/kaggle.json

    # Шаг 3: установка и использование клиента Kaggle
    !pip install -q kaggle

    dataset_id = "cynthiarempel/amazon-us-customer-reviews-dataset"
    target_tsv = "amazon_reviews_us_Electronics_v1_00.tsv"

    print(f"Скачиваем {target_tsv} из датасета {dataset_id}...")
    !kaggle datasets download -d $dataset_id -f $target_tsv -p .

    # Kaggle упаковывает файл в zip, распакуем
    import zipfile

    zip_path = target_tsv + ".zip"
    if os.path.exists(zip_path):
        print("Распаковка zip-архива...")
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(".")
        os.remove(zip_path)

    if os.path.exists(target_tsv):
        dataset_path = target_tsv
        size_gb = os.path.getsize(dataset_path) / (1024**3)
        print(f"\n✓ Файл успешно скачан из Kaggle: {dataset_path}")
        print(f"Размер файла: {size_gb:.2f} ГБ")
        print("Переменная dataset_path обновлена и указывает на скачанный файл.")

In [31]:
%%writefile amazon_mr_analysis.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class AmazonReviewsMR(MRJob):

    def mapper(self, _, line):
        """Map: извлекает product_id и rating из каждой строки"""
        if line.startswith("marketplace"):
            return  # Пропускаем заголовок

        try:
            row = next(csv.reader([line], delimiter='\t'))
            if len(row) >= 15:
                product_id = row[3]
                product_title = row[5]
                star_rating = float(row[7])
                yield product_id, (star_rating, product_title, 1)
        except:
            pass

    def reducer_collect(self, product_id, values):
        """Reducer: агрегирует рейтинги по продуктам"""
        total_rating = 0.0
        count = 0
        title = ""

        for rating, product_title, cnt in values:
            total_rating += rating
            count += cnt
            if not title:
                title = product_title

        if count >= 50:  # Фильтр: минимум 50 отзывов
            avg_rating = total_rating / count
            yield None, (avg_rating, count, product_id, title)

    def reducer_sort(self, _, values):
        """Сортировка по рейтингу"""
        sorted_values = sorted(values, key=lambda x: x[0], reverse=True)
        for avg_rating, count, product_id, title in sorted_values[:20]:
            yield product_id, {
                "avg_rating": round(avg_rating, 2),
                "review_count": count,
                "title": title
            }

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer_collect),
            MRStep(reducer=self.reducer_sort)
        ]

if __name__ == "__main__":
    AmazonReviewsMR.run()


Writing amazon_mr_analysis.py


In [33]:
# Запуск MapReduce анализа
print("Запуск MapReduce анализа...")
!python amazon_mr_analysis.py {dataset_path} > mapreduce_results.txt
print("MapReduce анализ завершен")

Запуск MapReduce анализа...
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/amazon_mr_analysis.root.20251218.142628.345984
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/amazon_mr_analysis.root.20251218.142628.345984/output
Streaming final output from /tmp/amazon_mr_analysis.root.20251218.142628.345984/output...
Removing temp directory /tmp/amazon_mr_analysis.root.20251218.142628.345984...
MapReduce анализ завершен


In [35]:
# Визуализация результатов MapReduce
import ast

if os.path.exists("mapreduce_results.txt"):
    print("Топ-20 товаров по среднему рейтингу (MapReduce):")
    print("=" * 100)

    results = []
    with open("mapreduce_results.txt", "r") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                product_id, stats_str = line.strip().split("\t", 1)
                stats = ast.literal_eval(stats_str)
                results.append((product_id, stats))
            except:
                pass

    for i, (pid, stats) in enumerate(results[:20], 1):
        print(f"{i:2d}. {pid[:20]:20s} | Рейтинг: {stats['avg_rating']:.2f} | "
              f"Отзывов: {stats['review_count']:4d} | {stats['title'][:50]}")


Топ-20 товаров по среднему рейтингу (MapReduce):
 1. "B00ZZ0VPBS"         | Рейтинг: 5.00 | Отзывов:   54 | CITI ULTRA Series - PREMIUM HDMI CABLE HIGH SPEED 
 2. "B00ZZ0VSJC"         | Рейтинг: 5.00 | Отзывов:   52 | CITI - PREMIUM 25FT 3.5mm Stereo Male to 2RCA Male
 3. "B003L150R8"         | Рейтинг: 4.96 | Отзывов:   52 | TETC Cable (3 Feet) - Optical Digital Audio Cable
 4. "B00EQG0CI4"         | Рейтинг: 4.94 | Отзывов:   53 | Logitech L-LU18 Battery Replacement (1250mAh, 3.7V
 5. "B005LR04WQ"         | Рейтинг: 4.93 | Отзывов:   92 | Panasonic N2QAYB000485 Remote Control Compatible w
 6. "B005T3LKKM"         | Рейтинг: 4.93 | Отзывов:  137 | Mediabridge FLEX Series HDMI Cable - High-Speed Su
 7. "B00CELMZG0"         | Рейтинг: 4.93 | Отзывов:  195 | Underwater Audio Waterproof iPod Mega Bundle
 8. "B00SAM93YO"         | Рейтинг: 4.93 | Отзывов:   73 | Waterproof Bluetooth Speaker IPX7 Grade by Tech Ac
 9. "B0043GGDGC"         | Рейтинг: 4.93 | Отзывов:   57 | 1800mAh Battery Pac

In [49]:
# Установка Java и Spark

import os
import sys
import google.colab

!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Скачивание и распаковка Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Установка findspark
%pip install -q findspark

In [53]:
spark = None
spark_available = False

import findspark
findspark.init()


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, max as spark_max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

spark = SparkSession.builder \
    .appName("AmazonReviewsAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("Spark сессия создана")
print(f"Spark версия: {spark.version}")


Spark сессия создана
Spark версия: 4.0.1


In [54]:
# Определение схемы данных
schema = StructType([
        StructField("marketplace", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("review_id", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("product_parent", StringType(), True),
        StructField("product_title", StringType(), True),
        StructField("product_category", StringType(), True),
        StructField("star_rating", DoubleType(), True),
        StructField("helpful_votes", IntegerType(), True),
        StructField("total_votes", IntegerType(), True),
        StructField("vine", StringType(), True),
        StructField("verified_purchase", StringType(), True),
        StructField("review_headline", StringType(), True),
        StructField("review_body", StringType(), True),
        StructField("review_date", StringType(), True)
    ])

print("Схема данных определена")


Схема данных определена


In [61]:
# Загрузка данных в Spark DataFrame

print("Загрузка данных в Spark DataFrame.")

df = spark.read \
    .option("delimiter", "\t") \
    .option("header", "false") \
    .schema(schema) \
    .csv(dataset_path)

print(f"Данные загружены")
print(f"Количество строк: {df.count():,}")
print(f"Количество партиций: {df.rdd.getNumPartitions()}")

print("\nПервые 5 строк:")
df.select("product_id", "product_title", "product_category", "star_rating").show(5, truncate=False)

Загрузка данных в Spark DataFrame.
Данные загружены
Количество строк: 3,093,870
Количество партиций: 13

Первые 5 строк:
+----------+--------------------------------------------------------------------------------------------------------------------------+----------------+-----------+
|product_id|product_title                                                                                                             |product_category|star_rating|
+----------+--------------------------------------------------------------------------------------------------------------------------+----------------+-----------+
|product_id|product_title                                                                                                             |product_category|NULL       |
|B00428R89M|yoomall 5M Antenna WIFI RP-SMA Female to Male Extensionl Cable                                                            |Electronics     |5.0        |
|B000068O48|Hosa GPM-103 3.5mm TRS to 1/4" TRS Adaptor

In [57]:
# Анализ данных с использованием Spark SQL
# группировка и агрегация


print(f"Отзывов в категории Electronics: {df.count():,}")

# Группировка по product_id и вычисление статистики
product_stats = df.groupBy("product_id") \
    .agg(
        avg("star_rating").alias("avg_rating"),
        count("*").alias("review_count"),
        spark_max("product_title").alias("product_title"),
        spark_max("product_category").alias("product_category")
    ) \
    .filter(col("review_count") >= 50) \
    .orderBy(col("avg_rating").desc())

print(f"Товаров с >= 50 отзывами: {product_stats.count()}")

# Показываем топ-20 результатов
print("\nТоп-20 товаров по среднему рейтингу:")
print("=" * 100)
product_stats.select(
    "product_id",
    "avg_rating",
    "review_count",
    "product_title"
).show(20, truncate=False)


Отзывов в категории Electronics: 3,093,870
Товаров с >= 50 отзывами: 10632

Топ-20 товаров по среднему рейтингу:
+----------+------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_id|avg_rating        |review_count|product_title                                                                                                                                                                  |
+----------+------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|B00ZZ0VPBS|5.0               |54          |CITI ULTRA Series - PREMIUM HDMI CABLE HIGH SPEED 50 FEET [15 M] Black Nylon Brading with 24k Gold Plated                                                                      |
|B0

In [58]:
# Сохранение результатов в файл
output_path = "spark_results"

try:
    # Сохраняем результаты в CSV
    product_stats.coalesce(1).write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv(output_path)

    print(f"Результаты сохранены в {output_path}/")

    # Также сохраняем в более удобном формате (первые 100 записей)
    top_products = product_stats.limit(100).toPandas()
    top_products.to_csv("spark_top_products.csv", index=False, encoding='utf-8')
    print(f"Топ-100 товаров сохранены в spark_top_products.csv")

except Exception as e:
    print(f"Ошибка при сохранении: {e}")


Результаты сохранены в spark_results/
Топ-100 товаров сохранены в spark_top_products.csv


In [60]:
print("Дополнительная статистика:")
print("=" * 80)

# Общая статистика по рейтингам
print("\n1. Общая статистика по рейтингам:")
df.select("star_rating").describe().show()

# Товары с наибольшим количеством отзывов
print("\n2. Товары с наибольшим количеством отзывов:")
df.groupBy("product_id", "product_title") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10) \
    .show(truncate=False)


Дополнительная статистика:

1. Общая статистика по рейтингам:
+-------+-----------------+
|summary|      star_rating|
+-------+-----------------+
|  count|          3093861|
|   mean|4.035506443243571|
| stddev|1.387438223328423|
|    min|              1.0|
|    max|              5.0|
+-------+-----------------+


2. Товары с наибольшим количеством отзывов:
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|product_id|product_title                                                                                                                                                                                                |count|
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [62]:
%pip install luigi



In [44]:
%%writefile spark_luigi_script.py
import sys
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, max as spark_max

dataset_file = sys.argv[1] if len(sys.argv) > 1 else os.environ.get('DATASET_PATH', 'amazon_reviews_us_Electronics_v1_00.tsv')


# Инициализация Spark
spark = SparkSession.builder \
    .appName("AmazonReviewsLuigi") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Загрузка данных
df = spark.read \
    .option("delimiter", "\t") \
    .option("header", "false") \
    .option("inferSchema", "true") \
    .csv(dataset_file)

# Назначение имен колонкам
columns = ["marketplace", "customer_id", "review_id", "product_id",
           "product_parent", "product_title", "product_category", "star_rating",
           "helpful_votes", "total_votes", "vine", "verified_purchase",
           "review_headline", "review_body", "review_date"]
df = df.toDF(*columns)

# Анализ
result = df.filter(col("product_category").contains("Electronics")) \
    .groupBy("product_id") \
    .agg(
        avg("star_rating").alias("avg_rating"),
        count("*").alias("review_count"),
        spark_max("product_title").alias("product_title")
    ) \
    .filter(col("review_count") >= 50) \
    .orderBy(col("avg_rating").desc())

# Сохранение результатов
result.limit(100).toPandas().to_csv("luigi_spark_output.csv", index=False)

spark.stop()
print("Spark analysis completed")


Writing spark_luigi_script.py


In [45]:
%%writefile luigi_pipeline.py

import luigi
import subprocess
import os

DATASET_FILE = os.environ.get('DATASET_PATH', 'amazon_reviews_us_Electronics_v1_00.tsv')


class CheckData(luigi.Task):
    """Проверка наличия данных"""

    def output(self):
        return luigi.LocalTarget(DATASET_FILE)

    def run(self):
         if not os.path.exists(DATASET_FILE):
            raise FileNotFoundError(f"Dataset not found: {DATASET_FILE}")

class RunSparkAnalysis(luigi.Task):
    """Запуск Spark анализа"""

    def requires(self):
        return CheckData()

    def output(self):
        return luigi.LocalTarget("luigi_spark_output.csv")

    def run(self):
        subprocess.run(
            ["/content/spark-3.5.0-bin-hadoop3/bin/spark-submit",
             "spark_luigi_script.py", DATASET_FILE],
            check=True
        )

class GenerateReport(luigi.Task):
    """Генерация финального отчета"""

    def requires(self):
        return RunSparkAnalysis()

    def output(self):
        return luigi.LocalTarget("final_report.txt")

    def run(self):
        import pandas as pd
        from datetime import datetime

        df = pd.read_csv("luigi_spark_output.csv")

        with self.output().open('w') as f:
            f.write(f"Amazon Reviews Analysis Report\n")
            f.write(f"Generated: {datetime.now()}\n")
            f.write(f"=" * 80 + "\n\n")
            f.write(f"Top products found: {len(df)}\n")
            f.write(f"Best rated product: {df.iloc[0]['product_title']}\n")
            f.write(f"Average rating: {df.iloc[0]['avg_rating']:.2f}\n")

if __name__ == "__main__":
    luigi.run()


Writing luigi_pipeline.py


In [46]:
# Запуск Luigi pipeline

if 'IN_COLAB' in globals() and IN_COLAB and spark_available:
    print("Запуск Luigi pipeline...")
    !python -m luigi --module luigi_pipeline GenerateReport --local-scheduler

    # Показываем финальный отчет
    if os.path.exists("final_report.txt"):
        print("\n" + "=" * 80)
        print("ФИНАЛЬНЫЙ ОТЧЕТ:")
        print("=" * 80)
        with open("final_report.txt", "r") as f:
            print(f.read())
else:
    print("Luigi pipeline доступен только в Google Colab с установленным Spark")


Запуск Luigi pipeline...
DEBUG: Checking if GenerateReport() is complete
DEBUG: Checking if RunSparkAnalysis() is complete
INFO: Informed scheduler that task   GenerateReport__99914b932b   has status   PENDING
DEBUG: Checking if CheckData() is complete
INFO: Informed scheduler that task   RunSparkAnalysis__99914b932b   has status   PENDING
INFO: Informed scheduler that task   CheckData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 40796] Worker Worker(salt=6622933019, workers=1, host=627db7e8b709, username=root, pid=40796) running   RunSparkAnalysis()
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/18 14:37:24 INFO SparkContext: Running Spark version 4.0.1
25/12/18 14:37:24 INFO SparkContext: OS info Linux, 6.6.105+, amd64
25/12/18 14:37:24 INFO SparkContext: Java version 17.0.17
25/12/18 14:37:24 WARN NativeCodeLoader: Unable

In [48]:
# Очистка ресурсов Spark
if 'spark' in globals() and spark is not None:
    try:
        spark.stop()
        print("Spark сессия остановлена")
    except:
        pass


Spark сессия остановлена
