# Лабораторная работа №3

**Датасет:** `yelp_academic_dataset_business.json` (Yelp Open Dataset, бизнесы)

В файле содержится информация о заведениях (рестораны, кафе, салоны и т.д.) на Yelp:
- `business_id` — идентификатор бизнеса
- `name` — название
- `city`, `state` — город и штат
- `stars` — средний рейтинг (1–5)
- `review_count` — количество отзывов
- `categories` — строка с перечислением категорий через запятую

---

## Исследовательский вопрос

**Вопрос:**  
> Какие категории бизнеса в Yelp имеют самый высокий средний рейтинг,  
> если учитывать только категории, в которых не меньше 20 заведений?

---

## Структура работы (соответствует заданию ЛР)

1. **Часть 1. MapReduce (в стиле Hadoop Streaming)**  
   - Используем библиотеку `mrjob` как аналог Hadoop Streaming.  
   - Считаем средний рейтинг по категориям.

2. **Часть 2. Apache Spark**  
   - Повторяем тот же расчёт (средний рейтинг по категориям) через Spark DataFrame API и Spark SQL.  
   - Сравниваем подход с MapReduce.

3. **Часть 3. Витрина данных и оркестрация (Prefect)**  
   - Строим витрину: таблица `category, avg_rating, n_business`.  
   - Собираем пайплайн `check_file → spark_job → export_to_csv`, работающий как DAG (аналог Airflow).

In [18]:
!pip install mrjob pyspark prefect -q
print("Готово: mrjob, pyspark, prefect установлены.")


Готово: mrjob, pyspark, prefect установлены.


In [None]:
from google.colab import files
import json
import pandas as pd

print("Загрузите файл yelp_academic_dataset_business.json")
uploaded = files.upload()

filename = list(uploaded.keys())[0]
print("Используем файл:", filename)


Загрузите файл yelp_academic_dataset_business.json


In [None]:
rows = []
with open(filename, "r", encoding="utf-8") as f:
    for i in range(5):
        rows.append(json.loads(f.readline()))

rows


## Часть 1. MapReduce-подход (mrjob)

Здесь мы имитируем работу Hadoop Streaming:

- **Mapper**:
  - читает строку JSON
  - достаёт `stars` и `categories`
  - разбивает категории по запятой
  - для каждой категории отдаёт пару `(category) -> (stars, 1)`

- **Reducer**:
  - для каждой категории суммирует `stars` и количество бизнесов
  - считает средний рейтинг `avg_rating = sum(stars) / count`

Результат: для каждой категории — средний рейтинг по MapReduce-модели.


In [None]:
%%writefile yelp_business_mrjob.py
from mrjob.job import MRJob
import json

class YelpBusinessCategoryRating(MRJob):
    """
    MapReduce job для вычисления среднего рейтинга по категориям.
    Это аналог Hadoop Streaming: mapper + reducer.
    """

    def mapper(self, _, line):
        # Парсим JSON-строку
        try:
            row = json.loads(line)
        except:
            return

        stars = row.get("stars")
        categories = row.get("categories")

        # Если нет рейтинга или категорий — пропускаем
        if stars is None or categories is None:
            return

        # categories — строка вида "Restaurants, Pizza, Italian"
        for cat in categories.split(","):
            cat = cat.strip()
            if not cat:
                continue
            # Выдаём (category) -> (stars, 1)
            yield cat, (stars, 1)

    def reducer(self, category, values):
        # values — это последовательность (stars, 1)
        total_stars = 0.0
        total_count = 0
        for stars, cnt in values:
            total_stars += float(stars)
            total_count += cnt

        if total_count > 0:
            avg_rating = total_stars / total_count
            # Результат: category -> средний рейтинг
            yield category, avg_rating

if __name__ == "__main__":
    YelpBusinessCategoryRating.run()


In [None]:
# Запуск MapReduce job (аналог запуска Hadoop Streaming)
!python yelp_business_mrjob.py "{filename}" > mr_output.txt
print("MapReduce завершён. Результат записан в mr_output.txt")


In [None]:
mr_results = []

with open("mr_output.txt", "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line or "\t" not in line:
            continue
        category, avg_rating_str = line.split("\t", 1)
        try:
            avg_rating = float(avg_rating_str)
            mr_results.append((category, avg_rating))
        except ValueError:
            continue

mr_df = pd.DataFrame(mr_results, columns=["category", "avg_rating"])

# Отфильтруем категории, где мало бизнесов, чтобы можно было сравнивать со Spark
# Пока просто сохраняем сырые результаты — количество учтём в Spark.
mr_df_sorted = mr_df.sort_values("avg_rating", ascending=False)
mr_df_sorted.head(20)


## Часть 2. Apache Spark

В этой части используем Apache Spark:

1. Читаем `yelp_academic_dataset_business.json` в Spark DataFrame.
2. Разворачиваем поле `categories` (explode).
3. Считаем:
   - число бизнесов по каждой категории (`n_business`);
   - средний рейтинг `avg_rating`.
4. Ограничиваемся категориями, где `n_business >= 20` — это условие из нашего исследовательского вопроса.
5. Повторяем вычисления через Spark SQL (как аналог Hive).


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("YelpBusinessAnalysis") \
    .master("local[*]") \
    .getOrCreate()

spark


In [None]:
spark_df = spark.read.json(filename)
spark_df.printSchema()
spark_df.show(5, truncate=False)


In [None]:
from pyspark.sql.functions import split, explode, trim, col, count, avg

# Разворачиваем поле categories в отдельные строки
df_cat = (
    spark_df
    .withColumn("category", explode(split(col("categories"), ",")))
    .withColumn("category", trim(col("category")))
    .filter(col("category") != "")
    .filter(col("stars").isNotNull())
)

df_cat.show(10, truncate=False)


In [None]:
rating_dist = (
    spark_df.groupBy("stars")
            .agg(count("*").alias("n_business"))
            .orderBy("stars")
)

print("Распределение бизнесов по среднему рейтингу:")
rating_dist.show()


In [None]:
cat_stats = (
    df_cat.groupBy("category")
          .agg(
              avg("stars").alias("avg_rating"),
              count("*").alias("n_business")
          )
)

# Ограничиваем категории, где не меньше 20 заведений (условие вопроса)
cat_stats_filtered = (
    cat_stats.filter(col("n_business") >= 20)
             .orderBy(col("avg_rating").desc())
)

print("Топ-20 категорий по среднему рейтингу (Spark, n_business >= 20):")
cat_stats_filtered.show(20, truncate=False)


In [None]:
# Регистрируем временную таблицу, чтобы писать SQL-запросы
df_cat.createOrReplaceTempView("business_cat")

print("Топ-20 категорий по среднему рейтингу (Spark SQL, n_business >= 20):")
spark.sql("""
    SELECT
        category,
        AVG(stars) AS avg_rating,
        COUNT(*) AS n_business
    FROM business_cat
    GROUP BY category
    HAVING COUNT(*) >= 20
    ORDER BY avg_rating DESC
    LIMIT 20
""").show(truncate=False)


In [None]:
# Сохраняем витрину категорий: category, avg_rating, n_business
mart_categories = cat_stats_filtered

mart_categories.write.mode("overwrite").parquet("mart_yelp_categories")
print("Витрина 'mart_yelp_categories' сохранена в формате Parquet.")


## Часть 3. Оркестрация пайплайна (Prefect как аналог Airflow)

Построим pipeline:

1. `check_file` — проверка наличия `yelp_academic_dataset_business.json`;
2. `spark_mart` — Spark-задача, которая считает витрину (avg_rating + n_business по категориям) и сохраняет её в Parquet;
3. `export_to_csv` — конвертация витрины в CSV.

Все три шага объединяются в один `flow` (`yelp_pipeline`), который можно запускать по расписанию.


In [None]:
from prefect import flow, task
import os

@task
def check_file(path: str):
    """
    Проверка наличия исходных данных.
    Аналог первой задачи DAG в Airflow:
    'проверить, что данные доступны'.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Файл {path} не найден")
    size_mb = os.path.getsize(path) / (1024 * 1024)
    print(f"Файл {path} найден, размер ~{size_mb:.2f} MB")
    return path

@task
def spark_mart(path: str):
    """
    Spark-задача: считает витрину категорий (avg_rating, n_business)
    и сохраняет её в формате Parquet.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import split, explode, trim, col, avg, count

    spark = SparkSession.builder.appName("PrefectYelpJob").getOrCreate()

    df = spark.read.json(path)

    df_cat = (
        df.withColumn("category", explode(split(col("categories"), ",")))
          .withColumn("category", trim(col("category")))
          .filter(col("category") != "")
          .filter(col("stars").isNotNull())
    )

    mart = (
        df_cat.groupBy("category")
              .agg(
                  avg("stars").alias("avg_rating"),
                  count("*").alias("n_business")
              )
              .filter(col("n_business") >= 20)
    )

    out_path = "mart_yelp_categories_prefect"
    mart.write.mode("overwrite").parquet(out_path)
    print(f"Витрина сохранена в {out_path}")
    return out_path

@task
def export_to_csv(parquet_path: str, csv_path: str = "yelp_categories_mart.csv"):
    """
    Экспорт витрины в CSV для пользователя / BI-инструмента.
    Это финальная часть конвейера.
    """
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("ExportMart").getOrCreate()

    df = spark.read.parquet(parquet_path)
    df.toPandas().to_csv(csv_path, index=False)
    print(f"CSV с витриной сохранён в {csv_path}")
    return csv_path

@flow
def yelp_pipeline(path: str):
    """
    Главный flow (аналог DAG в Airflow):
    check_file → spark_mart → export_to_csv
    """
    src = check_file(path)
    mart_parquet = spark_mart(src)
    csv_file = export_to_csv(mart_parquet)
    return csv_file

print("Пайплайн Prefect 'yelp_pipeline' определён.")


In [None]:
result_csv = yelp_pipeline(filename)
print("Pipeline завершён, итоговый CSV:", result_csv)


In [None]:
final_mart = pd.read_csv("yelp_categories_mart.csv")
final_mart.head(20)
