In [1]:
from pyspark.sql import SparkSession   
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, \
  StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import year, round

spark = SparkSession.builder \
  .appName('read') \
  .getOrCreate()

book_schema = StructType([
  StructField('book_id', IntegerType(), True),
  StructField('title', StringType(), True),
  StructField('author_id', IntegerType(), True),
  StructField('genre', StringType(), True),
  StructField('price', FloatType(), True),
  StructField('publish_date', DateType(), True)
])

author_schema = StructType([
  StructField('author_id', IntegerType(), True),
  StructField('name', StringType(), True),
  StructField('birth_date', DateType(), True),
  StructField('country', StringType(), True)
])

df_books = spark.read.csv('books.csv', 
                    header=True, schema=book_schema)
df_authors = spark.read.csv('authors.csv', 
                    header=True, schema=author_schema)

"""
# Общая информация о наборах данных
print('Схемы данных:')
df_books.printSchema()
df_authors.printSchema()

# print('Первые пару записей:')
df_books.show(5)
df_authors.show(5)
"""
# Объединение набора
df_joined = (
    df_authors
        .join(df_books, df_books.author_id == df_authors.author_id, 'inner') 
        .orderBy(df_authors.author_id).drop(df_books.author_id)
)

# print('Объединенные записи:')
# df_joined.show(10)

# 1
print('Топ-5 авторов, книги которых принесли наибольшую выручку:')
(
    df_joined.groupBy('author_id') 
        .agg({'price': 'sum'})
        .withColumnRenamed('sum(price)', 'revenue')
        .withColumn('revenue', round(col('revenue'), 3))
        .orderBy(col('revenue').desc())
).show(5)

# 2
print('Количество книг в каждом жанре:')
(
    df_joined.groupBy('genre')
        .agg({'genre': 'count'}).withColumnRenamed('count(genre)', 'count') 
        .orderBy(col('count').desc())
).show()

# 3
print('Средняя цена книг по каждому автору')
(
    df_joined.groupBy('author_id') 
        .agg({'price': 'avg'}).withColumnRenamed('avg(price)', 'avg cost') 
        .withColumn('avg cost', round(col('avg cost'), 3)) 
        .orderBy(col('avg cost').desc())
).show()

# 4
print('Книги, опубликованные после 2000 года')
(
    df_joined.filter(year(col('publish_date')) > 2000) 
      .orderBy(col('price').desc())
)[['name', 'title', 'genre', 'price', 'publish_date']].show()

spark.stop()

Топ-5 авторов, книги которых принесли наибольшую выручку:
+---------+-------+
|author_id|revenue|
+---------+-------+
|        2| 231.97|
|        7| 132.66|
|        1| 111.86|
|        8| 107.16|
|        5|  88.83|
+---------+-------+
only showing top 5 rows

Количество книг в каждом жанре:
+-----------+-----+
|      genre|count|
+-----------+-----+
|Non-Fiction|    9|
|    Science|    3|
|    Fiction|    3|
|    Fantasy|    3|
|    Mystery|    2|
+-----------+-----+

Средняя цена книг по каждому автору
+---------+--------+
|author_id|avg cost|
+---------+--------+
|        5|   88.83|
|        4|    83.7|
|        2|  57.993|
|        9|   46.31|
|        7|   44.22|
|        6|  43.965|
|        1|  37.287|
|        8|   35.72|
|       10|  21.165|
+---------+--------+

Книги, опубликованные после 2000 года
+--------+-------+-----------+-----+------------+
|    name|  title|      genre|price|publish_date|
+--------+-------+-----------+-----+------------+
|Author_7|Book_20|    Myst