Условие: создайте csv файл с таким содержимым:

title,author,genre,sales,year

"1984", "George Orwell", "Science Fiction", 5000, 1949

"The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954

"To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960

"The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951

"The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925

Задание:

— Используя Spark прочитайте данные из файла csv.
— Фильтруйте данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров.
— Сгруппируйте данные по жанру и вычислите общий объем продаж для каждого жанра.
— Отсортируйте данные по общему объему продаж в порядке убывания.
— Выведите результаты на экран.

In [1]:
!pip install pyspark >> None

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg,col
import pyspark.sql.functions as F
from pyspark.sql import Window

In [3]:
# Создание сессии Spark для сохранения датафрейма
sp = SparkSession.builder.appName("save_csv").getOrCreate()

In [4]:
# Загрузка данных о продажах книг
sales_data = [("1984", "George Orwell", "Science Fiction", 5000, 1949),
              ("The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954),
              ("To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960),
              ("The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951),
              ("The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925)
]

sales_df = sp.createDataFrame(sales_data, ["title","author", "genre", "sales", "year"])

In [5]:
# Записываем датафрейм в csv
sales_df.write.csv("sales_df.csv", header=True)

In [6]:
# Создание сессии Spark для выполнения задания
spark = SparkSession.builder.appName("books_sales").getOrCreate()

In [7]:
# Считываем данные из csv
sales_df=spark.read.option('header', 'true').csv('sales_df.csv')
sales_df.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|The Catcher in th...|      J.D. Salinger|          Novel| 2000|1951|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
|                1984|      George Orwell|Science Fiction| 5000|1949|
|The Lord of the R...|     J.R.R. Tolkien|        Fantasy| 3000|1954|
+--------------------+-------------------+---------------+-----+----+



In [8]:
# Фильтр данных, чтобы оставить только книги, продажи которых превышают 3000 экземпляров
filtered_df = sales_df.filter(col('sales')>3000)
filtered_df.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
|                1984|      George Orwell|Science Fiction| 5000|1949|
+--------------------+-------------------+---------------+-----+----+



In [9]:
# Группировка данных по жанру с вычислением общего объема продаж для каждого жанра. Сортировка по общему объему продаж в порядке убывания
# Использован полный датафрейм, без учета фильтра. Если нужно было последовательно выполнить задания, то использовали бы датафрейм filtered_df.

sales_df=sales_df.groupBy('genre')\
  .agg(F.sum("sales").alias("sum_sales"))\
  .orderBy('sum_sales', ascending=False)

sales_df.show()

+---------------+---------+
|          genre|sum_sales|
+---------------+---------+
|          Novel|   6500.0|
|Science Fiction|   5000.0|
|Southern Gothic|   4000.0|
|        Fantasy|   3000.0|
+---------------+---------+

