# Блок 1. Standalone Spark

### 1) Развернуть standalone cluster Spark: master + 2 workers. Приложить скрипт и/или алгоритм + скрин webui (10 баллов)

Скрины webui и файл docker-compose.yml в репозитории task_1

### 2) Подключиться к кластеру с помощью Jupyter и/или Zeppelin. Приложить скрипт и/или алгоритм + скрин рабочей сессии из инструмента (20 баллов)

Файл docker-compose.yml для подключения Jupyter также в репозитории task_1

Я сделала один файл для всего, чтобы одной командой все сразу запустить можно было

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark").master("spark://spark-master:7077").getOrCreate()
spark

### 3) + 10 дополнительных баллов за развертывание и подключение к HDFS

В прошлом ДЗ уже развертывали подключение к кластеру hadoop c hdfs. Поэтому для подтверждения все файлы из задания 2 были перенесены сюда, этому отобразим их:

In [4]:
import requests

response = requests.get(f"http://hadoop-namenode:9870/webhdfs/v1/upload?op=LISTSTATUS").json()

if response.get("FileStatuses"):
    files_data = response["FileStatuses"].get("FileStatus")
    books = [file.get("pathSuffix") for file in files_data if 'book' in file.get("pathSuffix")]
print(*books, sep = ', ')

book1-100k.csv, book1000k-1100k.csv, book100k-200k.csv, book1100k-1200k.csv, book1200k-1300k.csv, book1300k-1400k.csv, book1400k-1500k.csv, book1500k-1600k.csv, book1600k-1700k.csv, book1700k-1800k.csv, book1800k-1900k.csv, book1900k-2000k.csv, book2000k-3000k.csv, book200k-300k.csv, book3000k-4000k.csv, book300k-400k.csv, book4000k-5000k.csv, book400k-500k.csv, book500k-600k.csv, book600k-700k.csv, book700k-800k.csv, book800k-900k.csv, book900k-1000k.csv


# Блок 2. Работа с данными на Spark

### 1) Преобразовать данные исходного датасета в parquet объединяя все таблицы. Оценить разницу в скорости чтения / занимаемом объеме. Сделать выводы. (15 баллов)

In [5]:
from pyspark.sql.types import StructType, StructField, StringType

# Определяем схему данных, используя первую строку файла
schema = spark.read.csv(f"hdfs://hadoop-namenode:9000/upload/{books[0]}", header=True, inferSchema=True).limit(1).schema

# Создаем пустой DataFrame с заданной схемой
spark_df = spark.createDataFrame(data = [], schema=schema)

                                                                                

In [7]:
# Добавляем данные по книгам в spark_data
for file in books:
    spark_data = (
        spark.read
        .option("multiline", "true")
        .option("quote", '"')
        .option("header", "true")
        .option("escape", "\\")
        .option("escape", '"')
        .csv(f"hdfs://hadoop-namenode:9000/upload/{file}")
    )
    spark_df = spark_df.unionByName(spark_data, allowMissingColumns=True)

                                                                                

In [8]:
# Записываем данные в нужном формате
write = spark_df.write \
              .option(header=True) \
              .mode("overwrite") \
              .parquet(path="hdfs://hadoop-namenode:9000/book.parquet")


                                                                                

In [6]:
# Считываем их через spark.read.parquet
spark_df = spark.read.parquet(f"hdfs://hadoop-namenode:9000/book.parquet")

                                                                                

In [57]:
import time
import os

# Загрузка исходных данных
start_time_csv = time.time()
df_csv = spark.read.csv(f"hdfs://hadoop-namenode:9000/upload/*.csv", header=True, inferSchema=True)
end_time_csv = time.time()

# Загрузка данных в формате Parquet
start_time_parquet = time.time()
df_parquet = spark.read.parquet(f"hdfs://hadoop-namenode:9000/book.parquet")
end_time_parquet = time.time()

# Сравнение времени загрузки файла
print("Время загрузки данных из CSV: ", end_time_csv - start_time_csv)
print("Время загрузки данных из Parquet: ", end_time_parquet - start_time_parquet)

# Сравнение размера файла
response_csv = requests.get(f"http://hadoop-namenode:9000/webhdfs/v1/upload?op=LISTSTATUS").json()
if response_csv.get("FileStatuses"):
    files_data = response_csv["FileStatuses"].get("FileStatus")
    files_size = [int(file.get("length")) for file in files_data if "book" in file.get("pathSuffix")]
print("Размер файла CSV: ", sum(files_size) / 1024**2)

response_parquet = requests.get(f"http://hadoop-namenode:9000/webhdfs/v1/book.parquet?op=LISTSTATUS").json()
if response_parquet.get("FileStatuses"):
    files_data = response_parquet["FileStatuses"].get("FileStatus")
    files_size = [int(file.get("length")) for file in files_data if "" in file.get("pathSuffix")]
print("Размер файла Parquet: ", sum(files_size) / 1024**2)


                                                                                

Время загрузки данных из CSV:  24.131360054016113
Время загрузки данных из Parquet:  0.32906532287597656
Размер файла CSV:  1110.6515398025513
Размер файла Parquet:  692.3238620758057


#### Видно, что данные, преобразованные в parquet занимают в 2 раза меньше памяти и времени на их загрузку также необходимо намного меньше.

### 2) Используя весь набор данных с помощью Spark вывести (5 баллов за каждое задание)

#### a) Топ-10 книг с наибольшим числом ревью

In [50]:
from pyspark.sql.types import IntegerType

# Перевод в тип integer
spark_df = spark_df.withColumn("CountsOfReview", spark_df.CountsOfReview.cast(IntegerType()))

# Сортируем значения колонки по убыванию
top_review_books = spark_df.select("Name", "CountsOfReview").orderBy(spark_df.CountsOfReview.desc())

# Выбираем топ 10
top_review_books.limit(10).show(truncate=False)



+---------------------------------------------------------+--------------+
|Name                                                     |CountsOfReview|
+---------------------------------------------------------+--------------+
|The Hunger Games (The Hunger Games, #1)                  |154447        |
|Twilight (Twilight, #1)                                  |94850         |
|The Book Thief                                           |87685         |
|The Help                                                 |76040         |
|Harry Potter and the Sorcerer's Stone (Harry Potter, #1) |75911         |
|The Giver (The Giver, #1)                                |57034         |
|Water for Elephants                                      |52918         |
|The Girl with the Dragon Tattoo (Millennium, #1)         |52225         |
|Harry Potter and the Deathly Hallows (Harry Potter, #7)  |52088         |
|The Lightning Thief (Percy Jackson and the Olympians, #1)|48630         |
+------------------------

                                                                                

#### b) Топ-10 издателей с наибольшим средним числом страниц в книгах

In [18]:
from pyspark.sql.functions import avg

# Группируем по издателям, считаем среднее кол-во страниц
top_publishers = spark_df.groupBy("Publisher").agg(avg("pagesNumber").alias("avg_pages"))

# Выбираем топ 10
top_publishers = spark_df.orderBy("avg_pages", ascending=False).limit(10)
top_publishers.show(truncate=False)



+-----------------------------------------------------------+------------------+
|Publisher                                                  |avg_pages         |
+-----------------------------------------------------------+------------------+
|Crafty Secrets Publications                                |1807321.6         |
|Sacred-texts.com                                           |500000.0          |
|Department of Russian Language and Literature University of|322128.5714285714 |
|Logos Research Systems                                     |100000.0          |
|Encyclopedia Britannica, Incorporated                      |32642.0           |
|Progressive Management                                     |19106.3625        |
|Still Waters Revival Books                                 |10080.142857142857|
|P. Shalom Publications, Incorporated                       |8539.0            |
|Hendrickson Publishers, Inc. (Peabody, MA)                 |6448.0            |
|IEEE/EMB                   

                                                                                

#### c) Десять наиболее активных по числу изданных книг лет

In [21]:
from pyspark.sql.functions import col, desc

# Убираем пустые значения из Года публикации и груупируя по этой колонке считаем кол-во строчек
top_publish_years = spark_df.filter(col("PublishYear").isNotNull()).groupBy("PublishYear").count()

# Выбираем топ 10
top_publish_years = spark_df.orderBy(desc("count")).limit(10)
top_publish_years.show(truncate=False)



+-----------+------+
|PublishYear|count |
+-----------+------+
|2007       |129503|
|2006       |122363|
|2005       |117630|
|2004       |105727|
|2003       |104341|
|2002       |95531 |
|2001       |88224 |
|2000       |87286 |
|2008       |80265 |
|1999       |80153 |
+-----------+------+



                                                                                

#### d) Топ-10 книг имеющих наибольший разброс в оценках среди книг имеющих больше 500 оценок

In [53]:
from pyspark.sql.functions import regexp_extract, col, split, array, explode, stddev_samp

filtered_books = (spark_df
                  .select('Id', 'Name', 'RatingDist1', 'RatingDist2', 'RatingDist3', 'RatingDist4', 'RatingDist5', 'RatingDistTotal')
                  
                  # Извлекаем только число из колонки RatingDistTotal
                  .withColumn('RatingDistTotal', regexp_extract('RatingDistTotal', r'(\d+)', 1).cast('int'))
                  
                   # Отфильтровываем строки, которые имеют более 500 оценок
                  .filter(col('RatingDistTotal') > 500)
                  
                  # Создаем новую колонку, объединяющую все оценки в массив
                  .withColumn('RatingDist', array(*[split(col(c), ':').getItem(1).cast('int') for c in ['RatingDist1', 'RatingDist2', 'RatingDist3', 'RatingDist4', 'RatingDist5']]))
                  
                  # Разворачиваем массив в отдельные строки
                  .select('Id', 'Name', explode('RatingDist').alias('Rating'))
                  .groupBy('Id', 'Name')
                  
                  # Рассчитываем стандартное отклонение по каждой строке
                  .agg(stddev_samp('Rating').alias('stddev'))
                  
                  # Сортируем по убыванию стандартного отклонения и выбираем топ 10
                  .orderBy(col('stddev').desc())
                  .limit(10)
)

filtered_books.show()




+-------+--------------------+------------------+
|     Id|                Name|            stddev|
+-------+--------------------+------------------+
|4593339|Ο Χάρι Πότερ και ...|1884480.9762179346|
|3529641|Harry Potter og D...| 1857551.557211347|
|3484606|Harry Potter e a ...|1856879.5144942228|
|3102821|Harry Potter and ...|1850987.2240634726|
|2200812|Harry Potter och ...|1833126.7167265879|
|1990311|Harry Potter och ...|1824451.7246133152|
|1907452|Harry Potter e a ...|1822858.0094387494|
|1885731|Harry Potter and ...|1815587.3930200385|
|1811146|Harry Potter i Ka...|1814183.3459608485|
|1726635|Harri Potter maen...|1811594.8417798886|
+-------+--------------------+------------------+



                                                                                

Специально нашла данные по первым двум книгам из этого странного результирующего списка, чтобы убедиться в правильности расчетов и вручную посчитала std:

ID 4593339
Данные по оценкам: 5:4608992	4:1621963	3:603633	2:140565	1:119534

ID 3529641
Данные по оценкам: 5:4543188	4:1600072	3:596615	2:138407	1:117311


In [55]:
import statistics

data_1 = {5: 4608992, 4: 1621963, 3: 603633, 2: 140565, 1: 119534}
data_2 = {5:4543188, 4:1600072, 3:596615, 2:138407, 1:117311}
std_dev_1 = statistics.stdev(data_1.values())
std_dev_2 = statistics.stdev(data_2.values())
print(f'Для ID 4593339 стандартное отклонение = {std_dev_1}')
print(f'Для ID 3529641 стандартное отклонение = {std_dev_2}')

Для ID 4593339 стандартное отклонение = 1884480.9762179346
Для ID 3529641 стандартное отклонение = 1857551.557211347


С расчетами выше совпало, так что будем считать с технической стороны все правильно. На счет выбросов, не думаю, что тут есть смысл их смотреть, так как именно в этих фичах разброс слишком большой и неочевидный для идентификации выбросов.

Да и весь Гирри Поттер иначе может улететь, уж слишком он крут.

#### e) Любой интересный инсайт из данных

#### Средний рейтинг авторов с наибольшим числом книг

Значения null и выбросы (значения больше 10 и меньше 0) были проигнорированы в расчете общего числа книг, так как интересовал именно средний рейтинг этих книг

In [54]:
from pyspark.sql.functions import desc, avg, count, col


top_authors = (spark_df.select('Authors', 'Rating')
               
                       # Фильтруем значения от 0 до 10, убирая выбросы
                      .filter(col('Rating').isNotNull() & (col('Rating') > 0) & (col('Rating') <= 10))
               
                       # Группируем по автору и считаем средний рейтинг
                      .groupBy('Authors') 
                      .agg(avg('Rating').alias('avg_rating'), count('*').alias('book_count')) )

# Сортируем по убыванию количества книг и выбираем топ 10
top_authors = top_authors.orderBy(desc('book_count')).limit(10)

top_authors.show()



+--------------------+------------------+----------+
|             Authors|        avg_rating|book_count|
+--------------------+------------------+----------+
|           Anonymous| 4.260478767693589|      2402|
| William Shakespeare|3.8776038575667644|      1348|
|             Unknown| 3.753079268292683|       984|
|     Francine Pascal|3.5652150537634415|       930|
|     Agatha Christie|3.8879661016949147|       885|
|Fodor's Travel Pu...|3.7324803149606307|       762|
|        Harold Bloom|3.8629234972677593|       732|
|        Isaac Asimov| 3.986965317919076|       692|
|       Carolyn Keene| 3.777635239567233|       647|
|        Nora Roberts|3.9833384853168465|       647|
+--------------------+------------------+----------+



                                                                                

# Блок 3. Spark Streaming

В задании предлагается реализовать расчет среднего рейтинга книги на Spark Streaming со
следующими условиями (30 баллов):
- использовать данные user_rating как ﬁle source
- использовать ﬁle sink в формате parquet

In [79]:
from pyspark.sql.functions import when, col, avg
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

In [81]:
# Рядом с данными book также лежат данные user_rating, отобразим их
if response.get("FileStatuses"):
    files_data = response["FileStatuses"].get("FileStatus")
    users = [file.get("pathSuffix") for file in files_data if 'user_rating' in file.get("pathSuffix")]
print(*users, sep = ', ')

user_rating_0_to_1000.csv, user_rating_1000_to_2000.csv, user_rating_2000_to_3000.csv, user_rating_3000_to_4000.csv, user_rating_4000_to_5000.csv, user_rating_5000_to_6000.csv, user_rating_6000_to_11000.csv


In [156]:
# Определяем схему данных
schema = StructType([
    StructField("ID", StringType()),
    StructField("Name", StringType()),
    StructField("Rating", StringType()),
    StructField("timestamp", TimestampType())
])

# Читаем все файлы из папки
streamingDataFrame = None
for file in users:
    if streamingDataFrame is None:
        # Если это первый файл, то создаем потоковый DataFrame
        streamingDataFrame = (
            spark.readStream
            .option("header", True)
            .option("escape", "\\")
            .option("maxFilesPerTrigger", 1)
            .option("escape", '"')
            .schema(schema)
            .csv(f"hdfs://hadoop-namenode:9000/upload/{file}")
        )
    else:
        # Иначе добавляем следующий DataFrame в уже существующий потоковый DataFrame
        nextDataFrame = (
            spark.readStream
            .option("header", True)
            .option("escape", "\\")
            .option("maxFilesPerTrigger", 1)
            .option("escape", '"')
            .schema(schema)
            .csv(f"hdfs://hadoop-namenode:9000/upload/{file}")
        )
        streamingDataFrame = streamingDataFrame.union(nextDataFrame)

# Создаем новую колонку с числовыми значениями рейтинга
spark_df = streamingDataFrame.withColumn('NumericalRating', when(col('Rating') == 'did not like it', 1)\
                                           .when(col('Rating') == 'it was ok', 2)\
                                           .when(col('Rating') == 'liked it', 3)\
                                           .when(col('Rating') == 'really liked it', 4)\
                                           .when(col('Rating') == 'it was amazing', 5)\
                                           .otherwise(None))

# Игнорируем строки со значением "This user doesn't have any rating"
spark_df = spark_df.filter(col('NumericalRating').isNotNull())

# Добавляем watermark на основе временной метки
spark_df_with_watermark = spark_df.withWatermark("timestamp", "10 minutes")

# Считаем среднее значение новой колонки по книгам
avg_rating_by_book = spark_df_with_watermark \
                     .groupBy('Name', window('timestamp', '10 minutes')) \
                     .agg({'NumericalRating':'avg'})


# Создаем Query
query = (avg_rating_by_book
         .writeStream
         .format("parquet")
         .option("checkpointLocation", "hdfs://hadoop-namenode:9000/checkpoints")
         .option("path", "hdfs://hadoop-namenode:9000/output")
         .outputMode("append")
         .start()
         .awaitTermination())


На всякий случай, оставлю отдельно то, что выведет код запроса:

In [75]:
# Создаем новую колонку с числовыми значениями рейтинга
spark_df = spark_df.withColumn('NumericalRating', when(col('Rating') == 'did not like it', 1)\
    .when(col('Rating') == 'it was ok', 2)\
    .when(col('Rating') == 'liked it', 3)\
    .when(col('Rating') == 'really liked it', 4)\
    .when(col('Rating') == 'it was amazing', 5)\
    .otherwise(None))

# Игнорируем строки со значением "This user doesn't have any rating"
spark_df = spark_df.filter(col('NumericalRating').isNotNull())

# Считаем среднее значение новой колонки по книгам
avg_rating_by_book = spark_df.groupBy('Name').agg({'NumericalRating':'avg'})
avg_rating_by_book.show()




+--------------------+--------------------+
|                Name|avg(NumericalRating)|
+--------------------+--------------------+
|    Babar and Zephir|                 3.5|
|      The Blue Aspic|                 3.0|
|      Doctor Faustus|   2.761904761904762|
|The Girl Who Play...|                3.75|
|The Church of Sol...|                 3.0|
|The Dream Life of...|  3.3333333333333335|
|90 Below: Or, Wha...|                 3.0|
|The Vice Guide to...|  2.3333333333333335|
|Eleanor Oliphant ...|  3.2698412698412698|
|Dawn of Wonder (T...|                 2.5|
|Witches Abroad (D...|               3.125|
|Sourcery (Discwor...|              3.0625|
|The Moonshot Effe...|                 1.0|
|La Gloire de mon ...|                 4.0|
|    Only Revolutions|                 2.0|
|The Berlin Storie...|  3.2222222222222223|
|Career of Evil (C...|   2.851063829787234|
|Before I Go to Sleep|  2.4054054054054053|
|The Jefferson Bib...|                 3.0|
|Principles of Con...|          

                                                                                