## Лабораторная работа № 1
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

Подключим необходимые библиотеки.

In [68]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit,
    split
)

Сформируем объект конфигурации для Apache Spark, указав необходимые параметры, и создадим сам объект конфигурации.

In [69]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = "dchel"
    
    conf = SparkConf()
    conf.setAppName("Lab 1")
    conf.setMaster("local[*]")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    # conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    # conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

In [70]:
conf = create_spark_configuration()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

Для исследования будем использовать датасет `eCommerce behavior data from multi category store`, расположенный на платформе **Kaggle** по адресу https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store.
Этот датасет содержит данные о поведении пользователей за 7 месяцев (с октября 2019 года по апрель 2020 года) в крупном интернет-магазине с несколькими категориями товаров.
.

Данный датасет уже загружен в HDFS по адеuser/dchel/dataset.csv.csv

Указываем путь  и заполняем датафрейм данными из файлааными.`

In [71]:
path = "hdfs://namenode:9000/user/dchel/dataset.csv"
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)
df.show()

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    NULL| 543.10|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265

In [72]:
df = df.select(
    "event_type", "product_id", "category_id", "category_code", "brand", "price"
)
df.show()

+----------+----------+-------------------+--------------------+--------+-------+
|event_type|product_id|        category_id|       category_code|   brand|  price|
+----------+----------+-------------------+--------------------+--------+-------+
|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|
|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|
|      view|  17200506|2053013559792632471|furniture.living_...|    NULL| 543.10|
|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|
|      view|   1004237|2053013555631882655|electronics.smart...|   apple|1081.98|
|      view|   1480613|2053013561092866779|   computers.desktop|  pulser| 908.62|
|      view|  17300353|2053013553853497655|                NULL|   creed| 380.96|
|      view|  31500053|2053013558031024687|                NULL|luminarc|  41.16|
|      view|  28719074|2053013565480109009|  apparel.shoes.keds|   baden| 102.71|
|      view|   1

In [73]:
df.printSchema()

root
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)



Выполним преобразования типов данных некоторых столбцов.

In [74]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("category_id", col("category_id").cast("Integer"))
    data = data.withColumn("product_id", col("product_id").cast("Integer"))
    data = data.withColumn("price", col("price").cast("Float"))

    # Преобразуем строку в массив строк
    data = data.withColumn("category_code", 
                            split(col("category_code"), r"\.")
                          )

    data = data.dropna(subset=["category_code", "brand"])

    return data

In [75]:
df = transform_dataframe(df)
df.show()
df.printSchema()

+----------+----------+-----------+--------------------+--------+-------+
|event_type|product_id|category_id|       category_code|   brand|  price|
+----------+----------+-----------+--------------------+--------+-------+
|      view|   3900821|       NULL|[appliances, envi...|    aqua|   33.2|
|      view|   1307067|       NULL|[computers, noteb...|  lenovo| 251.74|
|      view|   1004237|       NULL|[electronics, sma...|   apple|1081.98|
|      view|   1480613|       NULL|[computers, desktop]|  pulser| 908.62|
|      view|  28719074|       NULL|[apparel, shoes, ...|   baden| 102.71|
|      view|   1004545|       NULL|[electronics, sma...|  huawei| 566.01|
|      view|   2900536|       NULL|[appliances, kitc...|elenberg|  51.46|
|      view|   1005011|       NULL|[electronics, sma...| samsung| 900.64|
|      view|   3900746|       NULL|[appliances, envi...|   haier| 102.38|
|      view|  13500240|       NULL|[furniture, bedro...|     brw|  93.18|
|      view|   1801995|       NULL|[el

Сохраним обработанный датафрейм в HDFS

In [76]:
database_name = "dchel_database"
table_name = "eCommerce_table"

output_path = f"hdfs://namenode:9000/user/dchel/{database_name}/{table_name}"

print(f"Сохраняем данные в: {output_path}")

df.write.mode("overwrite").option("compression", "snappy").parquet(output_path)

print("Данные успешно сохранены в Parquet формате в HDFS")

Сохраняем данные в: hdfs://namenode:9000/user/dchel/dchel_database/eCommerce_table
Данные успешно сохранены в Parquet формате в HDFS


Закроем Spark сессию

In [77]:
spark.stop()