## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 1

В данной части работы рассмотрены:
* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу `Apache Airflow`.

Подключим необходимые библиотеки.

In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём сам объект конфигурации.

In [3]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

:: loading settings :: url = jar:file:/opt/spark-3.5.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/user6/.ivy2/cache
The jars for the packages stored in: /home/user6/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8ccc41b6-2a4b-4ae4-8212-22bb90fa5881;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 in central
:: resolution report :: resolve 567ms :: artifacts dl 27ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spa

Для исследования будем использовать датасет `"Flight Prices"`, расположенный на платформе `Kaggle` по адресу https://www.kaggle.com/datasets/dilwong/flightprices.

Датасет содержит данные о ценах на авиабилеты, включая маршруты, авиакомпании, классы обслуживания и временные параметры. Он разрешен для использования в учебных целях.

Данный датасет уже загружен в `HDFS` по адресу: `hdfs:///user/user6/kachurin_directory/itineraries.csv`

Указываем путь в `HDFS` для файла с данными.

In [5]:

path = "hdfs:///user/user6/kachurin_directory/itineraries.csv"

Заполняем датафрейм данными из файла.

In [6]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

                                                                                

Выводим фрагмент датафрейма на экран.

In [7]:
df.show()

24/12/15 10:57:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все столбцы датасета. Оставим следующие колонки, удалив остальные:

| Название столбца            | Расшифровка                                                                 |
| --------------------------- | --------------------------------------------------------------------------- |
| legId                       | Идентификатор рейса                                                        |
| flightDate                  | Дата вылета (YYYY-MM-DD)                                                   |
| startingAirport             | Код аэропорта вылета (IATA)                                               |
| destinationAirport          | Код аэропорта прилета (IATA)                                              |
| isBasicEconomy              | Является ли билет экономичным (логическое значение)                       |
| isRefundable                | Является ли билет возвратным (логическое значение)                        |
| isNonStop                   | Является ли рейс прямым (логическое значение)                             |
| baseFare                    | Базовая стоимость билета (в долларах США)                                 |
| totalFare                   | Полная стоимость билета с учетом налогов и сборов (в долларах США)        |
| seatsRemaining              | Количество оставшихся мест                                                |
| totalTravelDistance         | Общая дистанция путешествия (в милях)                                     |


In [8]:
df = df.select(
    "legId", "flightDate", "startingAirport", "destinationAirport",
    "isBasicEconomy", "isRefundable", "isNonStop", "baseFare", "totalFare",
    "seatsRemaining", "totalTravelDistance"
)


In [9]:
df.show()

+--------------------+----------+---------------+------------------+--------------+------------+---------+--------+---------+--------------+-------------------+
|               legId|flightDate|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|
+--------------------+----------+---------------+------------------+--------------+------------+---------+--------+---------+--------------+-------------------+
|9ca0e81111c683bec...|2022-04-17|            ATL|               BOS|         False|       False|     True|  217.67|   248.60|             9|                947|
|98685953630e772a0...|2022-04-17|            ATL|               BOS|         False|       False|     True|  217.67|   248.60|             4|                947|
|98d90cbc32bfbb05c...|2022-04-17|            ATL|               BOS|         False|       False|     True|  217.67|   248.60|             9|                947|
|969a269d38eae583f...|2022-04-17| 

                                                                                

Выведем на экран метаданные датасета.

In [10]:
df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- flightDate: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- isBasicEconomy: string (nullable = true)
 |-- isRefundable: string (nullable = true)
 |-- isNonStop: string (nullable = true)
 |-- baseFare: string (nullable = true)
 |-- totalFare: string (nullable = true)
 |-- seatsRemaining: string (nullable = true)
 |-- totalTravelDistance: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [11]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, to_date

def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования, включая обработку дат.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразование логических столбцов
    data = data.withColumn("isBasicEconomy", col("isBasicEconomy").cast("Boolean"))
    data = data.withColumn("isRefundable", col("isRefundable").cast("Boolean"))
    data = data.withColumn("isNonStop", col("isNonStop").cast("Boolean"))
    
    # Преобразование числовых столбцов
    data = data.withColumn("baseFare", col("baseFare").cast("Float"))
    data = data.withColumn("totalFare", col("totalFare").cast("Float"))
    data = data.withColumn("seatsRemaining", col("seatsRemaining").cast("Integer"))
    data = data.withColumn("totalTravelDistance", col("totalTravelDistance").cast("Float"))
    
    # Преобразование строк с датами в формат Date (формат по умолчанию - yyyy-MM-dd)
    data = data.withColumn("flightDate", to_date(col("flightDate")))
    
    return data


In [12]:
df = transform_dataframe(df)

In [13]:
df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+----------+---------------+------------------+--------------+------------+---------+--------+---------+--------------+-------------------+
|               legId|flightDate|startingAirport|destinationAirport|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|
+--------------------+----------+---------------+------------------+--------------+------------+---------+--------+---------+--------------+-------------------+
|9ca0e81111c683bec...|2022-04-17|            ATL|               BOS|         false|       false|     true|  217.67|    248.6|             9|              947.0|
|98685953630e772a0...|2022-04-17|            ATL|               BOS|         false|       false|     true|  217.67|    248.6|             4|              947.0|
|98d90cbc32bfbb05c...|2022-04-17|            ATL|               BOS|         false|       false|     true|  217.67|    248.6|             9|              947.0|
|969a269d38eae583f...|2022-04-17| 

                                                                                

In [14]:
df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: float (nullable = true)
 |-- totalFare: float (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: float (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Полученный датафрейм сохраним для дальнейшего использования. Сохранение выполним в таблицу `Apache Iceberg`. 

`Apache Iceberg` — это поддерживающий высокую производительность табличный формат для больших данных.

Сначала создадим базу данных, в которой будет расположена таблица. Во избежание путаницы, **каждую таблицу следует называть с использованием фамилии студента**.

In [15]:
database_name = "kachurin_database"

Создадим инструкцию SQL для добавления базы данных в каталог `Apache Spark`.

In [16]:
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [17]:
spark.sql(create_database_sql)

DataFrame[]

Установим созданную базу данных как текущую.

In [18]:
spark.catalog.setCurrentDatabase(database_name)

И, наконец, записываем преобразованный датафрейм в таблицу `sobd_lab1_table`.

In [19]:
# Сохранение DataFrame в виде таблицы
df.writeTo("sobd_lab1_table_1").using("iceberg").create()

                                                                                

После успешной записи можно посмотреть, какие таблицы входят в базу данных.

In [20]:
for table in spark.catalog.listTables():
    print(table.name)

sobd_lab1_table_1


Обратите внимание, что при необходимости созданные базу данных и таблицу можно удалить следующими командами.

In [21]:
#spark.sql("DROP TABLE spark_catalog.kachurin_database.sobd_lab1_table_1")
#spark.sql("DROP DATABASE spark_catalog.kachurin_database")

После успешной записи таблицы останавливаем сессию `Apache Spark`.

In [22]:
spark.stop()