Создание Spark сессии

In [None]:
import findspark
findspark.init('/home/su/spark-3.5.0-bin-hadoop3')

import pyspark
from pyspark.sql import SparkSession 


print("<<---***--- START ---***--->>")

spark = (SparkSession
 .builder
 .appName('pyspark_example')
 .enableHiveSupport()
 .getOrCreate())

TASK 1
Сгенерировать DataFrame из трёх колонок (row_id, discipline, season) - олимпийские 
дисциплины по сезонам. 
 row_id - число порядкового номера строки;
 discipline - наименование олимпийский дисциплины на английском (полностью маленькии 
буквами; 
 season - сезон дисциплины (summer / winer); 
*Укажите не менее чем по 5 дисциплин для каждого с
зона. 
Сохраните DataFrame в csv-файл, разделитель колонок табуляция, первая строка должна 
содержать название колонок. 
Данные должны быть сохранены в виде 1 csv-файла а не множества маленьких.

In [None]:
# Генерация DataFrame
rows = [
    (1, "skiing", "winter"),
    (2, "ice hockey", "winter"),
    (3, "snowboarding", "winter"),
    (4, "figure skating", "winter"),
    (5, "biathlon", "winter"),
    (6, "swimming", "summer"),
    (7, "gymnastics", "summer"),
    (8, "cycling", "summer"),
    (9, "diving", "summer"),
    (10, "beach volleyball", "summer")
]

schema = "row_id BIGINT, discipline STRING, season STRING"
df_ol = spark.createDataFrame(rows, schema)
df_ol.show()

In [None]:
# Объединяем все данные в один RDD
coalesced_df = df_ol.coalesce(1)
# Сохраняем DataFrame в файл CSV
coalesced_df.write \
            .option("header", "true") \
            .option("delimiter", "\t") \
            .option("inferSchema", "true") \
            .csv("data", mode="overwrite")

In [None]:
# Читаем файл для проверки что все в порядке

df_ol = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("data/")
)

# Вывод схемы DataFrame
df_ol.printSchema()

# Вывод первых строк DataFrame
df_ol.show(truncate=False)

TASK 2
Прочитайте исходный файл "Athletes.csv". 
Посчитайте в разрезе дисциплин сколько всего спортсменов в каждой из дисциплин 
принимало участие. 
Результат сохраните в формате parquet.

In [None]:
# Чтение данных из CSV-файла в DataFrame
df_atl = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ";")
    .load("source/")
)

# Вывод схемы DataFrame
df_atl.printSchema()

# Вывод первых 5 строк DataFrame
df_atl.show(5)

In [None]:
# подсчет в разрезе дисциплин сколько всего спортсменов в каждой из дисциплин принимало участие
# Создание временного представления для DataFrame
df_atl.createOrReplaceTempView("athletes")

df = spark.sql("""
    SELECT
        Discipline,
        COUNT(name) AS athletes
    FROM athletes
    GROUP BY Discipline;
""")

# Вывод первых 10 строк DataFrame с учетом длинных строк
df.show(10, truncate=False)

In [None]:
# сохранение результата в формате parquet
# Запись DataFrame в формате Parquet с указанием сжатия GZIP и режима "overwrite"
df.write.format("parquet") \
        .option("compression", "gzip") \
        .mode("overwrite") \
        .save("data_out/")

# Чтение данных из Parquet-файла для проверки
spark.read.format("parquet") \
          .option("compression", "gzip") \
          .load("data_out/") \
          .show(truncate=False)

TASK 3
Прочитайте исходный файл "Athletes.csv". 
Посчитайте в разрезе дисциплин сколько всего спортсменов в каждой из дисциплин 
принимало участие. 
Получившийся результат нужно объединить с сгенерированным вами DataFrame из 1-го 
задания и в итоге вывести количество участников, только по тем дисциплинам, что есть в 
вашем сгенерированном DataFrame. 
Результат сохраните в формате paquet. 

In [None]:
# Чтение исходного файла "Athletes.csv".
df_atl = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ";")
    .load("source/")
)

df_ol = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("data/")
)


# Вывод схемы DataFrame
df_atl.printSchema()
df_ol.printSchema()

# Вывод первых 5 строк DataFrame
df_atl.show(5)
df_ol.show(5)

In [None]:
# подсчет в разрезе дисциплин сколько всего спортсменов в каждой из дисциплин принимало участие
# Создание временного представления для DataFrame
df_atl.createOrReplaceTempView("athletes")
df_ol.createOrReplaceTempView("olimpic")

df = spark.sql("""
    SELECT
        Discipline,
        COUNT(name) AS athletes
    FROM athletes
    GROUP BY Discipline
    HAVING Discipline in (SELECT INITCAP(discipline) FROM olimpic);
""")

# Вывод первых 10 строк DataFrame с учетом длинных строк
df.show(10, truncate=False)

In [None]:
# сохранение результата в формате parquet
# Запись DataFrame в формате Parquet с указанием сжатия GZIP и режима "overwrite"
df.write.format("parquet") \
        .option("compression", "gzip") \
        .mode("overwrite") \
        .save("result/")

# Чтение данных из Parquet-файла для проверки
spark.read.format("parquet") \
          .option("compression", "gzip") \
          .load("result/") \
          .show(truncate=False)