In [None]:
# Условие: создайте csv файл с таким содержимым:
# title,author,genre,sales,year
# "1984", "George Orwell", "Science Fiction", 5000, 1949
# "The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954
# "To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960
# "The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951
# "The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925

# Задание:

# — Используя Spark прочитайте данные из файла csv.
# — Фильтруйте данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров.
# — Сгруппируйте данные по жанру и вычислите общий объем продаж для каждого жанра.
# — Отсортируйте данные по общему объему продаж в порядке убывания.
# — Выведите результаты на экран.

In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F

In [5]:
spark = SparkSession.builder.appName('Seminar#5').getOrCreate()

In [14]:
# Создадим csv файл через Spark
data =[("1984", "George Orwell", "Science Fiction", 5000, 1949),
       ("The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954),
       ("To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960),
       ("The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951),
       ("The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925)]
# https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
# title,author,genre,sales,year
schema=StructType([
    StructField('title', StringType(), True),\
    StructField('author', StringType(), True),\
    StructField('genre', StringType(), True),\
    StructField('sales', IntegerType(), True),\
    StructField('year', IntegerType(), True),\
    ])
df = spark.createDataFrame(data, schema=schema)

In [15]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- year: integer (nullable = true)



In [44]:
df.repartition(1).write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('./de_spark_intro_s5_hw.csv')
# df.write.csv("./de_spark_intro_s5_hw.csv", header=True)

In [50]:
# Немного неожиданное поведение: движок Spark сохраняем csv, как каталог, а не как файл.
# Чтобы не было двух файлов, добавил repartition(1) при сохранении
!ls -l ./de_spark_intro_s5_hw.csv

total 4
-rw-r--r-- 1 root root   0 Apr 28 19:08 _SUCCESS
-rw-r--r-- 1 root root 295 Apr 28 19:08 part-00000-fc67dd90-4156-4275-8686-5b55c50e6b5c-c000.csv


In [51]:
!cat ./de_spark_intro_s5_hw.csv/part-00000-fc67dd90-4156-4275-8686-5b55c50e6b5c-c000.csv

title,author,genre,sales,year
1984,George Orwell,Science Fiction,5000,1949
The Lord of the Rings,J.R.R. Tolkien,Fantasy,3000,1954
To Kill a Mockingbird,Harper Lee,Southern Gothic,4000,1960
The Catcher in the Rye,J.D. Salinger,Novel,2000,1951
The Great Gatsby,F. Scott Fitzgerald,Novel,4500,1925


In [None]:
# — Используя Spark прочитайте данные из файла csv.

In [45]:
df1 = spark.read.csv("./de_spark_intro_s5_hw.csv", sep=",", inferSchema=True, header=True)

In [48]:
df1.show(truncate=False)

+----------------------+-------------------+---------------+-----+----+
|title                 |author             |genre          |sales|year|
+----------------------+-------------------+---------------+-----+----+
|1984                  |George Orwell      |Science Fiction|5000 |1949|
|The Lord of the Rings |J.R.R. Tolkien     |Fantasy        |3000 |1954|
|To Kill a Mockingbird |Harper Lee         |Southern Gothic|4000 |1960|
|The Catcher in the Rye|J.D. Salinger      |Novel          |2000 |1951|
|The Great Gatsby      |F. Scott Fitzgerald|Novel          |4500 |1925|
+----------------------+-------------------+---------------+-----+----+



In [60]:
df1.printSchema()

root
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- year: integer (nullable = true)



In [None]:
# — Фильтруйте данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров.

In [54]:
sales_treshold_min = 3_000
df1.filter(col('sales') > sales_treshold_min).show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
+--------------------+-------------------+---------------+-----+----+



In [71]:
# — Сгруппируйте данные по жанру и вычислите общий объем продаж для каждого жанра.
df1.groupBy('genre').agg(F.avg('sales').alias('sales_by_genre'), F.max('year').alias('max_year')).show()

+---------------+--------------+--------+
|          genre|sales_by_genre|max_year|
+---------------+--------------+--------+
|Southern Gothic|        4000.0|    1960|
|          Novel|        3250.0|    1951|
|        Fantasy|        3000.0|    1954|
|Science Fiction|        5000.0|    1949|
+---------------+--------------+--------+



In [75]:
# — Отсортируйте данные по общему объему продаж в порядке убывания.
df1.sort(col('sales').desc()).show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|The Lord of the R...|     J.R.R. Tolkien|        Fantasy| 3000|1954|
|The Catcher in th...|      J.D. Salinger|          Novel| 2000|1951|
+--------------------+-------------------+---------------+-----+----+

