## Отчет лабораторной работа № 1 
## Часть 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Использован датасет: https://www.kaggle.com/datasets/omertarikyilmaz/istabul-traffic-2020-2024
### Часть 1

В данной части работы рассмотрены:
* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу `Apache Airflow`.

Подключим необходимые библиотеки.

In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("local[*]")
    # conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "1")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "1")
    # conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    # conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    # conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    # conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    # conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём сам объект конфигурации.

In [3]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

Загрузим csv файл датасета.

In [5]:
path = "data/istanbulTraffic2020-2024.csv"

Заполняем датафрейм данными из файла.

In [6]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

Выводим фрагмент датафрейма на экран.

In [7]:
df.show()

+-------------------+----------------+----------------+-------+-------------+-------------+-------------+------------------+
|          DATE_TIME|        LATITUDE|       LONGITUDE|GEOHASH|MINIMUM_SPEED|MAXIMUM_SPEED|AVERAGE_SPEED|NUMBER_OF_VEHICLES|
+-------------------+----------------+----------------+-------+-------------+-------------+-------------+------------------+
|2021-10-01 00:00:00|40.9323120117188|29.3609619140625| sxkbu5|           52|           85|           67|                 7|
|2021-10-01 00:00:00|41.1245727539063|28.1854248046875| sxk1cq|           33|           89|           68|                 5|
|2021-10-01 00:00:00|41.1190795898438|28.9434814453125| sxk9ft|           48|          144|           72|               149|
|2021-10-01 00:00:00|40.9707641601563|29.3170166015625| sxkc54|           77|          104|           84|                24|
|2021-10-01 00:00:00|41.2509155273438|28.6798095703125| sxk6st|           45|          104|           73|                 8|


Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все солбцы датасета. Оставим следующие колонки, удалив остальные:

| Название столбца  | Расшифровка |
| -------------------| ------------- |
| DATE_TIME          | Точная временная метка наблюдения  |
| LATITUDE           | Географическая широта места измерения  |
| LONGITUDE          | Географическая долгота места измерения |
| GEOHASH            | Строковый код для географической области |
| MINIMUM_SPEED      | Минимальная зарегистрированная скорость (км/ч) |
| MAXIMUM_SPEED      | Максимальная зарегистрированная скорость (км/ч) |
| AVERAGE_SPEED      | Средняя скорость всех обнаруженных транспортных средств (км/ч) |
| NUMBER_OF_VEHICLES |  Количество обнаруженных уникальных транспортных средств |

Выведем на экран метаданные датасета.

In [9]:
df.printSchema()

root
 |-- DATE_TIME: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- GEOHASH: string (nullable = true)
 |-- MINIMUM_SPEED: string (nullable = true)
 |-- MAXIMUM_SPEED: string (nullable = true)
 |-- AVERAGE_SPEED: string (nullable = true)
 |-- NUMBER_OF_VEHICLES: string (nullable = true)



Проверим датасет на нули.

In [18]:
columns_to_check = df.columns
df_with_nulls_or_empty = df.filter(
    " OR ".join([f"{col} IS NULL OR {col} = ''" for col in columns_to_check])
)
df_with_nulls_or_empty.show()

+---------+--------+---------+-------+-------------+-------------+-------------+------------------+
|DATE_TIME|LATITUDE|LONGITUDE|GEOHASH|MINIMUM_SPEED|MAXIMUM_SPEED|AVERAGE_SPEED|NUMBER_OF_VEHICLES|
+---------+--------+---------+-------+-------------+-------------+-------------+------------------+
+---------+--------+---------+-------+-------------+-------------+-------------+------------------+



Видно, что датасет не содержит нули.

Сохраним в нужный формат для быстрого чтения.

In [20]:
df.write.parquet("data/istanbulTraffic.parquet")

Останавливаем сессию.

In [21]:
spark.stop()