In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import *


# Obtain the session used by every cell below; getOrCreate() returns an
# already-running session if there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("SparkByExamples.com")
    .getOrCreate()
)

# Explicit schema for the sample transaction rows.
# All identifier columns and the date are typed as strings, the amount as a
# double, and every field is declared non-nullable.
schema = StructType([
    StructField("transaction_id", StringType(), nullable=False),  # was misspelled "trsnsaction_id"
    StructField("user_id", StringType(), nullable=False),
    StructField("product_id", StringType(), nullable=False),
    StructField("category", StringType(), nullable=False),
    StructField("amount", DoubleType(), nullable=False),
    StructField("transaction_date", StringType(), nullable=False),
])

# Sample transactions, one tuple per row in schema field order:
# (transaction id, user id, product id, category, amount, transaction date).
# The three identifier values are written as strings because the schema
# declares those fields as StringType; passing ints would rely on
# createDataFrame silently stringifying them.
data = [
    ('1', '101', '1001', 'Electronics', 299.88, '2023-06-15'),
    ('2', '102', '1002', 'Clothing', 59.99, '2022-11-20'),
    ('3', '103', '1003', 'Home & Kitchen', 42.75, '2021-09-07'),
    ('4', '104', '1004', 'Sports', 134.20, '2024-01-11'),
    ('5', '105', '1005', 'Electronics', 199.99, '2023-02-27'),
    ('6', '106', '1006', 'Beauty', 89.50, '2024-03-23'),
    ('7', '107', '1007', 'Books', 25.99, '2022-12-30'),
    ('8', '108', '1008', 'Home & Kitchen', 78.20, '2023-11-05'),
    ('9', '109', '1009', 'Clothing', 42.10, '2023-05-12'),
    ('10', '110', '1010', 'Electronics', 150.75, '2024-02-21'),
]

# `df` is created in this cell so the aggregations below do not depend on a
# later cell having been run first (Restart & Run All must work).
df = spark.createDataFrame(data, schema)

# NOTE: `sum` and `avg` here are the pyspark.sql.functions aggregates pulled in
# by the wildcard import, not the Python builtins.
total_spent_by_user = df.groupBy("user_id").agg(sum("amount").alias("total_spent"))
avg_spent_by_user = df.groupBy("user_id").agg(avg("amount").alias("avg_spent"))

# Both per-user aggregates computed in one grouped pass; this is the frame that
# is joined with the favourite category further down.
amounts = df.groupBy("user_id").agg(
    sum("amount").alias("total_spent_by_user"),
    avg("amount").alias("avg_spent_by_user"),
)

# Rank each user's categories by how many transactions they have in it.
# `category` is a secondary sort key so that row_number() picks the same row
# on every run when two categories tie on count.
window = Window.partitionBy("user_id").orderBy(desc("count"), col("category").asc())

# Favourite category per user: count transactions per (user, category), keep
# the top-ranked row, then project. The filter runs before the select because
# the projection drops the `rank` column the filter needs.
fav_cat = (
    df.groupBy("user_id", "category")
    .count()
    .withColumn("rank", row_number().over(window))
    .filter(col("rank") == 1)
    .select("user_id", "category")
)

# Attach the favourite category to the per-user spend aggregates and display.
r = amounts.join(fav_cat, "user_id")
r.show()



+-------+-------------------+-----------------+--------------+
|user_id|total_spent_by_user|avg_spent_by_user|      category|
+-------+-------------------+-----------------+--------------+
|    101|             299.88|           299.88|   Electronics|
|    102|              59.99|            59.99|      Clothing|
|    103|              42.75|            42.75|Home & Kitchen|
|    104|              134.2|            134.2|        Sports|
|    105|             199.99|           199.99|   Electronics|
|    106|               89.5|             89.5|        Beauty|
|    107|              25.99|            25.99|         Books|
|    108|               78.2|             78.2|Home & Kitchen|
|    109|               42.1|             42.1|      Clothing|
|    110|             150.75|           150.75|   Electronics|
+-------+-------------------+-----------------+--------------+



In [28]:
# Turn the in-memory sample rows into a DataFrame using the explicit schema.
df = spark.createDataFrame(data, schema=schema)

+-------+-------------------+-----------------+
|user_id|total_spent_by_user|avg_spent_by_user|
+-------+-------------------+-----------------+
|    101|             299.88|           299.88|
|    104|              134.2|            134.2|
|    102|              59.99|            59.99|
|    103|              42.75|            42.75|
|    105|             199.99|           199.99|
|    107|              25.99|            25.99|
|    110|             150.75|           150.75|
|    108|               78.2|             78.2|
|    106|               89.5|             89.5|
|    109|               42.1|             42.1|
+-------+-------------------+-----------------+



+-------+--------------+
|user_id|      category|
+-------+--------------+
|    101|   Electronics|
|    102|      Clothing|
|    103|Home & Kitchen|
|    104|        Sports|
|    105|   Electronics|
|    106|        Beauty|
|    107|         Books|
|    108|Home & Kitchen|
|    109|      Clothing|
|    110|   Electronics|
+-------+--------------+

