# **Final Assignment**

**Anna Tigranyan💚**

## **Task**

**Calculate**
- number of sessions
- number of events per session
- number of sessions that have orders
- number of unique products that were ordered

- number of orders per day-hour
- distribution of orders per hour
- click to order ratio per session
- distribution of session duration

In [122]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F


In [123]:
# Create a SparkSession
spark = SparkSession.builder.appName("BigDataHW").getOrCreate()

# Load the data
df = spark.read.csv("train_1000.csv", header=True, inferSchema=True)

df.show(10)

+-------+-------------+------+-------+
|    aid|           ts|  type|session|
+-------+-------------+------+-------+
|1517085|1659304800025|clicks|      0|
|1563459|1659304904511|clicks|      0|
|1309446|1659367439426|clicks|      0|
|  16246|1659367719997|clicks|      0|
|1781822|1659367871344|clicks|      0|
|1152674|1659367885796|clicks|      0|
|1649869|1659369893840| carts|      0|
| 461689|1659369898050| carts|      0|
| 305831|1659370027105|orders|      0|
| 461689|1659370027105|orders|      0|
+-------+-------------+------+-------+
only showing top 10 rows



## Number of sessions

In [124]:
# counting the number of unique sessions
num_sessions = df.select("session").distinct().count()

print("Number of unique sessions:", num_sessions)

Number of unique sessions: 1000


## Number of events per session

In [125]:
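# Group by the "session" column and count the events in each session
# (a separate name avoids overwriting the num_sessions count from the previous cell)
events_per_session = df.groupby("session").agg(F.count("*").alias("session_count"))
# Display the result
events_per_session.show(10)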

+-------+-------------+
|session|session_count|
+-------+-------------+
|    148|          217|
|    463|            5|
|    471|            7|
|    496|          120|
|    833|           32|
|    243|           67|
|    392|           12|
|    540|           45|
|    623|            2|
|    737|            4|
+-------+-------------+
only showing top 10 rows



## Number of sessions that have orders

In [126]:
# Keep only the rows where "type" is "orders"
orders_df = df.filter(df["type"] == "orders")

# Count the unique sessions in orders_df
ord_sessions = orders_df.select("session").distinct().count()

print("Number of sessions that have orders:", ord_sessions)

Number of sessions that have orders: 350


## Number of unique products that were ordered

We already have the **orders_df** DataFrame, which includes only the rows where the "type" column is "orders".
So we only need to select the distinct product IDs (the "aid" column) and count them.

In [127]:
# getting the count of unique IDs in orders_df dataset
num_distinct_products = orders_df.select("aid").distinct().count()

print("Number of unique products that were ordered:", num_distinct_products)

Number of unique products that were ordered: 1608


## Number of orders per day-hour

We can calculate the number of orders per day-hour by:
* extracting the day and hour from the ts column (epoch milliseconds, hence the division by 1000),
* grouping the dataset by day and hour,
* counting the number of orders in each group.



In [128]:
# extracting the day and hour from the ts column
ord_df = orders_df.withColumn("day", F.dayofmonth(F.from_unixtime(F.col("ts")/1000)))
ord_df = ord_df.withColumn("hour", F.hour(F.from_unixtime(F.col("ts")/1000)))

In [129]:
# grouping the dataset above by day and hour and counting the number of orders in each group
orders_per_day_hour = ord_df.groupBy("day","hour").agg(F.count("*").alias("num_orders"))

In [130]:
# showing the result
orders_per_day_hour.sort("day").show(10)

+---+----+----------+
|day|hour|num_orders|
+---+----+----------+
|  1|   8|        14|
|  1|  15|         5|
|  1|  16|        15|
|  1|   0|        21|
|  1|  19|        16|
|  1|  20|        21|
|  1|  23|         1|
|  1|  22|        27|
|  1|  18|        20|
|  1|  11|        19|
+---+----+----------+
only showing top 10 rows
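
The day and hour that `from_unixtime` produces depend on the Spark session time zone, so the same `ts` can fall into a different day-hour bucket on another machine. The sketch below is a plain-Python cross-check of the same millisecond-to-(day, hour) conversion, independent of Spark. `day_hour` is a hypothetical helper, and the UTC+2 offset is only an assumed example zone, not a setting confirmed by the notebook.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def day_hour(ts_ms, tz=timezone.utc):
    """Convert an epoch timestamp in milliseconds to (day of month, hour) in tz."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=tz)
    return dt.day, dt.hour


# Two order timestamps taken from the df.show(10) output earlier in the notebook.
order_ts = [1659370027105, 1659370027105]

# Same idea as groupBy("day", "hour") followed by a count, done with a Counter.
orders_per_day_hour = Counter(day_hour(ts) for ts in order_ts)
print(orders_per_day_hour)

# The first click in the data, seen in UTC and in an assumed UTC+2 zone.
first_click = 1659304800025
print(day_hour(first_click))
print(day_hour(first_click, tz=timezone(timedelta(hours=2))))
```

In Spark the zone used for this conversion is controlled by the `spark.sql.session.timeZone` setting. Note also that the day of month alone would merge days from different months if the data spanned more than one.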



## Distribution of orders per hour

To find the distribution of **orders per hour** we are going to:
* convert the ts column from bigint to timestamp in orders_df
* use the hour function to extract the hour from the timestamp column in orders_df
* group the data frame by the extracted hour and use the count function to count the number of orders for each hour.

In [131]:
# Converting the ts column from bigint to timestamp
dist_ord = orders_df.withColumn("ts", from_unixtime(orders_df["ts"]/1000).cast("timestamp"))

# Extracting the hour from the timestamp column
dist_ord = dist_ord.withColumn("hour", hour(dist_ord["ts"]))

# Grouping the data by hour
dist_ord = dist_ord.groupBy("hour").agg(count("*").alias("num_orders"))

# Showing the sorted result
dist_ord.sort('hour').show()


+----+----------+
|hour|num_orders|
+----+----------+
|   0|        45|
|   1|        10|
|   2|        10|
|   3|         2|
|   4|        25|
|   5|        24|
|   6|        33|
|   7|        54|
|   8|        58|
|   9|        72|
|  10|        99|
|  11|        79|
|  12|        66|
|  13|        63|
|  14|        39|
|  15|        48|
|  16|        79|
|  17|        74|
|  18|        84|
|  19|       114|
+----+----------+
only showing top 20 rows
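
`show()` prints 20 rows by default, so hours 20 to 23 are cut off in the table above. As a minimal sketch in plain Python, assuming the hours have already been extracted, the snippet below builds a distribution that always covers all 24 hours and also reports each hour's share of the orders. `hourly_distribution` and the sample hours are illustrative and do not come from the dataset.

```python
from collections import Counter


def hourly_distribution(hours):
    """Return (counts, shares), two lists of length 24 indexed by hour of day."""
    counts_by_hour = Counter(hours)
    # Hours with no orders get an explicit zero instead of being left out.
    counts = [counts_by_hour.get(h, 0) for h in range(24)]
    total = sum(counts)
    shares = [c / total if total else 0.0 for c in counts]
    return counts, shares


# Hypothetical order hours, for illustration only.
sample_hours = [0, 0, 9, 10, 10, 10, 19, 23]
counts, shares = hourly_distribution(sample_hours)

# Print only the hours that have at least one order.
for h in range(24):
    if counts[h]:
        print(f"{h:02d}h  {counts[h]:>3}  {shares[h]:.1%}")
```

In the notebook itself, passing a row count such as `show(24)` would display every hour.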



## Click to order ratio per session

To calculate the click to order ratio per session we are going to:
* create a new data frame that **groups** the events by the **session** and **type**.
* calculate the number of clicks and orders per session
* divide the number of clicks by the number of orders to get the click to order ratio. 

In [132]:
# Creating a data frame that groups the events by the session and type
sess_type_df = df.groupBy("session", "type").agg(F.count("*").alias("num_events"))

# Creating a data frame that contains the number of clicks and orders per session
sess_cl_ord_df = sess_type_df.filter("type in ('clicks', 'orders')").groupBy("session").pivot("type").sum("num_events")

# Creating a new column that calculates the click to order ratio per session
sess_cl_ord_df = sess_cl_ord_df.withColumn("click_to_order_ratio", col("clicks")/col("orders"))


If a session contains no clicks or no orders, its click_to_order_ratio in the table below is null.

In [133]:
sess_cl_ord_df.show(5)

+-------+------+------+--------------------+
|session|clicks|orders|click_to_order_ratio|
+-------+------+------+--------------------+
|    148|   185|     8|              23.125|
|    463|     2|     2|                 1.0|
|    833|    26|     2|                13.0|
|    496|   117|  null|                null|
|    471|     7|  null|                null|
+-------+------+------+--------------------+
only showing top 5 rows
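
The nulls appear because the pivot leaves a missing event type as null, and dividing by null yields null. The following plain-Python sketch mirrors that rule on the five rows shown above. `click_to_order_ratio` is a hypothetical helper, and treating zero orders as undefined is an assumption added here for safety.

```python
def click_to_order_ratio(clicks, orders):
    """Return clicks / orders, or None when the ratio is undefined."""
    # A missing count (None) or zero orders gives no meaningful ratio.
    if clicks is None or not orders:
        return None
    return clicks / orders


# (session, clicks, orders) rows copied from the table above; None marks a missing count.
rows = [
    (148, 185, 8),
    (463, 2, 2),
    (833, 26, 2),
    (496, 117, None),
    (471, 7, None),
]

ratios = {
    session: click_to_order_ratio(clicks, orders)
    for session, clicks, orders in rows
}
print(ratios)
```

Whether sessions without orders should be dropped or kept as undefined depends on how the ratio will be used, so the sketch keeps them as `None` rather than choosing a default value.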



In [134]:
# sess_cl_ord_df sorted by the click_to_order_ratio column (in descending order)
sess_cl_ord_df.sort('click_to_order_ratio',ascending=False).show(10)

+-------+------+------+--------------------+
|session|clicks|orders|click_to_order_ratio|
+-------+------+------+--------------------+
|    606|   422|     1|               422.0|
|    725|   377|     1|               377.0|
|     94|   319|     1|               319.0|
|    580|   308|     1|               308.0|
|     48|   296|     1|               296.0|
|    607|   277|     1|               277.0|
|    311|   271|     1|               271.0|
|    965|   243|     1|               243.0|
|    203|   227|     1|               227.0|
|    388|   223|     1|               223.0|
+-------+------+------+--------------------+
only showing top 10 rows



## Distribution of session duration

To calculate the distribution of session duration we are going to:
* convert the ts column to the timestamp type in df
* calculate the duration of each session: take the minimum and maximum ts of the session as its start time and end time, and use their difference as the session duration
* count the number of sessions for each duration

In [135]:
# Converting the ts column from bigint to timestamp
df = df.withColumn("ts", from_unixtime(df["ts"]/1000).cast("timestamp"))

# Getting the minimum and maximum timestamp per session (start_time, end_time)
sess_ts_df = df.groupBy("session").agg(F.min("ts").alias("start_time"), F.max("ts").alias("end_time"))

# Calculate the duration of each session in seconds
sess_ts_df = sess_ts_df.withColumn("duration", F.unix_timestamp("end_time") - F.unix_timestamp("start_time"))

# Grouping the sessions by duration
sess_duration_df = sess_ts_df.groupBy("duration").agg(F.count("*").alias("num_sessions"))

# Sort the sessions by duration
sess_duration_df = sess_duration_df.sort("duration")

# Show the distribution of session duration
sess_duration_df.show(10)



+--------+------------+
|duration|num_sessions|
+--------+------------+
|       2|           1|
|      10|           1|
|      13|           1|
|      17|           2|
|      21|           1|
|      22|           1|
|      23|           2|
|      25|           1|
|      26|           1|
|      27|           1|
+--------+------------+
only showing top 10 rows
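
Grouping by the exact duration in seconds gives many buckets with only one or two sessions each. One possible alternative, sketched below in plain Python, is to bin the durations into a few ranges. The bucket edges, the labels, and the `duration_bucket` helper are assumptions chosen for illustration, and the last three sample values are hypothetical.

```python
from bisect import bisect_right
from collections import Counter

# Assumed bucket edges in seconds: 1 minute, 10 minutes, 1 hour, 1 day.
EDGES = [60, 600, 3600, 86400]
LABELS = ["< 1 min", "1-10 min", "10-60 min", "1-24 h", ">= 1 day"]


def duration_bucket(seconds):
    """Map a duration in seconds to the label of the bucket it falls into."""
    # bisect_right counts how many edges are <= seconds, which is the bucket index.
    return LABELS[bisect_right(EDGES, seconds)]


# The first six values come from the table above; the last three are hypothetical.
durations = [2, 10, 13, 17, 17, 21, 60, 4000, 90000]
histogram = Counter(duration_bucket(d) for d in durations)

for label in LABELS:
    print(f"{label:<10} {histogram.get(label, 0)}")
```

A lower edge is inclusive here, so a session of exactly 60 seconds is counted in the "1-10 min" bucket.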



**The End!
<br>
Thank You for the interesting Assignment!!!**