### 0. Load Dataset and Create DataFrame

In [0]:

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, to_timestamp, minute, avg, countDistinct, rank, desc
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

# Reuse an active session if one exists, otherwise start one named "WebTraffic".
spark = SparkSession.builder.appName("WebTraffic").getOrCreate()

# Row factory that fixes the column order once, so each record below is positional.
PageVisit = Row("UserID", "Page", "Timestamp", "Duration", "Device", "Country")

# Sample page-visit records; Timestamp is still a plain string at this point.
web_data = [
    PageVisit(1, "Home", "2024-04-10 10:00:00", 35, "Mobile", "India"),
    PageVisit(2, "Products", "2024-04-10 10:02:00", 120, "Desktop", "USA"),
    PageVisit(3, "Cart", "2024-04-10 10:05:00", 45, "Tablet", "UK"),
    PageVisit(1, "Checkout", "2024-04-10 10:08:00", 60, "Mobile", "India"),
    PageVisit(4, "Home", "2024-04-10 10:10:00", 15, "Mobile", "Canada"),
    PageVisit(2, "Contact", "2024-04-10 10:15:00", 25, "Desktop", "USA"),
    PageVisit(5, "Products", "2024-04-10 10:20:00", 90, "Desktop", "India"),
]

# No explicit schema is passed, so Spark infers the column types from the Row values.
df_web = spark.createDataFrame(web_data)
df_web.show(truncate=False)


+------+--------+-------------------+--------+-------+-------+
|UserID|Page    |Timestamp          |Duration|Device |Country|
+------+--------+-------------------+--------+-------+-------+
|1     |Home    |2024-04-10 10:00:00|35      |Mobile |India  |
|2     |Products|2024-04-10 10:02:00|120     |Desktop|USA    |
|3     |Cart    |2024-04-10 10:05:00|45      |Tablet |UK     |
|1     |Checkout|2024-04-10 10:08:00|60      |Mobile |India  |
|4     |Home    |2024-04-10 10:10:00|15      |Mobile |Canada |
|2     |Contact |2024-04-10 10:15:00|25      |Desktop|USA    |
|5     |Products|2024-04-10 10:20:00|90      |Desktop|India  |
+------+--------+-------------------+--------+-------+-------+



### 1. Display Schema

In [0]:
# Print each column's name, type and nullability as a tree.
df_web.printSchema()

root
 |-- UserID: long (nullable = true)
 |-- Page: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Duration: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Country: string (nullable = true)



### 2. Convert Timestamp to `timestamp` type

In [0]:
# Parse the string column with the default timestamp format and overwrite it under the same name.
timestamp_parsed = to_timestamp(col("Timestamp"))
df_web = df_web.withColumn("Timestamp", timestamp_parsed)

### 3. Extract minute as `SessionMinute`

In [0]:
# Derive the minute-of-hour component of each event's timestamp.
session_minute = minute(col("Timestamp"))
df_web = df_web.withColumn("SessionMinute", session_minute)

# Preview only the columns relevant to the new field.
df_web.select(["UserID", "Timestamp", "SessionMinute"]).show()

+------+-------------------+-------------+
|UserID|          Timestamp|SessionMinute|
+------+-------------------+-------------+
|     1|2024-04-10 10:00:00|            0|
|     2|2024-04-10 10:02:00|            2|
|     3|2024-04-10 10:05:00|            5|
|     1|2024-04-10 10:08:00|            8|
|     4|2024-04-10 10:10:00|           10|
|     2|2024-04-10 10:15:00|           15|
|     5|2024-04-10 10:20:00|           20|
+------+-------------------+-------------+



### 4. Filter: Mobile users on Checkout page

In [0]:
# Name each predicate so the combined condition reads like the section heading.
is_mobile = col("Device") == "Mobile"
is_checkout = col("Page") == "Checkout"
df_web.where(is_mobile & is_checkout).show()

+------+--------+-------------------+--------+------+-------+-------------+
|UserID|    Page|          Timestamp|Duration|Device|Country|SessionMinute|
+------+--------+-------------------+--------+------+-------+-------------+
|     1|Checkout|2024-04-10 10:08:00|      60|Mobile|  India|            8|
+------+--------+-------------------+--------+------+-------+-------------+



### 5. Filter: sessions with Duration > 60 seconds

In [0]:
# Cast Duration to a 32-bit integer column before comparing.
duration_as_int = col("Duration").cast(IntegerType())
df_web = df_web.withColumn("Duration", duration_as_int)

# Keep only rows whose Duration is strictly greater than 60.
df_web.where(col("Duration") > 60).show()

+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     2|Products|2024-04-10 10:02:00|     120|Desktop|    USA|            2|
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



### 6. Filter: users from India on the Products page

In [0]:
# Both conditions must hold: the visit came from India and landed on the Products page.
from_india = col("Country") == "India"
on_products = col("Page") == "Products"
df_web.where(from_india & on_products).show()

+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



### 7. Average duration per device

In [0]:
# Mean Duration for each device type, named directly via an alias.
(
    df_web
    .groupBy("Device")
    .agg(avg("Duration").alias("AvgDuration"))
    .show()
)

+-------+------------------+
| Device|       AvgDuration|
+-------+------------------+
| Mobile|36.666666666666664|
| Tablet|              45.0|
|Desktop| 78.33333333333333|
+-------+------------------+



### 8. Number of sessions per country

In [0]:
# One row per country with the number of records it contributed.
sessions_by_country = df_web.groupBy("Country").count()
sessions_by_country.select("Country", col("count").alias("SessionCount")).show()

+-------+------------+
|Country|SessionCount|
+-------+------------+
|  India|           3|
|    USA|           2|
|     UK|           1|
| Canada|           1|
+-------+------------+



### 9. Most visited page

In [0]:
# Count visits per page and keep the top one.
# The sample data contains a tie (Home and Products each have 2 visits), so
# sorting on the count alone lets limit(1) return either page depending on
# how Spark happens to order the tied rows. The secondary sort on Page makes
# the winner deterministic (alphabetically first among tied pages).
page_visits = df_web.groupBy("Page").count()
(
    page_visits
    .orderBy(col("count").desc(), col("Page").asc())
    .limit(1)
    .show()
)

+----+-----+
|Page|count|
+----+-----+
|Home|    2|
+----+-----+



### 10. Rank pages per user by timestamp

In [0]:

# Within each user's rows, order by time so the earliest visit receives rank 1.
window_spec = Window.partitionBy(col("UserID")).orderBy(col("Timestamp"))
visit_rank = rank().over(window_spec)

(
    df_web
    .withColumn("Rank", visit_rank)
    .select("UserID", "Page", "Timestamp", "Rank")
    .show()
)


+------+--------+-------------------+----+
|UserID|    Page|          Timestamp|Rank|
+------+--------+-------------------+----+
|     1|    Home|2024-04-10 10:00:00|   1|
|     1|Checkout|2024-04-10 10:08:00|   2|
|     2|Products|2024-04-10 10:02:00|   1|
|     2| Contact|2024-04-10 10:15:00|   2|
|     3|    Cart|2024-04-10 10:05:00|   1|
|     4|    Home|2024-04-10 10:10:00|   1|
|     5|Products|2024-04-10 10:20:00|   1|
+------+--------+-------------------+----+



### 11. Total duration per user

In [0]:
# Sum Duration over every row belonging to the same user.
# The dict form of agg names its output "sum(Duration)", which is then renamed.
(
    df_web
    .groupBy("UserID")
    .agg({"Duration": "sum"})
    .withColumnRenamed("sum(Duration)", "TotalDuration")
    .show()
)

+------+-------------+
|UserID|TotalDuration|
+------+-------------+
|     1|           95|
|     3|           45|
|     2|          145|
|     4|           15|
|     5|           90|
+------+-------------+



### 12. Create temporary view

In [0]:
# Register df_web under the name "traffic_view" so it can be queried through spark.sql;
# an existing temporary view with that name is replaced.
df_web.createOrReplaceTempView("traffic_view")

### 13. SQL: Top 2 longest sessions

In [0]:

# Sort all rows of the view by Duration, largest first, and keep the first two.
top_sessions_sql = """
    SELECT * FROM traffic_view
    ORDER BY Duration DESC
    LIMIT 2
"""
top_two_sessions = spark.sql(top_sessions_sql)
top_two_sessions.show()


+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     2|Products|2024-04-10 10:02:00|     120|Desktop|    USA|            2|
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



### 14. SQL: Unique users per page

In [0]:

# COUNT(DISTINCT ...) counts each user once per page, however many rows they have there.
unique_users_sql = """
    SELECT Page, COUNT(DISTINCT UserID) AS UniqueUsers
    FROM traffic_view
    GROUP BY Page
"""
unique_users_per_page = spark.sql(unique_users_sql)
unique_users_per_page.show()


+--------+-----------+
|    Page|UniqueUsers|
+--------+-----------+
|    Cart|          1|
|    Home|          2|
|Checkout|          1|
|Products|          2|
| Contact|          1|
+--------+-----------+



### 15. Save final DataFrame to CSV

In [0]:
# Write CSV with a header row; existing output at this path is overwritten.
df_web.write.csv("/tmp/web_traffic_csv", mode="overwrite", header=True)

### 16. Save partitioned by Country in Parquet

In [0]:
# Write Parquet partitioned by Country; existing output at this path is overwritten.
df_web.write.parquet("/tmp/web_traffic_parquet", mode="overwrite", partitionBy="Country")