# Лабораторная работа № 1
Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark  
**Часть 1**

В данной части работы рассмотрены:

- загрузка данных из HDFS;
- базовые преобразования данных;
- загрузка преобразованных данных в таблиц Iceberg;у Apache

Подключим необходимые библиотеки.иблиотеки.й среды.й среды.

In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для Apache Spark, указав необходимые параметры.

In [4]:
def create_spark_configuration() -> SparkConf:
    """
    Создаёт и конфигурирует экземпляр SparkConf для приложения Spark.
    Адаптация для локального выполнения в docker (local[*]) с доступом к HDFS.
    """
    # В контейнере обычно имя пользователя 'jovyan', но используем переменную окружения
    user_name = "jovyan"
    
    conf = SparkConf()
    # В локальной среде используем local[*] вместо yarn
    conf.setAppName("lab 1 Test")
    conf.setMaster("local[*]")
    conf.set("spark.sql.adaptive.enabled", "true")
    
    # Память/ядра — разумные значения для локального контейнера (можно подстраивать)
    conf.set("spark.executor.memory", "6g")
    conf.set("spark.executor.cores", "2")
    conf.set("spark.executor.instances", "1")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "1")

    # Настройка HDFS (указываем на сервис внутри docker-compose)
    conf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop-namenode:9820")
    conf.set("spark.hadoop.dfs.client.use.datanode.hostname", "true")
    
    # Конфигурация Apache Iceberg
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём конфигурацию и сессию

In [5]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [6]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

Исходный датасет уменьшен ввиду ограниченности ресурсов и сохранён в HDFS по пути:
`hdfs:///data/cars/Used_Crs.csvv
В локальной среде (docker) это будет доступно как:
`hdfs://hadoop-namenode:9820/data/cars/Used_Cars.csv``

Далее выполним чтение CSV

In [9]:
path = "hdfs://hadoop-namenode:9820/data/cars/Used_Cars.csv"

Заполняем датафрейм данными из файла.

In [10]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

Выводим фрагмент датафрейма на экран.

In [11]:
print("Первые 5 строк (preview):")
df.show(5, truncate=120)

Первые 5 строк (preview):
+-----------------+------------+----+----------+----------+---------------+-----+--------+-----------------+--------------------+------------+----------+------------------------------------------------------------------------------------------------------------------------+----------------+-------------------+-----------+--------------+-----+-------------+----------------+--------------+-------------+----------------+---------+-------------+-------+--------------------+----------+------------------------+-----+------------+------+------+---------+--------+--------+-----------+-------------+----------+---------+------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------+----------+---------------+-------+---------------+-----------+------------------+-------+-------+--------------+-------------+--------+-------------------+-----------+----

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все колонки датасета. Оставим следующие колонки, удалив остальные:

| Название столбца  | Расшифровка |
| ------------- | ------------- |
| vin               | Идентификационный номер автомобиля  |
| body_type         | Тип кузова автомобиля (кабриолет, хэтчбек, седан и т.д.)  |
| daysonmarket      | Количество дней, прошедших с момента первого размещения автомобиля на сайте |
| fleet             | Был ли автомобиль ранее частью автопарка |
| has_accidents     | Имеется ли регистрация аварий на данном автомобиле |
| horsepower        | Мощность двигателя в лошадиных силах |
| is_certified      | Является ли автомобиль сертифицированным (на сертифицированные автомобили распространяется гарантийный срок) |
| is_cpo            | Подержанные автомобили, сертифицированные дилером |
| is_oemcpo         | Подержанные автомобили, сертифицированные производителем |
| major_options     | Список основный опций автомобиля |
| maximum_seating   | Максимальное количество посадочных мест |
| mileage           | Величина пробега автомобиля |
| price             | Цена автомобиля |
| wheel_system      | Тип привода |
| year              | Год выпуска автомобиля |


In [12]:
df = df.select(
    "vin", "body_type", "daysonmarket", "fleet", "has_accidents", "horsepower",
    "is_certified", "is_cpo", "is_oemcpo", "major_options",
    "maximum_seating", "mileage", "price", "wheel_system", "year"
)

In [13]:
df.show()

+-----------------+---------------+------------+-----+-------------+----------+------------+------+---------+--------------------+---------------+-------+-------+------------+----+
|              vin|      body_type|daysonmarket|fleet|has_accidents|horsepower|is_certified|is_cpo|is_oemcpo|       major_options|maximum_seating|mileage|  price|wheel_system|year|
+-----------------+---------------+------------+-----+-------------+----------+------------+------+---------+--------------------+---------------+-------+-------+------------+----+
|ZACNJABB5KPJ92081|SUV / Crossover|         522| NULL|         NULL|     177.0|        NULL|  NULL|     NULL|['Quick Order Pac...|        5 seats|    7.0|23141.0|         FWD|2019|
|SALCJ2FX1LH858117|SUV / Crossover|         207| NULL|         NULL|     246.0|        NULL|  NULL|     NULL|['Adaptive Cruise...|        7 seats|    8.0|46500.0|         AWD|2020|
|JF1VA2M67G9829723|          Sedan|        1233|False|        False|     305.0|        NULL|  N

Выведем на экран метаданные датасета.

In [14]:
df.printSchema()

root
 |-- vin: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- daysonmarket: string (nullable = true)
 |-- fleet: string (nullable = true)
 |-- has_accidents: string (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- is_certified: string (nullable = true)
 |-- is_cpo: string (nullable = true)
 |-- is_oemcpo: string (nullable = true)
 |-- major_options: string (nullable = true)
 |-- maximum_seating: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- price: string (nullable = true)
 |-- wheel_system: string (nullable = true)
 |-- year: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [15]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("daysonmarket",
                           col("daysonmarket").cast("Integer"))
    data = data.withColumn("fleet",
                           col("fleet").cast("Boolean"))
    data = data.withColumn("has_accidents",
                           col("has_accidents").cast("Boolean"))
    data = data.withColumn("horsepower",
                           col("horsepower").cast("Float"))
    data = data.withColumn("is_certified",
                           col("is_certified").cast("Boolean"))
    data = data.withColumn("is_cpo",
                           col("is_cpo").cast("Boolean"))
    data = data.withColumn("is_oemcpo",
                           col("is_oemcpo").cast("Boolean"))

    # Преобразуем строку в массив строк
    data = data.withColumn("major_options",
                           regexp_extract_all(col("major_options"),
                                              lit(r"'([^']*)'"),
                                              1)
    )

    # Убираем единицы измерения
    data = data.withColumn("maximum_seating",
                           regexp_replace(col("maximum_seating"),
                                          r"\s+seats",
                                          "").cast("Integer")
    )

    data = data.withColumn("mileage",
                           col("mileage").cast("Float"))
    data = data.withColumn("price",
                           col("price").cast("Float"))
    data = data.withColumn("year",
                           col("year").cast("Integer"))

    return data


In [16]:
df = transform_dataframe(df)

In [17]:
df.show()

+-----------------+---------------+------------+-----+-------------+----------+------------+------+---------+--------------------+---------------+-------+-------+------------+----+
|              vin|      body_type|daysonmarket|fleet|has_accidents|horsepower|is_certified|is_cpo|is_oemcpo|       major_options|maximum_seating|mileage|  price|wheel_system|year|
+-----------------+---------------+------------+-----+-------------+----------+------------+------+---------+--------------------+---------------+-------+-------+------------+----+
|ZACNJABB5KPJ92081|SUV / Crossover|         522| NULL|         NULL|     177.0|        NULL|  NULL|     NULL|[Quick Order Pack...|              5|    7.0|23141.0|         FWD|2019|
|SALCJ2FX1LH858117|SUV / Crossover|         207| NULL|         NULL|     246.0|        NULL|  NULL|     NULL|[Adaptive Cruise ...|              7|    8.0|46500.0|         AWD|2020|
|JF1VA2M67G9829723|          Sedan|        1233|false|        false|     305.0|        NULL|  N

In [18]:
df.printSchema()

root
 |-- vin: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- daysonmarket: integer (nullable = true)
 |-- fleet: boolean (nullable = true)
 |-- has_accidents: boolean (nullable = true)
 |-- horsepower: float (nullable = true)
 |-- is_certified: boolean (nullable = true)
 |-- is_cpo: boolean (nullable = true)
 |-- is_oemcpo: boolean (nullable = true)
 |-- major_options: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- maximum_seating: integer (nullable = true)
 |-- mileage: float (nullable = true)
 |-- price: float (nullable = true)
 |-- wheel_system: string (nullable = true)
 |-- year: integer (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Полученный датафрейм сохраним для дальнейшего использования. Сохранение выполним в таблицу `Apache Iceberg`.

`Apache Iceberg` — это поддерживающий высокую производительность табличный формат для больших данных.

Сначала создадим базу данных, в которой будет расположена таблица.


In [19]:
database_name = "gordeev_database"

Создадим инструкцию SQL для добавления базы данных в каталог Apache Spark.

In [20]:
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

CREATE DATABASE executed.


In [None]:
spark.sql(create_database_sql)

И, наконец, записываем преобразованный датафрейм в таблицу sobd_lab1_table.

In [27]:
# Сохранение DataFrame в виде таблицы
df.writeTo(f"spark_catalog.{database_name}.sobd_lab1_table").using("iceberg").create()

После успешной записи можно посмотреть, какие таблицы входят в базу данных.

In [29]:
for table in spark.catalog.listTables("gordeev_database"):
    print(table.name)

sobd_lab1_table


После успешной записи таблицы останавливаем сессию Apache Spark.

In [30]:
spark.stop()