## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 1

В данной части работы рассмотрены:
* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу `Apache Airflow`.

Подключим необходимые библиотеки.

In [73]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [74]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём сам объект конфигурации.

In [75]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [76]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

24/12/07 02:17:39 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/12/07 02:17:57 WARN Client: Same path resource file:///home/user6/.ivy2/jars/org.apache.iceberg_iceberg-spark-runtime-3.5_2.12-1.6.0.jar added multiple times to distributed cache.


Для исследования будем использовать датасет `"US Used cars dataset"`, расположенный на платформе `Kaggle` по адресу https://www.kaggle.com/datasets/ananaymital/us-used-cars-dataset.

Датасет включает в себя информацию о более чем трех миллионах используемых машин в США. Он разрешен для использования в учебных целях.

Данный датасет уже загружен в `HDFS` по адресу: `hdfs:///datasets/used_cars_data.csv`

Указываем путь в `HDFS` для файла с данными.

In [77]:
#path = "hdfs:///datasets/HI-Medium_Trans.csv"
path = "hdfs:///user/user6/khripunov_directory/HI-Medium_Trans.csv"

Заполняем датафрейм данными из файла.

In [78]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

24/12/07 02:18:47 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

Выводим фрагмент датафрейма на экран.

In [79]:
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:17|      020|800104D70|    020|800104D70|        6794.63|         US Dollar|    6794.63|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:02|    03196|800107150|  03196|800107150|        7739.29|         US Dollar|    7739.29|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:17|    01208|80010E430|  01208|80010E430|        1880.23|         US Dollar|    1880.23|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:03|    01208|80010E650|    020|80010E6F0|    73966883.00|         US Dollar|73966883.

                                                                                

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все солбцы датасета. Оставим следующие колонки, удалив остальные:

| Название столбца  | Расшифровка |
| ------------- | ------------- |
| Timestamp         | Дата и время выполнения транзакции  |
| From Bank	        | Банк, откуда была отправлена транзакция  |
| To Bank           | Банк, куда была отправлена транзакция |
| Amount Received   | Полученная сумма
| Amount Paid       | Сумма, отправленная отправителем |
| Payment Currency  | Валюта отправленной суммы |
| Payment Format	| Формат платежа (например, "Reinvestment", "Cheque")  |
| Is Laundering	    | Флаг, указывающий, является ли транзакция подозрительной (0 или 1) |

In [80]:
df = df.select(
    "Timestamp", "From Bank", "To Bank", "Amount Received", "Amount Paid", "Payment Currency", "Payment Format",
    "Is Laundering"
)

In [81]:
df.show()

+----------------+---------+-------+---------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank|To Bank|Amount Received|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+-------+---------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:17|      020|    020|        6794.63|    6794.63|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:02|    03196|  03196|        7739.29|    7739.29|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:17|    01208|  01208|        1880.23|    1880.23|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:03|    01208|    020|    73966883.00|73966883.00|       US Dollar|        Cheque|            0|
|2022/09/01 00:02|    01208|    020|    45868454.00|45868454.00|       US Dollar|        Cheque|            0|
|2022/09/01 00:27|    03203|  03203|       13284.41|   13284.41|       US Dollar|  Reinvestment|            0|
|

Выведем на экран метаданные датасета.

In [82]:
df.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- From Bank: string (nullable = true)
 |-- To Bank: string (nullable = true)
 |-- Amount Received: string (nullable = true)
 |-- Amount Paid: string (nullable = true)
 |-- Payment Currency: string (nullable = true)
 |-- Payment Format: string (nullable = true)
 |-- Is Laundering: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [83]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("Timestamp", col("Timestamp").cast("Timestamp"))  # Дата и время
    data = data.withColumn("Amount Received", col("Amount Received").cast("Float"))  # Полученная сумма
    data = data.withColumn("Amount Paid", col("Amount Paid").cast("Float"))  # Сумма отправления
    data = data.withColumn("Is Laundering", col("Is Laundering").cast("Boolean"))  # Флаг подозрительности
    
    # Убираем валютные обозначения в столбцах валют (при необходимости)
    data = data.withColumn("Payment Currency", regexp_replace(col("Payment Currency"), r"\s+Dollar", "").alias("Payment Currency"))

    return data

# Подсчёт записей с Is Laundering == True
df.filter(col("Is Laundering") == False).count()



                                                                                

31863008

In [84]:
df = transform_dataframe(df)

In [85]:
df.show()

+---------+---------+-------+---------------+-----------+----------------+--------------+-------------+
|Timestamp|From Bank|To Bank|Amount Received|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+---------+---------+-------+---------------+-----------+----------------+--------------+-------------+
|     NULL|      020|    020|        6794.63|    6794.63|              US|  Reinvestment|        false|
|     NULL|    03196|  03196|        7739.29|    7739.29|              US|  Reinvestment|        false|
|     NULL|    01208|  01208|        1880.23|    1880.23|              US|  Reinvestment|        false|
|     NULL|    01208|    020|     7.396688E7| 7.396688E7|              US|        Cheque|        false|
|     NULL|    01208|    020|    4.5868456E7|4.5868456E7|              US|        Cheque|        false|
|     NULL|    03203|  03203|       13284.41|   13284.41|              US|  Reinvestment|        false|
|     NULL|      020|    020|           9.72|       9.72|       

                                                                                

In [86]:
df.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- From Bank: string (nullable = true)
 |-- To Bank: string (nullable = true)
 |-- Amount Received: float (nullable = true)
 |-- Amount Paid: float (nullable = true)
 |-- Payment Currency: string (nullable = true)
 |-- Payment Format: string (nullable = true)
 |-- Is Laundering: boolean (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Полученный датафрейм сохраним для дальнейшего использования. Сохранение выполним в таблицу `Apache Iceberg`. 

`Apache Iceberg` — это поддерживающий высокую производительность табличный формат для больших данных.

Сначала создадим базу данных, в которой будет расположена таблица. Во избежание путаницы, **каждую таблицу следует называть с использованием фамилии студента**.

In [87]:
database_name = "Khripunov_database"

Создадим инструкцию SQL для добавления базы данных в каталог `Apache Spark`.

In [88]:
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [89]:
spark.sql(create_database_sql)

DataFrame[]

Установим созданную базу данных как текущую.

In [90]:
spark.catalog.setCurrentDatabase(database_name)

И, наконец, записываем преобразованный датафрейм в таблицу `sobd_lab1_table`.

In [91]:
# Сохранение DataFrame в виде таблицы
df.writeTo("sobd_lab1_table2").using("iceberg").create()

                                                                                

После успешной записи можно посмотреть, какие таблицы входят в базу данных.

In [92]:
for table in spark.catalog.listTables():
    print(table.name)

                                                                                

sobd_lab1_table
sobd_lab1_table1
sobd_lab1_table2


Обратите внимание, что при необходимости созданные базу данных и таблицу можно удалить следующими командами.

In [93]:
# spark.sql("DROP TABLE spark_catalog.Khripunov_database.sobd_lab1_table")
# spark.sql("DROP DATABASE spark_catalog.Khripunov_database")

После успешной записи таблицы останавливаем сессию `Apache Spark`.

In [94]:
spark.stop()