In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.sql import functions as F
from datetime import date
# Import Window from pyspark.sql.window
from pyspark.sql.window import Window


In [None]:

# Obtain the Spark session for this notebook; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("E-commerce Transactions Analysis")
    .getOrCreate()
)


In [None]:

# Explicit schema for the transactions DataFrame. Every column is declared
# nullable; the field order must match the tuple order used in the sample rows.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("transaction_id", StringType()),
        ("user_id", StringType()),
        ("product_id", StringType()),
        ("category", StringType()),
        ("amount", DoubleType()),
        ("transaction_date", DateType()),
    ]
])


In [None]:

# Sample transactions. Row n (1-based) uses the ids Tn / Un / Pn, so only the
# category, amount and the 2024 month/day need to be listed per row.
# Resulting tuple layout:
#   (transaction_id, user_id, product_id, category, amount, transaction_date)
data = [
    (f"T{n}", f"U{n}", f"P{n}", category, amount, date(2024, month, day))
    for n, (category, amount, month, day) in enumerate(
        [
            ("Music", 15.0, 1, 15),
            ("Music", 25.0, 1, 20),
            ("Home Appliances", 80.0, 2, 1),
            ("Home Appliances", 150.0, 2, 3),
            ("Health", 45.0, 3, 7),
            ("Health", 60.0, 3, 9),
            ("Gaming", 120.0, 4, 1),
            ("Gaming", 180.0, 4, 5),
            ("Kitchen", 55.0, 5, 3),
            ("Kitchen", 70.0, 5, 7),
        ],
        start=1,
    )
]


In [None]:


# Build the transactions DataFrame from the sample rows using the explicit
# schema, then print it.
df = spark.createDataFrame(data=data, schema=schema)
df.show()

+--------------+-------+----------+---------------+------+----------------+
|transaction_id|user_id|product_id|       category|amount|transaction_date|
+--------------+-------+----------+---------------+------+----------------+
|            T1|     U1|        P1|          Music|  15.0|      2024-01-15|
|            T2|     U2|        P2|          Music|  25.0|      2024-01-20|
|            T3|     U3|        P3|Home Appliances|  80.0|      2024-02-01|
|            T4|     U4|        P4|Home Appliances| 150.0|      2024-02-03|
|            T5|     U5|        P5|         Health|  45.0|      2024-03-07|
|            T6|     U6|        P6|         Health|  60.0|      2024-03-09|
|            T7|     U7|        P7|         Gaming| 120.0|      2024-04-01|
|            T8|     U8|        P8|         Gaming| 180.0|      2024-04-05|
|            T9|     U9|        P9|        Kitchen|  55.0|      2024-05-03|
|           T10|    U10|       P10|        Kitchen|  70.0|      2024-05-07|
+--------------+-------+----------+---------------+------+----------------+

In [None]:

# Per-user spending summary: one row per user_id with the sum and the mean of
# that user's transaction amounts.
spending_df = (
    df.groupBy("user_id")
    .agg(
        F.sum(F.col("amount")).alias("total_spent"),
        F.avg(F.col("amount")).alias("avg_transaction"),
    )
)
spending_df.show()

+-------+-----------+---------------+
|user_id|total_spent|avg_transaction|
+-------+-----------+---------------+
|     U2|       25.0|           25.0|
|     U4|      150.0|          150.0|
|     U3|       80.0|           80.0|
|     U5|       45.0|           45.0|
|     U1|       15.0|           15.0|
|    U10|       70.0|           70.0|
|     U6|       60.0|           60.0|
|     U9|       55.0|           55.0|
|     U8|      180.0|          180.0|
|     U7|      120.0|          120.0|
+-------+-----------+---------------+



In [None]:

# Most frequently purchased category for each user.
#
# Steps:
#   1. Count transactions per (user_id, category).
#   2. Number each user's categories by descending count. Ties are broken by
#      category name (ascending) so that the chosen row is deterministic;
#      ordering by count alone lets row_number() pick an arbitrary row among
#      equally frequent categories, which can change between runs.
#   3. Keep only the top-ranked category per user and expose it as
#      `favorite_category`.
favorite_category_df = (
    df.groupBy("user_id", "category")
    .count()
    .withColumn(
        "rank",
        F.row_number().over(
            Window.partitionBy("user_id").orderBy(
                F.desc("count"), F.asc("category")
            )
        ),
    )
    .filter(F.col("rank") == 1)
    .select("user_id", F.col("category").alias("favorite_category"))
)

# Display the result computed in this cell (not the raw transactions).
favorite_category_df.show()


+--------------+-------+----------+---------------+------+----------------+
|transaction_id|user_id|product_id|       category|amount|transaction_date|
+--------------+-------+----------+---------------+------+----------------+
|            T1|     U1|        P1|          Music|  15.0|      2024-01-15|
|            T2|     U2|        P2|          Music|  25.0|      2024-01-20|
|            T3|     U3|        P3|Home Appliances|  80.0|      2024-02-01|
|            T4|     U4|        P4|Home Appliances| 150.0|      2024-02-03|
|            T5|     U5|        P5|         Health|  45.0|      2024-03-07|
|            T6|     U6|        P6|         Health|  60.0|      2024-03-09|
|            T7|     U7|        P7|         Gaming| 120.0|      2024-04-01|
|            T8|     U8|        P8|         Gaming| 180.0|      2024-04-05|
|            T9|     U9|        P9|        Kitchen|  55.0|      2024-05-03|
|           T10|    U10|       P10|        Kitchen|  70.0|      2024-05-07|
+--------------+-------+----------+---------------+------+----------------+

In [None]:

# Combine the per-user spending summary with each user's favorite category.
# A left join keeps every user present in spending_df even if no favorite
# category row exists for them.
result_df = spending_df.join(
    other=favorite_category_df,
    on="user_id",
    how="left",
)


In [None]:

# Print the joined per-user summary: total_spent, avg_transaction and
# favorite_category for each user_id.
result_df.show()

+-------+-----------+---------------+-----------------+
|user_id|total_spent|avg_transaction|favorite_category|
+-------+-----------+---------------+-----------------+
|     U2|       25.0|           25.0|            Music|
|     U4|      150.0|          150.0|  Home Appliances|
|     U3|       80.0|           80.0|  Home Appliances|
|     U5|       45.0|           45.0|           Health|
|     U1|       15.0|           15.0|            Music|
|    U10|       70.0|           70.0|          Kitchen|
|     U6|       60.0|           60.0|           Health|
|     U9|       55.0|           55.0|          Kitchen|
|     U8|      180.0|          180.0|           Gaming|
|     U7|      120.0|          120.0|           Gaming|
+-------+-----------+---------------+-----------------+

