## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 1

В данной части работы рассмотрены:
* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу `Apache Airflow`.

Подключим необходимые библиотеки.

In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём сам объект конфигурации.

In [3]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

:: loading settings :: url = jar:file:/opt/spark-3.5.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/user0/.ivy2/cache
The jars for the packages stored in: /home/user0/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-badb9122-c6d1-4b7d-bb9a-0f575197b402;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 in central
:: resolution report :: resolve 627ms :: artifacts dl 24ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spa

Для исследования будем использовать датасет `NYC Yellow Taxi Trip Data`, расположенный на платформе `Kaggle` по адресу 
https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data?select=yellow_tripdata_2016-03.csv
Датасет включает в себя информацию о поездках на такси.

Данный датасет уже загружен в `HDFS` по адресу: `hdfs:///user/user0/gysynin_dmitry_data/yellow_tripdata_2016-03.csv`

Указываем путь в `HDFS` для файла с данными.

In [None]:
# path = "hdfs:///datasets/used_cars_data.csv"
# hadoop fs -copyFromLocal /home/user0/gysynin_dmitry/SOBD/2024/lab1/datasets/yellow_tripdata_2015-01.csv /user/user0/gysynin_dmitry_data/
path = "hdfs:///user/user0/gysynin_dmitry_data/yellow_tripdata_2015-01.csv"


Заполняем датафрейм данными из файла.

In [6]:
df = (spark.read.format("csv")
      .option("header", "true")
      # .option("multiLine", "true")
      .option("separator", ",")
      .load(path)
)

                                                                                

Выводим фрагмент датафрейма на экран.

In [7]:
df.show()

                                                                                

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|   -73.993896484375|40.750110626220703|        

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все солбцы датасета. Оставим следующие колонки, удалив остальные:


| Название столбца  | Расшифровка |
| ------------- | ------------- |
| VendorID | Код, указывающий на поставщика услуг TPEP, который предоставил запись. 1 - Creative Mobile Technologies 2- VeriFone Inc. |
| passenger_count | Количество пассажиров |
| trip_distance | Расстояние поездки |
| RateCodeID | Окончательный код тарифа, действующий в конце поездки. 1- Стандартный тариф 2- JFK 3- Ньюарк 4- Нассау или Вестчестер 5- Тариф по договоренности 6- Групповая поездка |
| store_and_fwd_flag | Этот флаг указывает, хранилась ли запись о поездке в памяти автомобиля перед отправкой поставщику, так называемый «store and forward», поскольку автомобиль не имел соединения с сервером. Y = поездка с сохранением и пересылкой N= не поездка с сохранением и пересылкой |
| payment_type | Цифровой код, обозначающий, каким образом пассажир оплатил поездку. 1- Кредитная карта 2- Наличные 3- Без оплаты 4-Спор 5- Неизвестно 6-Анулированная поездка |
| fare_amount | Тариф за время и расстояние, рассчитанный счетчиком. |
| extra | Различные доплаты и надбавки. В настоящее время сюда входят только сборы в размере 0,50 и 1 доллара в час пик и ночные сборы. |
| tip_amount | Сумма чаевых - Это поле автоматически заполняется для чаевых по кредитной карте. Чаевые наличными не учитываются. |
| tolls_amount | Общая сумма всех дорожных сборов, оплаченных в поездке. |
| total_amount | Общая сумма, взимаемая с пассажиров. Не включает денежные чаевые. |

In [8]:
df = df.select(
    "VendorID", "passenger_count", "trip_distance",
    "RateCodeID", "store_and_fwd_flag", "payment_type", 
    "fare_amount", "extra", "tip_amount",
    "tolls_amount", "total_amount"
)

In [9]:
df.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------+---------------+-------------+----------+------------------+------------+-----------+-----+----------+------------+------------+
|VendorID|passenger_count|trip_distance|RateCodeID|store_and_fwd_flag|payment_type|fare_amount|extra|tip_amount|tolls_amount|total_amount|
+--------+---------------+-------------+----------+------------------+------------+-----------+-----+----------+------------+------------+
|       2|              1|         1.59|         1|                 N|           1|         12|    1|      3.25|           0|       17.05|
|       1|              1|         3.30|         1|                 N|           1|       14.5|  0.5|         2|           0|        17.8|
|       1|              1|         1.80|         1|                 N|           2|        9.5|  0.5|         0|           0|        10.8|
|       1|              1|          .50|         1|                 N|           2|        3.5|  0.5|         0|           0|         4.8|
|       1|              1| 

                                                                                

Выведем на экран метаданные датасета.

In [10]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [11]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("VendorID",
                    col("VendorID").cast("Integer"))
    data = data.withColumn("passenger_count",
                    col("passenger_count").cast("Integer"))
    data = data.withColumn("RateCodeID",
                    col("RateCodeID").cast("Integer"))
    data = data.withColumn("payment_type",
                    col("payment_type").cast("Integer"))

    data = data.withColumn("trip_distance",
                    col("trip_distance").cast("Float"))
    data = data.withColumn("fare_amount",
                    col("fare_amount").cast("Float"))
    data = data.withColumn("extra",
                    col("extra").cast("Float"))
    data = data.withColumn("tip_amount",
                    col("tip_amount").cast("Float"))
    data = data.withColumn("tolls_amount",
                    col("tolls_amount").cast("Float"))
    data = data.withColumn("total_amount",
                    col("total_amount").cast("Float"))

    data = data.withColumn("store_and_fwd_flag",
                    col("store_and_fwd_flag").cast("Boolean"))

    return data

In [12]:
df = transform_dataframe(df)

In [13]:
df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------+---------------+-------------+----------+------------------+------------+-----------+-----+----------+------------+------------+
|VendorID|passenger_count|trip_distance|RateCodeID|store_and_fwd_flag|payment_type|fare_amount|extra|tip_amount|tolls_amount|total_amount|
+--------+---------------+-------------+----------+------------------+------------+-----------+-----+----------+------------+------------+
|       2|              1|         1.59|         1|             false|           1|       12.0|  1.0|      3.25|         0.0|       17.05|
|       1|              1|          3.3|         1|             false|           1|       14.5|  0.5|       2.0|         0.0|        17.8|
|       1|              1|          1.8|         1|             false|           2|        9.5|  0.5|       0.0|         0.0|        10.8|
|       1|              1|          0.5|         1|             false|           2|        3.5|  0.5|       0.0|         0.0|         4.8|
|       1|              1| 

                                                                                

In [14]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Полученный датафрейм сохраним для дальнейшего использования. Сохранение выполним в таблицу `Apache Iceberg`. 

`Apache Iceberg` — это поддерживающий высокую производительность табличный формат для больших данных.

Сначала создадим базу данных, в которой будет расположена таблица. Во избежание путаницы, **каждую таблицу следует называть с использованием фамилии студента**.

In [16]:
database_name = "gysynin_dmitry_database"

Создадим инструкцию SQL для добавления базы данных в каталог `Apache Spark`.

In [17]:
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [18]:
spark.sql(create_database_sql)

DataFrame[]

Установим созданную базу данных как текущую.

In [19]:
spark.catalog.setCurrentDatabase(database_name)

И, наконец, записываем преобразованный датафрейм в таблицу `sobd_lab1_table_gysynin`.

In [20]:
# Сохранение DataFrame в виде таблицы
df.writeTo("sobd_lab1_table_gysynin").using("iceberg").create()

                                                                                

После успешной записи можно посмотреть, какие таблицы входят в базу данных.

In [21]:
for table in spark.catalog.listTables():
    print(table.name)

sobd_lab1_processed_table
sobd_lab1_table_gysynin


Обратите внимание, что при необходимости созданные базу данных и таблицу можно удалить следующими командами.

In [None]:
spark.sql("DROP TABLE spark_catalog.gysynin_dmitry_database.sobd_lab1_table_gysynin")
spark.sql("DROP DATABASE spark_catalog.gysynin_dmitry_database")

После успешной записи таблицы останавливаем сессию `Apache Spark`.

In [22]:
spark.stop()

/user/user0/gysynin_dmitry_data/