In [31]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import month
from pyspark.sql.functions import year
from pyspark.sql.functions import col
from pyspark.sql.functions import round

spark = SparkSession.builder \
.appName("read")  \
.getOrCreate()

"""
Данные
Таблица books:

book_id: ID книги
title: Название книги
author_id: ID автора
genre: Жанр книги
price: Цена книги
publish_date: Дата публикации (в формате YYYY-MM-DD)

Таблица authors:

author_id: ID автора
name: Имя автора
birth_date: Дата рождения автора (в формате YYYY-MM-DD)
country: Страна автора
"""
"""
2. Обработка данных:

Преобразуйте столбцы publish_date и birth_date в формат даты. +
3. Объединение данных:

Объедините таблицы books и authors по author_id.
4. Анализ данных:

a) Найдите топ-5 авторов, книги которых принесли наибольшую выручку. +
b) Найдите количество книг в каждом жанре. +
c) Подсчитайте среднюю цену книг по каждому автору.
d) Найдите книги, опубликованные после 2000 года, и отсортируйте их по цене.
5. Результаты:

Выведите результаты анализа в виде таблиц.
"""
df_books = spark.read.option("header","true").csv("/content/books.csv")
df_authors = spark.read.option("header","true").csv("/content/authors.csv")

#df_books.printSchema()
#df_authors.printSchema()

df_books_types = (df_books.withColumn("book_id", df_books["book_id"].cast('int'))
                 .withColumn("author_id", df_books["author_id"].cast('int'))
                 .withColumn("price", df_books["price"].cast('float'))
                 .withColumn("publish_date", df_books["publish_date"].cast('date'))
)
df_books_types = df_books_types.withColumn("publish_year", year("publish_date")).alias("publish_year")

df_authors_types = (df_authors.withColumn("author_id", df_authors["author_id"].cast('int'))
                 .withColumn("birth_date", df_authors["birth_date"].cast('date'))
)

#df_books_types.printSchema()
#df_authors_types.printSchema()

df = (df_books_types.join(df_authors_types, ['author_id']
                           , how="inner")
     .select(df_books_types.book_id, df_books_types.price, df_books_types.title, df_books_types.genre, df_books_types.publish_year
             , df_books_types.publish_date, df_books_types.publish_year, df_authors_types.author_id
             , df_authors_types.name, df_authors_types.country, df_authors_types.birth_date))
## a
df_max_vir = (df.groupBy("author_id","name").agg({"price": "sum"})
                      .withColumnRenamed("sum(price)", "revenue"))
df_max_vir_top5 = df_max_vir.select("author_id", "name", round("revenue",2).alias("revenue")).orderBy(col("revenue").desc())
df_max_vir_top5.show(5)
## b
df_cnt_books = (df.groupBy("genre").agg({"genre": "count"})
                      .withColumnRenamed("count(genre)", "count"))

df_cnt_books.orderBy(col("count").desc()).show()
## c
df_avg_price_by_author = (df.groupBy("author_id","name").agg({"price": "avg"})
                      .withColumnRenamed("avg(price)", "avg_price"))

df_avg_price_by_author.select("author_id", "name", round("avg_price", 2).alias("avg_price")).orderBy(col("avg_price").desc()).show()
## d

df_after_2000 = (df.filter(df.publish_year > 2000))

(df_after_2000.select("author_id", "book_id", "title", "genre", "price", "publish_date", "name", "birth_date", "country")
      .orderBy(col("price").desc()).show())
spark.stop()

+---------+--------+-------+
|author_id|    name|revenue|
+---------+--------+-------+
|        2|Author_2| 231.97|
|        7|Author_7| 132.66|
|        1|Author_1| 111.86|
|        8|Author_8| 107.16|
|        5|Author_5|  88.83|
+---------+--------+-------+
only showing top 5 rows

+-----------+-----+
|      genre|count|
+-----------+-----+
|Non-Fiction|    9|
|    Science|    3|
|    Fiction|    3|
|    Fantasy|    3|
|    Mystery|    2|
+-----------+-----+

+---------+---------+---------+
|author_id|     name|avg_price|
+---------+---------+---------+
|        5| Author_5|    88.83|
|        4| Author_4|     83.7|
|        2| Author_2|    57.99|
|        9| Author_9|    46.31|
|        7| Author_7|    44.22|
|        6| Author_6|    43.97|
|        1| Author_1|    37.29|
|        8| Author_8|    35.72|
|       10|Author_10|    21.17|
+---------+---------+---------+

+---------+-------+-------+-----------+-----+------------+--------+----------+---------+
|author_id|book_id|  title|