## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 1

В данной части работы рассмотрены:
* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу `Apache Airflow`.

Подключим необходимые библиотеки.

In [12]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [13]:
    def create_spark_configuration() -> SparkConf:
        """
        Создает и конфигурирует экземпляр SparkConf для приложения Spark.

        Returns:
            SparkConf: Настроенный экземпляр SparkConf.
        """
        # Получаем имя пользователя
        user_name = os.getenv("USER")
        
        conf = SparkConf()
        conf.setAppName("lab 1 Test")
        conf.setMaster("yarn")
        conf.set("spark.submit.deployMode", "client")
        conf.set("spark.executor.memory", "12g")
        conf.set("spark.executor.cores", "8")
        conf.set("spark.executor.instances", "2")
        conf.set("spark.driver.memory", "4g")
        conf.set("spark.driver.cores", "2")
        conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
        conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
        conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
        conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
        conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

        return conf

Создаём сам объект конфигурации.

In [14]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [15]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

24/10/19 20:02:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Для исследования будем использовать датасет `"Flight Prices"`, расположенный на платформе `Kaggle` по адресу https://www.kaggle.com/datasets/dilwong/flightprices.

Данный датасет уже загружен в `HDFS` по адресу: `hdfs:///user/user3/andreev-directory/itineraries.csv`

Указываем путь в `HDFS` для файла с данными.

In [16]:
path = "hdfs:///user/user3/andreev-directory/itineraries.csv"

Заполняем датафрейм данными из файла.

In [17]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

Выводим фрагмент датафрейма на экран.

In [18]:
df.show()

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все столбцы датасета. Оставим следующие колонки, удалив остальные:

| Название столбца  | Расшифровка |
| ------------- | ------------- |
| legId             | Идентификационный номер рейса  |
| startingAirport   | Аэропорт вылета  |
| destinationAirport | Аэропорт назначения |
| isRefundable      | Признак возврата билета |
| baseFare     | Базовая цена билета (без сборов и налогов) |
| totalFare         | Общая стоимость билета (включая сборы и налоги) |
| segmentsDepartureTimeEpochSeconds      | Время вылета сегмента рейса в формате Unix Epoch (секунды с 1970 года) |
| segmentsAirlineName            | Название авиакомпании |
| segmentsEquipmentDescription         | Описание самолета, выполняющего сегмент рейса |
| segmentsDurationInSeconds     | Длительность сегмента рейса в секундах |
| segmentsDistance   | Расстояние сегмента рейса |
| segmentsCabinCode           | Код класса обслуживания |

In [19]:
df = df.select(
    "legId", "startingAirport", "destinationAirport", "isRefundable",
     "baseFare", "totalFare", "segmentsDepartureTimeEpochSeconds", "segmentsAirlineName",
     "segmentsEquipmentDescription", "segmentsDurationInSeconds", "segmentsDistance",
     "segmentsCabinCode"
)

In [20]:
df.show()

+--------------------+---------------+------------------+------------+--------+---------+---------------------------------+--------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|startingAirport|destinationAirport|isRefundable|baseFare|totalFare|segmentsDepartureTimeEpochSeconds| segmentsAirlineName|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDistance|segmentsCabinCode|
+--------------------+---------------+------------------+------------+--------+---------+---------------------------------+--------------------+----------------------------+-------------------------+----------------+-----------------+
|9ca0e81111c683bec...|            ATL|               BOS|       False|  217.67|   248.60|                       1650214620|               Delta|                 Airbus A321|                     8940|             947|            coach|
|98685953630e772a0...|            ATL|               BOS|   

Выведем на экран метаданные датасета.

In [21]:
df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- isRefundable: string (nullable = true)
 |-- baseFare: string (nullable = true)
 |-- totalFare: string (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsAirlineName: string (nullable = true)
 |-- segmentsEquipmentDescription: string (nullable = true)
 |-- segmentsDurationInSeconds: string (nullable = true)
 |-- segmentsDistance: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [22]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("isRefundable",
                           col("isRefundable").cast("Boolean"))
    data = data.withColumn("baseFare",
                           col("baseFare").cast("Float"))
    data = data.withColumn("totalFare",
                           col("totalFare").cast("Float"))
    data = data.withColumn("segmentsDepartureTimeEpochSeconds",
                           col("segmentsDepartureTimeEpochSeconds").cast("Integer"))

    return data

In [23]:
df = transform_dataframe(df)

In [24]:
df.show()

+--------------------+---------------+------------------+------------+--------+---------+---------------------------------+--------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|startingAirport|destinationAirport|isRefundable|baseFare|totalFare|segmentsDepartureTimeEpochSeconds| segmentsAirlineName|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDistance|segmentsCabinCode|
+--------------------+---------------+------------------+------------+--------+---------+---------------------------------+--------------------+----------------------------+-------------------------+----------------+-----------------+
|9ca0e81111c683bec...|            ATL|               BOS|       false|  217.67|    248.6|                       1650214620|               Delta|                 Airbus A321|                     8940|             947|            coach|
|98685953630e772a0...|            ATL|               BOS|   

In [25]:
df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- baseFare: float (nullable = true)
 |-- totalFare: float (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: integer (nullable = true)
 |-- segmentsAirlineName: string (nullable = true)
 |-- segmentsEquipmentDescription: string (nullable = true)
 |-- segmentsDurationInSeconds: string (nullable = true)
 |-- segmentsDistance: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Полученный датафрейм сохраним для дальнейшего использования. Сохранение выполним в таблицу `Apache Iceberg`. 

`Apache Iceberg` — это поддерживающий высокую производительность табличный формат для больших данных.

Сначала создадим базу данных, в которой будет расположена таблица. Во избежание путаницы, **каждую таблицу следует называть с использованием фамилии студента**.

In [26]:
database_name = "andreev_database_new"

Создадим инструкцию SQL для добавления базы данных в каталог `Apache Spark`.

In [27]:
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [28]:
spark.sql(create_database_sql)

DataFrame[]

Установим созданную базу данных как текущую.

In [29]:
spark.catalog.setCurrentDatabase(database_name)

И, наконец, записываем преобразованный датафрейм в таблицу `sobd_lab1_table`.

In [30]:
# Сохранение DataFrame в виде таблицы
df.writeTo("sobd_lab1_table").using("iceberg").create()

                                                                                

После успешной записи можно посмотреть, какие таблицы входят в базу данных.

In [31]:
for table in spark.catalog.listTables():
    print(table.name)

sobd_lab1_table


Обратите внимание, что при необходимости созданные базу данных и таблицу можно удалить следующими командами.

In [32]:
#spark.sql("DROP TABLE spark_catalog.andreev_database_new.sobd_lab1_table")
#spark.sql("DROP DATABASE spark_catalog.andreev_database_new")

После успешной записи таблицы останавливаем сессию `Apache Spark`.

In [33]:
spark.stop()