1.reading a csv file and creating a dataframe.
2.Applying transformation like, removing nulls, converting string type timestamp to time format.
3.Checking if any rows failed to convert into timestamps.
4.Filtering the rows that are not converted into timestamp.
5.Adding Date and Hours columns.
6.Finding the count of each action user did on each date.

In [0]:
df = spark.read.csv("/Volumes/workspace/default/sampledataset/user_activity_logs.csv", header = True, inferSchema = True)
df.display()
df.printSchema()

user_id,action,timestamp,category,device
1001,viewed,7/25/2025 10:12,Electronics,Mobile
1001,added_to_cart,7/25/2025 10:15,Electronics,Mobile
1001,purchased,7/25/2025 10:18,Electronics,Mobile
1002,viewed,7/25/2025 11:05,Books,Laptop
1002,viewed,7/25/2025 11:08,Books,Laptop
1003,viewed,7/25/2025 11:10,Clothing,Tablet


root
 |-- user_id: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- category: string (nullable = true)
 |-- device: string (nullable = true)



In [0]:
from pyspark.sql.functions import *

#drop nulls
df_clean = df.dropna()
df_clean = df_clean.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))

# add date and hour columns
df_clean = df_clean\
.withColumn("date", to_date("timestamp"))\
.withColumn("hour", hour("timestamp"))
df_clean.display()



In [0]:
from pyspark.sql.functions import *

# Explicit format for timestamp (safe conversion)
# Your timestamp format is: "yyyy-MM-dd HH:mm:ss"
df_clean = df.withColumn(
    "timestamp_clean",
    to_timestamp("timestamp", "M/d/yyyy HH:mm")
)

# Check if any rows failed to convert
df_clean.select("timestamp", "timestamp_clean").show(truncate=False)

# Filter out rows where timestamp couldn't be parsed
df_clean = df_clean.filter(col("timestamp_clean").isNotNull())

# Add date and hour columns
df_clean = df_clean \
    .withColumn("date", to_date("timestamp_clean")) \
    .withColumn("hour", hour("timestamp_clean"))

display(df_clean)


+---------------+-------------------+
|timestamp      |timestamp_clean    |
+---------------+-------------------+
|7/25/2025 10:12|2025-07-25 10:12:00|
|7/25/2025 10:15|2025-07-25 10:15:00|
|7/25/2025 10:18|2025-07-25 10:18:00|
|7/25/2025 11:05|2025-07-25 11:05:00|
|7/25/2025 11:08|2025-07-25 11:08:00|
|7/25/2025 11:10|2025-07-25 11:10:00|
+---------------+-------------------+



user_id,action,timestamp,category,device,timestamp_clean,date,hour
1001,viewed,7/25/2025 10:12,Electronics,Mobile,2025-07-25T10:12:00.000Z,2025-07-25,10
1001,added_to_cart,7/25/2025 10:15,Electronics,Mobile,2025-07-25T10:15:00.000Z,2025-07-25,10
1001,purchased,7/25/2025 10:18,Electronics,Mobile,2025-07-25T10:18:00.000Z,2025-07-25,10
1002,viewed,7/25/2025 11:05,Books,Laptop,2025-07-25T11:05:00.000Z,2025-07-25,11
1002,viewed,7/25/2025 11:08,Books,Laptop,2025-07-25T11:08:00.000Z,2025-07-25,11
1003,viewed,7/25/2025 11:10,Clothing,Tablet,2025-07-25T11:10:00.000Z,2025-07-25,11


In [0]:
df_clean = df_clean.groupBy("user_id", "date", "action").agg(count("*").alias("action_count"))
display(df_clean)

user_id,date,action,action_count
1002,2025-07-25,viewed,2
1001,2025-07-25,purchased,1
1001,2025-07-25,viewed,1
1001,2025-07-25,added_to_cart,1
1003,2025-07-25,viewed,1


In [0]:
df_clean = df_clean.groupBy("user_id", "date")\
    .pivot('action', ['viewed', 'purchased', 'added_to_cart'])\
    .sum('action_count')\
    .fillna(0)
display(df_clean)


user_id,date,viewed,purchased,added_to_cart
1001,2025-07-25,1,1,1
1003,2025-07-25,1,0,0
1002,2025-07-25,2,0,0


In [0]:
df_clean.write.mode("overwrite").parquet("/Volumes/workspace/default/sampledataset/user_activity_logs")
#df_clean.write.format("delta").saveAsTable("user_activity_logs")

In [0]:
output_path = "/Volumes/workspace/default/sampledataset/user_activity_logs/"
print(f"✅ Final result written to: {output_path}")
display(dbutils.fs.ls(output_path))

✅ Final result written to: /Volumes/workspace/default/sampledataset/user_activity_logs/


path,name,size,modificationTime
dbfs:/Volumes/workspace/default/sampledataset/user_activity_logs/_SUCCESS,_SUCCESS,0,1754284974000
dbfs:/Volumes/workspace/default/sampledataset/user_activity_logs/_committed_5527395433473826661,_committed_5527395433473826661,124,1754284973000
dbfs:/Volumes/workspace/default/sampledataset/user_activity_logs/_started_5527395433473826661,_started_5527395433473826661,0,1754284973000
dbfs:/Volumes/workspace/default/sampledataset/user_activity_logs/part-00000-tid-5527395433473826661-45c8f626-ca11-44a2-9727-07e01c5b5295-136-1.c000.snappy.parquet,part-00000-tid-5527395433473826661-45c8f626-ca11-44a2-9727-07e01c5b5295-136-1.c000.snappy.parquet,1571,1754284973000
