In [24]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.ml.feature import VectorAssembler, StringIndexer, Binarizer, Bucketizer
from pyspark.ml.functions import vector_to_array
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql import Window
from pyspark.sql.functions import to_date, col, weekday, max, dayofweek, split, count, when

In [6]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 2 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

In [7]:
conf = create_spark_configuration()

In [8]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

25/01/18 07:05:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
path = "/user/user1/2019oct_2.csv"

In [10]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

                                                                                

In [11]:
df.count()

                                                                                

42448764

In [12]:
df = df.dropna()

In [13]:
df.count()

                                                                                

26560620

In [14]:
# Фильтруем строки с event_type "cart" или "purchase"
cart_purchase_users = df.filter(col("event_type").isin(["cart", "purchase"]))

# Удаляем дубликаты по столбцу 'user_id'
cart_purchase_users = cart_purchase_users.dropDuplicates(subset=['user_id'])

# Удаляем строки с пустыми значениями
cart_purchase_users = cart_purchase_users.na.drop(how="any")

# Показываем результат
cart_purchase_users.show()

                                                                                

+--------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|    brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+
|2019-10-14 17:09:...|      cart|   1004741|2053013555631882655|electronics.smart...|   xiaomi|191.77|293957954|ef4f8622-b955-45d...|
|2019-10-17 17:32:...|      cart|   1004247|2053013555631882655|electronics.smart...|    apple|810.52|362327778|614dda5e-2755-4dc...|
|2019-10-15 17:10:...|      cart|   1004875|2053013555631882655|electronics.smart...|  samsung|368.50|380024145|a8326e55-9f47-4db...|
|2019-10-18 11:19:...|  purchase|   2501816|2053013564003713919|appliances.kitche...|    artel| 41.44|384989212|7472f245-e885-4c5...|
|2019-10-14 18:04:...|      cart|   2701646|205301356391143922

In [15]:
cart_purchase_users = df.filter(df["event_type"].isin("cart", "purchase")) \
                        .dropDuplicates(["user_id"])
cart_purchase_users.show()

[Stage 12:>                                                         (0 + 1) / 1]

+--------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|    brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+
|2019-10-14 17:09:...|      cart|   1004741|2053013555631882655|electronics.smart...|   xiaomi|191.77|293957954|ef4f8622-b955-45d...|
|2019-10-17 17:32:...|      cart|   1004247|2053013555631882655|electronics.smart...|    apple|810.52|362327778|614dda5e-2755-4dc...|
|2019-10-15 17:10:...|      cart|   1004875|2053013555631882655|electronics.smart...|  samsung|368.50|380024145|a8326e55-9f47-4db...|
|2019-10-18 11:19:...|  purchase|   2501816|2053013564003713919|appliances.kitche...|    artel| 41.44|384989212|7472f245-e885-4c5...|
|2019-10-14 18:04:...|      cart|   2701646|205301356391143922

                                                                                

In [16]:
cart_purchase_users_all_activity = df.filter(df['user_id'].isin(cart_purchase_users['user_id']))

In [17]:
cart_purchase_users_all_activity.show(5)

+--------------------+----------+----------+-------------------+--------------------+------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code| brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|  aqua|  33.20|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|2053013555631882655|electronics.smart...| apple|1081.98|535871217|c6bd7419-2748-4c5...|
|2019-10-01 00:00:...|      view|   1480613|2053013561092866779|   computers.desktop|pulser| 908.62|512742880|0d0d91c2-c9c2-4e8...|
|2019-10-01 00:00:...|      view|  28719074|2053013565480109009|  apparel.sh

In [20]:
activity_in_session = cart_purchase_users_all_activity.groupBy("user_session").agg(count("event_type").alias("activity_count"))

In [67]:
activity_in_session.show(5)

[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+--------------+
|        user_session|activity_count|
+--------------------+--------------+
|c3012f56-70f3-419...|             1|
|2af9b570-0942-4dc...|             3|
|78c0b329-af93-44f...|             1|
|c7e588c7-78a9-403...|             2|
|85881243-7b33-409...|             3|
+--------------------+--------------+
only showing top 5 rows



                                                                                

In [21]:
from pyspark.sql.functions import to_date

df = df.withColumn('event_date', to_date(col('event_time').substr(1, 10), 'yyyy-MM-dd'))

In [69]:
df.show(5)

+--------------------+----------+----------+-------------------+--------------------+------+-------+---------+--------------------+----------+
|          event_time|event_type|product_id|        category_id|       category_code| brand|  price|  user_id|        user_session|event_date|
+--------------------+----------+----------+-------------------+--------------------+------+-------+---------+--------------------+----------+
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|  aqua|  33.20|554748717|9333dfbd-b87a-470...|2019-10-01|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|lenovo| 251.74|550050854|7c90fc70-0e80-459...|2019-10-01|
|2019-10-01 00:00:...|      view|   1004237|2053013555631882655|electronics.smart...| apple|1081.98|535871217|c6bd7419-2748-4c5...|2019-10-01|
|2019-10-01 00:00:...|      view|   1480613|2053013561092866779|   computers.desktop|pulser| 908.62|512742880|0d0d91c2-c9c2-4e8...|2019-10-01|

                                                                                

In [22]:
df_targets = df.filter(df["event_type"].isin("cart", "purchase")) \
    .dropDuplicates(["event_type", "product_id", "price", "user_id", "user_session"])

In [25]:
df_targets = df_targets.withColumn("is_purchased", when(df_targets["event_type"] == "purchase", 1).otherwise(0))

In [72]:
df_targets.show(5)

[Stage 17:>                                                         (0 + 1) / 1]

+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+----------+------------+
|          event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|event_date|is_purchased|
+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+----------+------------+
|2019-10-14 05:34:...|  purchase|   1005135|2053013555631882655|electronics.smart...|  apple|1747.70|560002777|0002fa4e-1d84-444...|2019-10-14|           1|
|2019-10-09 17:47:...|  purchase|   1004767|2053013555631882655|electronics.smart...|samsung| 250.93|549979887|00035c16-00bf-49a...|2019-10-09|           1|
|2019-10-11 09:20:...|      cart|   1004836|2053013555631882655|electronics.smart...|samsung| 230.28|544933935|00046b3e-75e1-475...|2019-10-11|           0|
|2019-10-31 15:45:...|  purchase|   1004870|20530135556318

                                                                                

In [26]:
window_spec = Window.partitionBy("user_session", "product_id")

# Применяем оконную функцию max для вычисления максимального значения 'is_purchased' по каждой группе
df_targets = df_targets.withColumn("is_purchased", max(col("is_purchased")).over(window_spec))

In [27]:
df_targets = df_targets.filter(col("event_type") == "cart")
# Удаление дубликатов по столбцам user_session, product_id и is_purchased
df_targets = df_targets.dropDuplicates(["user_session", "product_id", "is_purchased"])

In [28]:
df_targets = df_targets.withColumn('event_weekday', dayofweek(col('event_date')) - 1)

In [91]:
df_targets.show(5)

[Stage 35:>                                                         (0 + 1) / 1]

+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+----------+------------+-------------+
|          event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|event_date|is_purchased|event_weekday|
+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+----------+------------+-------------+
|2019-10-23 08:56:...|      cart|   1005074|2053013555631882655|electronics.smart...|samsung|1149.08|536813317|00029324-8160-401...|2019-10-23|           0|            3|
|2019-10-17 18:29:...|      cart|   1004653|2053013555631882655|electronics.smart...|samsung| 606.04|515993649|000c8c15-4025-4f9...|2019-10-17|           0|            4|
|2019-10-27 09:17:...|      cart|   1005113|2053013555631882655|electronics.smart...|  apple|1091.10|514468999|0011bd92-49a8-425...|2019-10-27|  

                                                                                

In [130]:
df_targets.show(5)

+--------------------+----------+----------+-------------------+--------------------+------+-------+---------+--------------------+----------+--------------------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code| brand|  price|  user_id|        user_session|event_date|category_code_level1|category_code_level2|
+--------------------+----------+----------+-------------------+--------------------+------+-------+---------+--------------------+----------+--------------------+--------------------+
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|  aqua|  33.20|554748717|9333dfbd-b87a-470...|2019-10-01|appliances.enviro...|                NULL|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|lenovo| 251.74|550050854|7c90fc70-0e80-459...|2019-10-01|  computers.notebook|                NULL|
|2019-10-01 00:00:...|      view|   1004237|2053013555631882655|electronics

In [29]:
df_targets = df_targets.join(activity_in_session, on="user_session", how="left")

In [138]:
df_targets.show(5)

                                                                                

+--------------------+--------------------+----------+----------+-------------------+--------------------+-------+------+---------+----------+------------+-------------+--------------+
|        user_session|          event_time|event_type|product_id|        category_id|       category_code|  brand| price|  user_id|event_date|is_purchased|event_weekday|activity_count|
+--------------------+--------------------+----------+----------+-------------------+--------------------+-------+------+---------+----------+------------+-------------+--------------+
|000081ea-9376-4eb...|2019-10-24 09:06:...|      cart|   1004856|2053013555631882655|electronics.smart...|samsung|131.51|513622224|2019-10-24|           1|            4|             3|
|000174ac-0ea3-402...|2019-10-18 10:45:...|      cart|   1004767|2053013555631882655|electronics.smart...|samsung|249.86|548449052|2019-10-18|           1|            5|             9|
|00023d48-1798-4d1...|2019-10-16 01:24:...|      cart|   4804295|2053013554

In [30]:
df_targets.printSchema()

root
 |-- user_session: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_date: date (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- event_weekday: integer (nullable = true)
 |-- activity_count: long (nullable = true)



In [31]:
df_targets = df_targets.withColumn("price", F.col("price").cast(DoubleType()))

In [32]:
df_targets.printSchema()

root
 |-- user_session: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_date: date (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- event_weekday: integer (nullable = true)
 |-- activity_count: long (nullable = true)



In [143]:
is_purcahase_set = df_targets[df_targets['is_purchased']== 1]
is_purcahase_set.count()

                                                                                

273909

In [144]:
not_purcahase_set = df_targets[df_targets['is_purchased']== 0]
not_purcahase_set.count()

                                                                                

272691

In [33]:
database_name = "lopin_database2"

In [34]:
spark.catalog.setCurrentDatabase(database_name)

In [35]:
df_targets.writeTo("sobd_lab2").using("iceberg").create()

                                                                                

In [36]:
for table in spark.catalog.listTables():
    print(table.name)

sobd_lab1_processed_table
sobd_lab2
sobd_lab1_table


In [37]:
spark.stop()