## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 1

В данной части работы рассмотрены:
* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу `Apache Airflow`.

Подключим необходимые библиотеки.

In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём сам объект конфигурации.

In [3]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

:: loading settings :: url = jar:file:/opt/spark-3.5.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/user7/.ivy2/cache
The jars for the packages stored in: /home/user7/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-22650cad-606e-4662-ad44-b3fb9d7e432b;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 in central
:: resolution report :: resolve 658ms :: artifacts dl 25ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spa

Для исследования был выбран датасет `"NYC Yellow Taxi Trip Data"`, расположенный на платформе `Kaggle` по адресу https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data.

Датасет включает в себя информацию о более чем двенадцати миллионах поездок на такси в городе Нью-Йорк и его окрестностях за 2015-ый год.

Указываем путь в `HDFS` для файла с данными.

In [5]:
path = "hdfs:///user/user7/skobelin_dir/taxi_1.csv"

Заполняем датафрейм данными из файла.

In [6]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

                                                                                

Выводим фрагмент датафрейма на экран.

In [7]:
df.show()
# VendorID - id компании, в которой работал таксист
# RateCode - способ расчета суммы оплаты
# StoreAndForward - показывает, хранилась ли информация о поездке в памяти машины перед отправкой на сервер или нет

                                                                                

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|   -73.993896484375|40.750110626220703|        

Расшифруем названия колонок:

| Название столбца  | Расшифровка |
| ------------- | ------------- |
| VendorID              | Id компании, в которой работал таксист  |
| tpep_pickup_datetime  | Дата и время начала поездки  |
| tpep_dropoff_datetime | Дата и вреям окончания поездки |
| passenger_count       | Количество пассажиров |
| trip_distance         | Расстояние (в милях) между начальной и конечной токами поездки |
| pickup_longitude      | Долгота на которой был подобран пассажир |
| pickup_latitude       | Широта на которой был подобран пассажир |
| RateCodeID            | Способ рассчета суммы оплаты |
| store_and_fwd_flag    | Показывает, хранилась ли информация о поездке в памяти машины перед отправкой на сервер или нет |
| dropoff_longitude     | Долгота на которой был высажен пассажир |
| dropoff_latitude      | Широта на которой был высажен пассажир |
| payment_type          | Способ оплаты |
| fare_amount           | Базовая стоимость поездки |
| extra                 | Доплата |
| mta_tax               | Налог |
| tip_amount            | Чаевые |
| tolls_amount          | Налог с длительных поездок |
| improvement_surcharge | Дополнительный сбор средств на улучшение качества услуг |
| total_amount          | Полная стоимость поездки |

Выведем на экран метаданные датасета.

In [8]:
df.count()

                                                                                

12748986

In [9]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [10]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("VendorID",
                           col("VendorID").cast("Integer"))
    data = data.withColumn("passenger_count",
                           col("passenger_count").cast("Integer"))
    data = data.withColumn("RateCodeID",
                           col("RateCodeID").cast("Integer"))
    data = data.withColumn("payment_type",
                           col("payment_type").cast("Integer"))
    data = data.withColumn("trip_distance",
                           col("trip_distance").cast("Float"))
    data = data.withColumn("pickup_longitude",
                           col("pickup_longitude").cast("Float"))
    data = data.withColumn("pickup_latitude",
                           col("pickup_latitude").cast("Float"))
    data = data.withColumn("dropoff_longitude",
                           col("dropoff_longitude").cast("Float"))
    data = data.withColumn("dropoff_latitude",
                           col("dropoff_latitude").cast("Float"))
    data = data.withColumn("fare_amount",
                           col("fare_amount").cast("Float"))
    data = data.withColumn("extra",
                           col("extra").cast("Float"))
    data = data.withColumn("mta_tax",
                           col("mta_tax").cast("Float"))
    data = data.withColumn("tip_amount",
                           col("tip_amount").cast("Float"))
    data = data.withColumn("tolls_amount",
                           col("tolls_amount").cast("Float"))
    data = data.withColumn("improvement_surcharge",
                           col("improvement_surcharge").cast("Float"))
    data = data.withColumn("total_amount",
                           col("total_amount").cast("Float"))
    return data


In [11]:
df = transform_dataframe(df)

In [12]:
df.show()

[Stage 5:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RateCodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|        -73.9939|       40.75011|         1|                 N|       -73.974

                                                                                

In [13]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Полученный датафрейм сохраним для дальнейшего использования. Сохранение выполним в таблицу `Apache Iceberg`. 

`Apache Iceberg` — это поддерживающий высокую производительность табличный формат для больших данных.

In [14]:
database_name = "skobelin_database"

Создадим инструкцию SQL для добавления базы данных в каталог `Apache Spark`.

In [15]:
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [16]:
spark.sql(create_database_sql)

DataFrame[]

Установим созданную базу данных как текущую.

In [18]:
spark.catalog.setCurrentDatabase(database_name)

И, наконец, записываем преобразованный датафрейм в таблицу `sobd_lab1_table`.

In [20]:
# Сохранение DataFrame в виде таблицы
df.writeTo("skobelin_lab1_table").using("iceberg").create()

AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `skobelin_database`.`skobelin_lab1_table` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.

После успешной записи можно посмотреть, какие таблицы входят в базу данных.

In [21]:
for table in spark.catalog.listTables():
    print(table.name)

skobelin_lab1_processed_table
skobelin_lab1_table


После успешной записи таблицы останавливаем сессию `Apache Spark`.

In [25]:
spark.stop()