In [21]:
import pandas as pd

# Загружаем
ratings = pd.read_csv('books_rating.csv')
books = pd.read_csv('books_data.csv')

# Приводим Title к единому виду
ratings['Title_clean'] = ratings['Title'].str.strip().str.lower()
books['Title_clean'] = books['Title'].str.strip().str.lower()

# Join
merged = pd.merge(
    ratings,
    books[['Title_clean', 'categories']],
    on='Title_clean',
    how='left'
)

# Сохраняем БЕЗ лишних индексов, с заголовком
merged.to_csv('reviews_full.csv', index=False)
print("Объединено строк:", len(merged))
print("Пример жанра:", merged['categories'].iloc[0])

Объединено строк: 3320453
Пример жанра: ['Comics & Graphic Novels']


In [22]:
%%writefile mr_q1.py
from mrjob.job import MRJob
import csv

def length_cat(n_words: int) -> str:
    if n_words < 100:
        return 'short'
    elif n_words <= 500:
        return 'medium'
    else:
        return 'long'

def parse_help(s: str) -> float:
    try:
        a, b = map(int, s.split('/'))
        return a / b if b > 0 else 0.0
    except:
        return 0.0

class MRQ1(MRJob):
    def mapper(self, _, line):
        # Пропускаем заголовок
        if line.startswith('Id,Title') or line.startswith('"Id"'):
            return
        try:
            fields = next(csv.reader([line]))
            if len(fields) < 10:
                return
            helpful = fields[5].strip()  # review/helpfulness
            text = fields[9].strip()      # review/text

            if not text or len(text) < 5:
                return

            n_words = len(text.split())
            cat = length_cat(n_words)
            ratio = parse_help(helpful)

            yield cat, (ratio, 1)
        except:
            pass

    def reducer(self, cat, values):
        total, cnt = 0.0, 0
        for r, c in values:
            total += r * c
            cnt += c
        if cnt > 0:
            yield cat, round(total / cnt, 4)

if __name__ == '__main__':
    MRQ1.run()

Writing mr_q1.py


In [23]:
%%writefile mr_q2.py
from mrjob.job import MRJob
import csv

def extract_genre(cat_str: str) -> str:
    if not cat_str or cat_str in ('nan', '', '[]', 'None'):
        return 'Unknown'
    cat_str = cat_str.strip()
    if not cat_str:
        return 'Unknown'

    # Парсим "['Comics & Graphic Novels']"
    try:
        if cat_str.startswith("['") and cat_str.endswith("']"):
            # Убираем [' и ']
            inner = cat_str[2:-2]
            # Делим по "', '" — стандартный формат списка
            parts = [p.strip() for p in inner.split("', '")]
            if parts and parts[0]:
                return parts[0]
    except:
        pass

    # Fallback: простой split — на случай других форматов
    try:
        if cat_str.startswith('[') and cat_str.endswith(']'):
            inner = cat_str[1:-1].strip()
            if inner:
                # Убираем кавычки и делим
                inner = inner.replace('"', '').replace("'", "")
                parts = [p.strip() for p in inner.split(',')]
                if parts and parts[0]:
                    return parts[0]
    except:
        pass

    # Если строка слишком длинная или похожа на название книги — отбрасываем
    if len(cat_str) > 60 or ':' in cat_str or '(' in cat_str or ' - ' in cat_str or '\n' in cat_str:
        return 'Unknown'

    return cat_str[:50]

class MRQ2(MRJob):
    def mapper(self, _, line):
        # Пропускаем заголовок
        if line.startswith('Id') or line.startswith('"Id"'):
            return
        try:
            fields = next(csv.reader([line]))
            if len(fields) < 12:
                return

            text = fields[9].strip()
            cat_field = fields[11].strip()

            if not text or len(text.split()) < 5:
                return

            n_words = len(text.split())
            genre = extract_genre(cat_field)

            # Фильтруем некорректные "жанры"
            if (genre != 'Unknown'
                and len(genre) >= 2
                and not genre.startswith(('['))
                and not genre[0].isdigit()
                and ' ' in genre):
                yield genre, (n_words, 1)
        except:
            pass

    def reducer(self, genre, values):
        total, cnt = 0, 0
        for w, c in values:
            total += w * c
            cnt += c
        if cnt >= 3:
            avg = round(total / cnt, 1)
            yield genre, (avg, cnt)

if __name__ == '__main__':
    MRQ2.run()

Writing mr_q2.py


In [24]:
# создаём выборку
!head -20000 reviews_full.csv > test.csv

# Запускаем
!python mr_q1.py test.csv > table1_raw.txt
!python mr_q2.py test.csv > table2_raw.txt

print("Готово.")

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr_q1.root.20251211.193501.262727
Running step 1 of 1...
job output is in /tmp/mr_q1.root.20251211.193501.262727/output
Streaming final output from /tmp/mr_q1.root.20251211.193501.262727/output...
Removing temp directory /tmp/mr_q1.root.20251211.193501.262727...
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr_q2.root.20251211.193503.432003
Running step 1 of 1...
job output is in /tmp/mr_q2.root.20251211.193503.432003/output
Streaming final output from /tmp/mr_q2.root.20251211.193503.432003/output...
Removing temp directory /tmp/mr_q2.root.20251211.193503.432003...
Готово.


In [25]:
# Чтение результатов
def read_table1():
    data = {}
    try:
        with open('table1_raw.txt') as f:
            for line in f:
                line = line.strip()
                if line and '\t' in line:
                    cat, val = line.split('\t')
                    cat = cat.strip().strip('"')
                    try:
                        data[cat] = float(val.strip().strip('"'))
                    except:
                        pass
    except Exception as e:
        print("Ошибка чтения table1_raw.txt:", e)
    return data

def read_table2():
    items = []
    try:
        with open('table2_raw.txt') as f:
            for line in f:
                line = line.strip()
                if line and '\t' in line:
                    genre, stats = line.split('\t', 1)
                    genre = genre.strip().strip('"')
                    stats = stats.strip().strip('"')
                    # Парсим [avg, count] или (avg, count)
                    import re
                    match = re.search(r'[\[($]?\s*([\d.]+)\s*,\s*(\d+)', stats)
                    if match:
                        avg = float(match.group(1))
                        cnt = int(match.group(2))
                        items.append((genre, avg, cnt))
    except Exception as e:
        print("Ошибка чтения table2_raw.txt:", e)
    return items

# Получаем данные
table1 = read_table1()
table2 = read_table2()

print("Получено категорий длины:", len(table1))
print("Получено жанров:", len(table2))

# Сохраняем таблицу 1
if table1:
    with open('table1_length_helpfulness.txt', 'w', encoding='utf-8') as f:
        f.write("ТАБЛИЦА 1: Длина отзыва (в словах) → полезность\n")
        f.write("="*60 + "\n")
        f.write(f"{'Категория':<20} | {'Средняя полезность'}\n")
        f.write("-"*35 + "\n")
        for cat in ['short', 'medium', 'long']:
            name = {'short': 'Короткие (<100)', 'medium': 'Средние (100–500)', 'long': 'Длинные (>500)'}[cat]
            val = table1.get(cat, 0.0)
            f.write(f"{name:<20} | {val:.4f}\n")
    print("table1_length_helpfulness.txt сохранена")
else:
    print("Таблица 1 пуста")

# Сохраняем таблицу 2
if table2:
    table2.sort(key=lambda x: x[1], reverse=True)
    with open('table2_genre_length.txt', 'w', encoding='utf-8') as f:
        f.write("ТАБЛИЦА 2: Жанры → средняя длина отзыва (в словах)\n")
        f.write("="*70 + "\n")
        f.write(f"{'Жанр':<40} | {'Слова'} | {'Отзывы'}\n")
        f.write("-"*70 + "\n")
        for genre, avg, n in table2[:20]:
            g = (genre[:37] + '...') if len(genre) > 40 else genre
            f.write(f"{g:<40} | {avg:>6.1f} | {n:>6}\n")
    print("table2_genre_length.txt сохранена")
else:
    print("Таблица 2 пуста")

Получено категорий длины: 3
Получено жанров: 84
table1_length_helpfulness.txt сохранена
table2_genre_length.txt сохранена


In [15]:
print("=" * 70)
print("ЧАСТЬ 2: APACHE SPARK АНАЛИЗ")
print("=" * 70)

!apt-get update -qq > /dev/null
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz > /dev/null

!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
findspark.init()

ЧАСТЬ 2: APACHE SPARK АНАЛИЗ
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [16]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("BookReviewsAnalysis") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"Spark готов. Версия: {spark.version}")

Spark готов. Версия: 3.5.0


In [17]:
# Загружаем данные
ratings = spark.read.option("header", "true").csv("books_rating.csv")
books = spark.read.option("header", "true").csv("books_data.csv")

# Подготовка: приводим названия к нижнему регистру
ratings = ratings.withColumn("title_norm", F.lower(F.trim(F.col("Title"))))
books = books.withColumn("title_norm", F.lower(F.trim(F.col("Title"))))

# Join по названию
df = ratings.join(books.select("title_norm", "categories"), "title_norm", "inner")

print(f"Объединено строк: {df.count()}")

Объединено строк: 3320282


In [27]:
# 1. Длина отзыва → в словах
df = df.withColumn("review_words", F.size(F.split(F.col("review/text"), "\\s+")))

# 2. Полезность: парсим "7/7" → 1.0
df = df.withColumn(
    "helpfulness_ratio",
    F.when(F.col("review/helpfulness").rlike(r"\d+/\d+"),
        F.split(F.col("review/helpfulness"), "/")[0].cast("double") /
        F.split(F.col("review/helpfulness"), "/")[1].cast("double")
    ).otherwise(0.0)
)

# 3. Категория длины (3 категории)
df = df.withColumn(
    "length_cat",
    F.when(F.col("review_words") < 100, "short")
     .when(F.col("review_words") <= 500, "medium")
     .otherwise("long")
)

# 4. Жанр: извлекаем первый из списка вида "['Comics & Graphic Novels']"
df = df.withColumn(
    "helpfulness_ratio",
    F.when(
        F.col("review/helpfulness").rlike(r"^\s*\d+\s*/\s*\d+\s*$"),
        F.split(F.col("review/helpfulness"), "/")[0].cast("int") /
        F.when(F.split(F.col("review/helpfulness"), "/")[1].cast("int") > 0,
               F.split(F.col("review/helpfulness"), "/")[1].cast("int"))
         .otherwise(None)
    ).otherwise(0.0)
).na.fill({"helpfulness_ratio": 0.0})

print("Поля извлечены. Пример:")
df.select("review_words", "helpfulness_ratio", "length_cat", "genre").show(3)

Поля извлечены. Пример:
+------------+------------------+----------+----------+
|review_words| helpfulness_ratio|length_cat|     genre|
+------------+------------------+----------+----------+
|          38|               1.0|     short|Study Aids|
|         109|0.7777777777777778|    medium|Study Aids|
|          20|               0.0|     short|Study Aids|
+------------+------------------+----------+----------+
only showing top 3 rows



In [29]:
# Агрегация по категориям длины
table1 = df.groupBy("length_cat") \
    .agg(
        F.round(F.avg("helpfulness_ratio"), 4).alias("avg_helpfulness"),
        F.count("*").alias("review_count")
    ) \
    .orderBy(
        F.when(F.col("length_cat") == "short", 1)
         .when(F.col("length_cat") == "medium", 2)
         .otherwise(3)
    )

print("ТАБЛИЦА 1: Длина отзыва (в словах) → полезность")
table1.show()

# Сохраняем в CSV (1 файл)
table1.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/content/table1_spark")

print("Таблица 1 сохранена в /content/table1_spark/")

ТАБЛИЦА 1: Длина отзыва (в словах) → полезность
+----------+---------------+------------+
|length_cat|avg_helpfulness|review_count|
+----------+---------------+------------+
|     short|         0.4081|     1463446|
|    medium|         0.5989|      864802|
|      long|          0.691|       28888|
+----------+---------------+------------+

Таблица 1 сохранена в /content/table1_spark/


In [30]:
# Агрегация по жанрам (только с ≥3 отзывами)
table2 = df.groupBy("genre") \
    .agg(
        F.round(F.avg("review_words"), 1).alias("avg_words"),
        F.count("*").alias("review_count")
    ) \
    .filter(F.col("review_count") >= 3) \
    .orderBy(F.desc("avg_words"))

print("ТАБЛИЦА 2: Жанры → средняя длина отзыва (в словах)")
table2.show(10, truncate=False)

# Сохраняем топ-10
table2.limit(10).coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/content/table2_spark")

print("Таблица 2 сохранена в /content/table2_spark/")

ТАБЛИЦА 2: Жанры → средняя длина отзыва (в словах)
+--------------------------------------------------+---------+------------+
|genre                                             |avg_words|review_count|
+--------------------------------------------------+---------+------------+
|Violinists                                        |566.3    |3           |
|Crowds                                            |551.6    |8           |
|Bruce Wilshire                                    |495.0    |3           |
|Franco-Prussian War, 1870-1871                    |471.3    |3           |
|United States History Civil War, 1861-1865 Fiction|442.0    |4           |
|Liturgies, Early Christian                        |405.7    |6           |
|East End (London, England)                        |394.7    |3           |
|Evangelicalism                                    |392.3    |3           |
|Sensory deprivation                               |389.7    |7           |
|Medical ethics                      

In [31]:
print("Установка Apache Airflow 2.9.0...")

!pip install "apache-airflow==2.9.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.12.txt" \
  --quiet

print("Airflow установлен")

Установка Apache Airflow 2.9.0...
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.9/48.9 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m81.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.4/233.4 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.9/78.9 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m68.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.6/42.6 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━

In [33]:
import os
os.environ["AIRFLOW_HOME"] = "/content/airflow"

!mkdir -p /content/airflow/dags /content/airflow/logs /content/airflow/plugins
!airflow db init > /dev/null 2>&1

print("Airflow инициализирован. Директории созданы.")

Airflow инициализирован. Директории созданы.


In [40]:
%%writefile /content/spark_book_analysis.py
import os
import sys

# Настройка окружения
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def main():
    spark = SparkSession.builder \
        .appName("BookReviewsAnalysis") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.driver.memory", "4g") \
        .getOrCreate()

    try:
        # Загрузка
        ratings = spark.read.option("header", "true").csv("/content/books_rating.csv")
        books = spark.read.option("header", "true").csv("/content/books_data.csv")

        # Подготовка
        ratings = ratings.withColumn("title_norm", F.lower(F.trim(F.col("Title"))))
        books = books.withColumn("title_norm", F.lower(F.trim(F.col("Title"))))

        # Join
        df = ratings.join(
            books.select("title_norm", "categories"),
            "title_norm",
            "inner"
        ).filter(F.col("review/text").isNotNull())

        # Извлечение признаков
        df = df.withColumn("words", F.size(F.split(F.col("review/text"), "\\s+"))) \
               .withColumn("help",
                    F.split(F.col("review/helpfulness"), "/")[0].cast("double") /
                    F.when(F.split(F.col("review/helpfulness"), "/")[1].cast("double") > 0,
                           F.split(F.col("review/helpfulness"), "/")[1].cast("double"))
                     .otherwise(1.0)
               ) \
               .withColumn("len_cat",
                    F.when(F.col("words") < 100, "short")
                     .when(F.col("words") <= 500, "medium")
                     .otherwise("long")
               ) \
               .withColumn("genre",
                    F.regexp_extract(F.col("categories"), r"\['([^']+)'", 1)
               ) \
               .filter(F.col("genre") != "")

        # Таблица 1: длина → полезность
        t1 = df.groupBy("len_cat") \
               .agg(F.round(F.avg("help"), 4).alias("avg_help")) \
               .coalesce(1)

        # Таблица 2: жанры → длина
        GENRES = ["Fiction", "Biography", "History", "Science", "Romance",
                  "Mystery", "Fantasy", "Science Fiction", "Psychology", "Art",
                  "Music", "Philosophy", "Religion", "Business", "Travel"]
        t2 = df.filter(F.lower(F.col("genre")).isin([g.lower() for g in GENRES])) \
               .groupBy("genre") \
               .agg(F.round(F.avg("words"), 1).alias("avg_words")) \
               .filter(F.col("avg_words") > 0) \
               .orderBy(F.desc("avg_words")) \
               .limit(20) \
               .coalesce(1)

        # Сохранение
        output_dir = "/content/airflow_output"
        os.makedirs(output_dir, exist_ok=True)

        t1.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/table1")
        t2.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/table2")

        print(f"Анализ завершён. Результаты в {output_dir}/")

    finally:
        spark.stop()

if __name__ == "__main__":
    main()

Overwriting /content/spark_book_analysis.py


In [38]:
%%writefile /content/airflow/dags/book_analysis_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'analyst',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    dag_id="book_reviews_daily_analysis",
    default_args=default_args,
    description="Ежедневный анализ отзывов книг",
    schedule_interval="@daily",
    start_date=datetime(2025, 12, 10),
    catchup=False,
    tags=["books", "nlp", "spark"],
) as dag:

    run_analysis = BashOperator(
        task_id="run_spark_analysis",
        bash_command="python /content/spark_book_analysis.py"
    )

    run_analysis

Overwriting /content/airflow/dags/book_analysis_dag.py


In [41]:
print("Тестовый запуск Spark-анализа...")
!python /content/spark_book_analysis.py

print("\nПроверка результатов:")
!ls -la /content/airflow_output/
!head -n 5 /content/airflow_output/table1/*.csv 2>/dev/null || echo "table1: пусто"
!head -n 5 /content/airflow_output/table2/*.csv 2>/dev/null || echo "table2: пусто"

Тестовый запуск Spark-анализа...
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/11 20:13:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/11 20:13:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Анализ завершён. Результаты в /content/airflow_output/

Проверка результатов:
total 16
drwxr-xr-x 4 root root 4096 Dec 11 20:18 .
drwxr-xr-x 1 root root 4096 Dec 11 20:13 ..
drwxr-xr-x 2 root root 4096 Dec 11 20:16 table1
drwxr-xr-x 2 root root 4096 Dec 11 20:18 table2
len_cat,avg_help
long,0.691
medium,0.5989
short,0.414
genre,avg_words
RELIGION,212.6
PHILOSOPHY,171.3
Philosophy,131.4
Fantasy,126.2


In [42]:
# Чтение результатов через Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Check").getOrCreate()

try:
    t1_check = spark.read.option("header", "true").csv("/content/airflow_output/table1")
    t2_check = spark.read.option("header", "true").csv("/content/airflow_output/table2")

    print("ТАБЛИЦА 1 (длина → полезность):")
    t1_check.show()

    print("ТАБЛИЦА 2 (жанры → длина):")
    t2_check.show(truncate=False)

finally:
    spark.stop()

ТАБЛИЦА 1 (длина → полезность):
+-------+--------+
|len_cat|avg_help|
+-------+--------+
|   long|   0.691|
| medium|  0.5989|
|  short|   0.414|
+-------+--------+

ТАБЛИЦА 2 (жанры → длина):
+---------------+---------+
|genre          |avg_words|
+---------------+---------+
|RELIGION       |212.6    |
|PHILOSOPHY     |171.3    |
|Philosophy     |131.4    |
|Fantasy        |126.2    |
|History        |123.9    |
|Science fiction|119.5    |
|Fiction        |115.3    |
|Science        |115.1    |
|FICTION        |113.4    |
|Travel         |111.6    |
|SCIENCE        |111.0    |
|Psychology     |107.8    |
|Biography      |107.8    |
|Business       |107.5    |
|HISTORY        |106.2    |
|Religion       |104.7    |
|Music          |99.4     |
|Art            |95.4     |
|Mystery        |83.5     |
|PSYCHOLOGY     |80.3     |
+---------------+---------+

