In [2]:
import os

# Keep a SPARK_HOME that is already configured in the environment; the
# hard-coded Windows path is only a fallback for machines where nothing is set.
os.environ.setdefault('SPARK_HOME', "C:/spark")

# Driver-side settings: the driver "python" is Jupyter, started in Lab mode.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'

# Interpreter name PySpark should use for Python processes.
os.environ['PYSPARK_PYTHON'] = 'python'

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import *

In [30]:
# Build the Spark session for this notebook; getOrCreate() hands back an
# existing session instead of creating a second one when the cell is re-run.
spark = (
    SparkSession.builder
    .appName("aggregate-app")
    .getOrCreate()
)
spark  # last expression of the cell, so the notebook displays the session

In [40]:
# Sample purchase transactions, one tuple per row. Field order:
# (transaction_id, user_id, product_id, category, amount, date as "YYYY-MM-DD" string)
data = [
    ("T1001", "U001", "P001", "Electronics", 199.99, "2024-01-15"),
    ("T1002", "U002", "P002", "Clothing",     49.99, "2024-01-16"),
    ("T1003", "U001", "P003", "Books",        12.99, "2024-01-17"),
    ("T1004", "U003", "P004", "Electronics", 299.99, "2024-01-18"),
    ("T1005", "U004", "P005", "Groceries",    15.75, "2024-01-19"),
    ("T1006", "U002", "P006", "Clothing",     89.99, "2024-01-20"),
    ("T1007", "U005", "P007", "Furniture",   450.00, "2024-01-21"),
    ("T1008", "U001", "P008", "Electronics", 199.99, "2024-01-22"),
    ("T1009", "U003", "P009", "Books",        24.99, "2024-01-23"),
    ("T1010", "U004", "P010", "Groceries",    20.50, "2024-01-24"),
]

In [42]:
# Declare the column types explicitly instead of passing only a list of column
# names to createDataFrame and letting Spark infer the types from the tuples.
# Every column is nullable; "date" is kept as a plain string.
schema = StructType([
    StructField("transaction_id", StringType(), nullable=True),
    StructField("user_id", StringType(), nullable=True),
    StructField("product_id", StringType(), nullable=True),
    StructField("category", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("date", StringType(), nullable=True),
])
df = spark.createDataFrame(data, schema)

df.show()

+--------------+-------+----------+-----------+------+----------+
|transaction_id|user_id|product_id|   category|amount|      date|
+--------------+-------+----------+-----------+------+----------+
|         T1001|   U001|      P001|Electronics|199.99|2024-01-15|
|         T1002|   U002|      P002|   Clothing| 49.99|2024-01-16|
|         T1003|   U001|      P003|      Books| 12.99|2024-01-17|
|         T1004|   U003|      P004|Electronics|299.99|2024-01-18|
|         T1005|   U004|      P005|  Groceries| 15.75|2024-01-19|
|         T1006|   U002|      P006|   Clothing| 89.99|2024-01-20|
|         T1007|   U005|      P007|  Furniture| 450.0|2024-01-21|
|         T1008|   U001|      P008|Electronics|199.99|2024-01-22|
|         T1009|   U003|      P009|      Books| 24.99|2024-01-23|
|         T1010|   U004|      P010|  Groceries|  20.5|2024-01-24|
+--------------+-------+----------+-----------+------+----------+



In [81]:
# Per-user spending summary: total and mean transaction amount.
# NOTE: `sum` / `avg` here are the star-imported pyspark.sql.functions
# aggregates, not Python builtins.
amounts = (
    df.groupBy("user_id")
    .agg(
        sum(col("amount")).alias("total_spent"),
        avg(col("amount")).alias("average_transaction"),
    )
)
amounts.show()

+-------+-----------+-------------------+
|user_id|total_spent|average_transaction|
+-------+-----------+-------------------+
|   U001|     412.97| 137.65666666666667|
|   U002|     139.98|              69.99|
|   U003|     324.98|             162.49|
|   U004|      36.25|             18.125|
|   U005|      450.0|              450.0|
+-------+-----------+-------------------+



In [90]:
# Favourite category per user = the category the user bought from most often.
#
# Ordering by purchase count alone leaves ties unresolved (U003 has exactly one
# Electronics and one Books purchase), and row_number() then picks an arbitrary
# row that may differ between runs. Ties are broken by total spend in the
# category, then by category name, so the result is deterministic.
window = Window.partitionBy("user_id").orderBy(
    desc("count"), desc("category_spent"), asc("category")
)
user_fav = (
    df.groupBy("user_id", "category")
    .agg(
        count("*").alias("count"),               # purchases in this category
        sum("amount").alias("category_spent"),   # tie-breaker: money spent in it
    )
    .withColumn("rank", row_number().over(window))
    # Filter while "rank" is still a column of the frame, then project it away.
    .filter(col("rank") == 1)
    .select("user_id", "category")
)
user_fav.show()

+-------+-----------+
|user_id|   category|
+-------+-----------+
|   U001|Electronics|
|   U002|   Clothing|
|   U003|Electronics|
|   U004|  Groceries|
|   U005|  Furniture|
+-------+-----------+



In [82]:
# Attach each user's favourite category to their spending summary. The left
# join keeps every row of `amounts` even when `user_fav` has no matching user.
result = amounts.join(user_fav, "user_id", "left")
result.show()

+-------+-----------+-------------------+-----------+
|user_id|total_spent|average_transaction|   category|
+-------+-----------+-------------------+-----------+
|   U001|     412.97| 137.65666666666667|Electronics|
|   U002|     139.98|              69.99|   Clothing|
|   U003|     324.98|             162.49|Electronics|
|   U004|      36.25|             18.125|  Groceries|
|   U005|      450.0|              450.0|  Furniture|
+-------+-----------+-------------------+-----------+

