In [1]:
# Dataset Initialization
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, avg, countDistinct, to_timestamp, minute, rank
from pyspark.sql.window import Window

# Start (or reuse) the Spark session used by every cell in this notebook.
spark = SparkSession.builder.appName("WebTrafficData").getOrCreate()

# Row factory: declares the field names and their order once, so each
# page view below can be written positionally.
WebVisit = Row("UserID", "Page", "Timestamp", "Duration", "Device", "Country")

# Sample page-view log. Timestamp is still a plain string at this point.
web_data = [
    WebVisit(1, "Home", "2024-04-10 10:00:00", 35, "Mobile", "India"),
    WebVisit(2, "Products", "2024-04-10 10:02:00", 120, "Desktop", "USA"),
    WebVisit(3, "Cart", "2024-04-10 10:05:00", 45, "Tablet", "UK"),
    WebVisit(1, "Checkout", "2024-04-10 10:08:00", 60, "Mobile", "India"),
    WebVisit(4, "Home", "2024-04-10 10:10:00", 15, "Mobile", "Canada"),
    WebVisit(2, "Contact", "2024-04-10 10:15:00", 25, "Desktop", "USA"),
    WebVisit(5, "Products", "2024-04-10 10:20:00", 90, "Desktop", "India"),
]

# Column types are inferred from the Python values in the rows.
df_web = spark.createDataFrame(web_data)
df_web.show(truncate=False)


+------+--------+-------------------+--------+-------+-------+
|UserID|Page    |Timestamp          |Duration|Device |Country|
+------+--------+-------------------+--------+-------+-------+
|1     |Home    |2024-04-10 10:00:00|35      |Mobile |India  |
|2     |Products|2024-04-10 10:02:00|120     |Desktop|USA    |
|3     |Cart    |2024-04-10 10:05:00|45      |Tablet |UK     |
|1     |Checkout|2024-04-10 10:08:00|60      |Mobile |India  |
|4     |Home    |2024-04-10 10:10:00|15      |Mobile |Canada |
|2     |Contact |2024-04-10 10:15:00|25      |Desktop|USA    |
|5     |Products|2024-04-10 10:20:00|90      |Desktop|India  |
+------+--------+-------------------+--------+-------+-------+



In [2]:
# 1. Display the schema of the web traffic DataFrame (df_web).
# printSchema() prints every column with its data type and nullability as a tree.
df_web.printSchema()

root
 |-- UserID: long (nullable = true)
 |-- Page: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Duration: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Country: string (nullable = true)



In [3]:
# 2. Parse the string Timestamp column into a proper timestamp type.
# No explicit format is passed, so Spark's default parsing rules are used.
parsed_timestamp = to_timestamp(col('Timestamp'))
df_web = df_web.withColumn('Timestamp', parsed_timestamp)

# Confirm that Timestamp is now reported as `timestamp` instead of `string`.
df_web.printSchema()

root
 |-- UserID: long (nullable = true)
 |-- Page: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Duration: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Country: string (nullable = true)



In [4]:
# 3. Derive SessionMinute: the minute-of-hour component of each Timestamp.
session_minute = minute(col('Timestamp'))
df_web = df_web.withColumn('SessionMinute', session_minute)
df_web.show()

+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     1|    Home|2024-04-10 10:00:00|      35| Mobile|  India|            0|
|     2|Products|2024-04-10 10:02:00|     120|Desktop|    USA|            2|
|     3|    Cart|2024-04-10 10:05:00|      45| Tablet|     UK|            5|
|     1|Checkout|2024-04-10 10:08:00|      60| Mobile|  India|            8|
|     4|    Home|2024-04-10 10:10:00|      15| Mobile| Canada|           10|
|     2| Contact|2024-04-10 10:15:00|      25|Desktop|    USA|           15|
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



In [5]:
# 4. Page views of the 'Checkout' page that came from a 'Mobile' device.
is_mobile = col('Device') == 'Mobile'
is_checkout = col('Page') == 'Checkout'
df_web.filter(is_mobile & is_checkout).show()

+------+--------+-------------------+--------+------+-------+-------------+
|UserID|    Page|          Timestamp|Duration|Device|Country|SessionMinute|
+------+--------+-------------------+--------+------+-------+-------------+
|     1|Checkout|2024-04-10 10:08:00|      60|Mobile|  India|            8|
+------+--------+-------------------+--------+------+-------+-------------+



In [6]:
# 5. Entries whose Duration is strictly greater than 60 seconds
#    (a session of exactly 60 is excluded).
longer_than_a_minute = col('Duration') > 60
df_web.filter(longer_than_a_minute).show()

+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     2|Products|2024-04-10 10:02:00|     120|Desktop|    USA|            2|
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



In [7]:
# 6. Visits to the 'Products' page made by users located in India.
from_india = col('Country') == 'India'
on_products = col('Page') == 'Products'
df_web.filter(from_india & on_products).show()

+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



In [8]:
# 7. Mean session Duration for each Device type.
# The aggregate keeps Spark's default column name, avg(Duration).
duration_by_device = df_web.groupBy('Device').agg(avg('Duration'))
duration_by_device.show()

+-------+------------------+
| Device|     avg(Duration)|
+-------+------------------+
| Mobile|36.666666666666664|
| Tablet|              45.0|
|Desktop| 78.33333333333333|
+-------+------------------+



In [9]:
# 8. Number of sessions (rows) recorded for each Country.
country_counts = df_web.groupBy('Country').count()

# count() names its result column 'count'; expose it as SessionCount instead.
country_counts.withColumnRenamed('count', 'SessionCount').show()

+-------+------------+
|Country|SessionCount|
+-------+------------+
|  India|           3|
|    USA|           2|
|     UK|           1|
| Canada|           1|
+-------+------------+



In [10]:
# 9. Find the most visited page overall.
# Several pages can share the highest visit count (in the sample data both
# 'Home' and 'Products' have 2 visits). Sorting by count alone leaves the order
# of tied rows undefined, so show(1) could return a different page from run to
# run. Page name is therefore used as a secondary, ascending sort key to make
# the reported row deterministic.
page_visits = df_web.groupBy('Page').count()
page_visits.orderBy(col('count').desc(), col('Page').asc()).show(1)

+----+-----+
|Page|count|
+----+-----+
|Home|    2|
+----+-----+
only showing top 1 row



In [11]:
# 10. Rank each user's page views chronologically (1 = oldest).
# The window groups rows by UserID and orders them by ascending Timestamp.
windowSpec = Window.partitionBy(col('UserID')).orderBy(col('Timestamp').asc())
visit_rank = rank().over(windowSpec)
df_web.withColumn('Rank', visit_rank).show()

+------+--------+-------------------+--------+-------+-------+-------------+----+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|Rank|
+------+--------+-------------------+--------+-------+-------+-------------+----+
|     1|    Home|2024-04-10 10:00:00|      35| Mobile|  India|            0|   1|
|     1|Checkout|2024-04-10 10:08:00|      60| Mobile|  India|            8|   2|
|     2|Products|2024-04-10 10:02:00|     120|Desktop|    USA|            2|   1|
|     2| Contact|2024-04-10 10:15:00|      25|Desktop|    USA|           15|   2|
|     3|    Cart|2024-04-10 10:05:00|      45| Tablet|     UK|            5|   1|
|     4|    Home|2024-04-10 10:10:00|      15| Mobile| Canada|           10|   1|
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|   1|
+------+--------+-------------------+--------+-------+-------+-------------+----+



In [12]:
# 11. Total Duration across all sessions of each user, via groupBy.
# The dict form of agg() produces a column named 'sum(Duration)'.
per_user_totals = df_web.groupBy('UserID').agg({'Duration': 'sum'})
per_user_totals.withColumnRenamed('sum(Duration)', 'TotalDuration').show()

+------+-------------+
|UserID|TotalDuration|
+------+-------------+
|     1|           95|
|     3|           45|
|     2|          145|
|     5|           90|
|     4|           15|
+------+-------------+



In [13]:
# 12. Register df_web as a temporary view so it can be queried through spark.sql.
# An existing view with the same name is replaced.
view_name = 'traffic_view'
df_web.createOrReplaceTempView(view_name)

In [14]:
# 13. SQL: the two sessions with the largest Duration, longest first.
top_sessions_query = 'SELECT * FROM traffic_view ORDER BY Duration DESC LIMIT 2'
spark.sql(top_sessions_query).show()

+------+--------+-------------------+--------+-------+-------+-------------+
|UserID|    Page|          Timestamp|Duration| Device|Country|SessionMinute|
+------+--------+-------------------+--------+-------+-------+-------------+
|     2|Products|2024-04-10 10:02:00|     120|Desktop|    USA|            2|
|     5|Products|2024-04-10 10:20:00|      90|Desktop|  India|           20|
+------+--------+-------------------+--------+-------+-------+-------------+



In [15]:
# 14. SQL: number of distinct users that visited each page.
unique_users_query = 'SELECT Page, COUNT(DISTINCT UserID) as UniqueUsers FROM traffic_view GROUP BY Page'
spark.sql(unique_users_query).show()

+--------+-----------+
|    Page|UniqueUsers|
+--------+-----------+
|    Cart|          1|
|    Home|          2|
|Checkout|          1|
|Products|          2|
| Contact|          1|
+--------+-----------+



In [16]:
# 15. Save the final DataFrame as CSV with a header row.
# 'overwrite' replaces whatever already exists at the target path.
csv_writer = df_web.write.mode('overwrite')
csv_writer.csv('/tmp/web_traffic_csv', header=True)

In [17]:
# 16. Save the DataFrame as Parquet, partitioned by the Country column.
# 'overwrite' replaces whatever already exists at the target path.
parquet_writer = df_web.write.mode('overwrite').partitionBy('Country')
parquet_writer.parquet('/tmp/web_traffic_parquet')