# **Spark Apache (Семинары)**
## **Урок 5. Spark on scala**

### Условие: создайте csv файл с таким содержимым:

title,author,genre,sales,year

"1984", "George Orwell", "Science Fiction", 5000, 1949

"The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954

"To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960

"The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951

"The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925

### Задание:
— Используя Spark прочитайте данные из файла csv.

— Фильтруйте данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров.

— Сгруппируйте данные по жанру и вычислите общий объем продаж для каждого жанра.

— Отсортируйте данные по общему объему продаж в порядке убывания.

— Выведите результаты на экран.

In [123]:
! pip install pyspark >> /dev/null

In [124]:
# Установка требуемых модулей
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

In [125]:
data = [("1984", "George Orwell", "Science Fiction", 5000, 1949),
("The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954),
("To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960),
("The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951),
("The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925)
]

data_schema = ['title', 'author', 'genre', 'sales', 'year']

In [126]:
# Установка сессии spark
spark = SparkSession.builder.appName('WriteCSV').getOrCreate()

# Создаем dataframe
df_data = spark.createDataFrame(data=data, schema = data_schema)

df_data.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|The Lord of the R...|     J.R.R. Tolkien|        Fantasy| 3000|1954|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|The Catcher in th...|      J.D. Salinger|          Novel| 2000|1951|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
+--------------------+-------------------+---------------+-----+----+



In [127]:
# Записываем файл в папку 'data' на GoogleColab

df_data.repartition(1).write.csv('data', mode="overwrite", header=True)

spark.stop()

In [128]:
# Установка сессии spark
spark = SparkSession.builder.appName('Spark_ETL').getOrCreate()

# Читаем данные из файла
df = spark.read.format("csv").option("header",True) \
     .load(f"/content/data/{[f for f in os.listdir('/content/data/') if f[-4:] == '.csv'][0]}")

# Конвертируем данные string -> int
df = df.withColumn('sales',df['sales'].cast('int'))
df = df.withColumn('year',df['year'].cast('int'))

df.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|The Lord of the R...|     J.R.R. Tolkien|        Fantasy| 3000|1954|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|The Catcher in th...|      J.D. Salinger|          Novel| 2000|1951|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
+--------------------+-------------------+---------------+-----+----+



In [129]:
# Фильтруем данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров.
df_sales = df.filter(F.col('sales') > 3000)
df_sales.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
+--------------------+-------------------+---------------+-----+----+



In [130]:
# Сгруппируйте данные по жанру и вычислите общий объем продаж для каждого жанра.
df_group = df.groupBy('genre').sum('sales') \
             .withColumnRenamed('sum(sales)', 'total_sum')
df_group.show()

+---------------+---------+
|          genre|total_sum|
+---------------+---------+
|Southern Gothic|     4000|
|          Novel|     6500|
|        Fantasy|     3000|
|Science Fiction|     5000|
+---------------+---------+



In [131]:
# Отсортируйте данные по общему объему продаж в порядке убывания.
df_sort = df.sort(F.col('sales').desc())
df_sort.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|The Lord of the R...|     J.R.R. Tolkien|        Fantasy| 3000|1954|
|The Catcher in th...|      J.D. Salinger|          Novel| 2000|1951|
+--------------------+-------------------+---------------+-----+----+



In [132]:
spark.stop()