In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("pyspark-practice")
    .config("spark.ui.enabled", "false")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

print("spark ok")


spark ok


In [3]:
csv_text = """event_id,user_id,event_ts,event_type,product_category,price,device,country
1,u001,2025-03-01 10:01:00,view,phone,0,mobile,MD
2,u001,2025-03-01 10:05:00,add_to_cart,phone,0,mobile,MD
3,u001,2025-03-01 10:07:00,purchase,phone,699,mobile,MD
4,u001,2025-03-10 21:10:00,view,accessories,0,desktop,MD
5,u002,2025-03-02 09:15:00,view,laptop,0,desktop,RO
6,u002,2025-03-02 09:30:00,purchase,laptop,849,desktop,RO
7,u003,2025-03-03 14:20:00,view,headphones,0,mobile,UA
8,u003,2025-03-03 14:40:00,view,headphones,0,mobile,UA
9,u003,2025-03-03 15:00:00,purchase,headphones,149,mobile,UA
10,u003,2025-03-15 12:10:00,refund,headphones,-149,mobile,UA
11,u004,2025-03-05 18:00:00,view,phone,0,mobile,MD
12,u004,2025-03-05 18:10:00,view,phone,0,mobile,MD
13,u004,2025-03-05 18:30:00,add_to_cart,phone,0,mobile,MD
14,u005,2025-03-07 20:05:00,view,laptop,0,laptop,DE
15,u005,2025-03-07 20:45:00,purchase,laptop,999,laptop,DE
"""

lines = spark.sparkContext.parallelize(csv_text.strip().split("\n"))
df_raw = spark.read.option("header", True).csv(lines)

In [10]:
df = (
    df_raw
    .withColumn("event_id", F.col("event_id").cast("int"))
    .withColumn("event_ts", F.to_timestamp("event_ts", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("price", F.col("price").cast("double"))
)

df.show(truncate=False)
df.printSchema()

+--------+-------+-------------------+-----------+----------------+------+-------+-------+
|event_id|user_id|event_ts           |event_type |product_category|price |device |country|
+--------+-------+-------------------+-----------+----------------+------+-------+-------+
|1       |u001   |2025-03-01 10:01:00|view       |phone           |0.0   |mobile |MD     |
|2       |u001   |2025-03-01 10:05:00|add_to_cart|phone           |0.0   |mobile |MD     |
|3       |u001   |2025-03-01 10:07:00|purchase   |phone           |699.0 |mobile |MD     |
|4       |u001   |2025-03-10 21:10:00|view       |accessories     |0.0   |desktop|MD     |
|5       |u002   |2025-03-02 09:15:00|view       |laptop          |0.0   |desktop|RO     |
|6       |u002   |2025-03-02 09:30:00|purchase   |laptop          |849.0 |desktop|RO     |
|7       |u003   |2025-03-03 14:20:00|view       |headphones      |0.0   |mobile |UA     |
|8       |u003   |2025-03-03 14:40:00|view       |headphones      |0.0   |mobile |UA     |

In [14]:
df.groupBy("event_type").count().show()


+-----------+-----+
| event_type|count|
+-----------+-----+
|   purchase|    4|
|add_to_cart|    2|
|       view|    8|
|     refund|    1|
+-----------+-----+



In [17]:
df.filter(df.event_type == "purchase") \
  .groupBy("country") \
  .count() \
  .orderBy("count", ascending=False) \
  .limit(3) \
  .show()


+-------+-----+
|country|count|
+-------+-----+
|     UA|    1|
|     RO|    1|
|     MD|    1|
+-------+-----+



In [18]:
df.filter(df.event_type.isin("purchase", "refund")) \
  .select(F.sum("price")) \
  .show()


+----------+
|sum(price)|
+----------+
|    2547.0|
+----------+



In [23]:
views_per_user = (
    df.filter(df.event_type == "view")
      .groupBy("user_id")
      .count()
)

views_gt_1 = views_per_user.filter("count > 1")

buyers = (
    df.filter(df.event_type == "purchase")
      .select("user_id")
      .distinct()
)

result = views_gt_1.join(buyers, on="user_id", how="left_anti")

result.show()


In [24]:
views_gt_1 = views_per_user.filter("count > 1")
