In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col,trim, to_timestamp, date_format
from pyspark.sql.window import Window
#from pyspark.sql import functions as F 

In [2]:
# To build the sparkSession
spark = SparkSession.builder\
.appName("PySpark Assignment")\
.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Raw data file
file_path = "/home/raghava/Projects/Bigdata_class/Input_data/web_visits_data.txt"

# Reading the data
sales_data = spark.read.option("inferschema", True).csv(file_path, header=True, sep=",")

In [4]:

for col_name in sales_data.columns:
    sales_data = sales_data.withColumn(col_name, trim(col(col_name)))

# add timestamp datatype to timestamp column 
stamped_sales_data = sales_data.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

# Change the time stamp format
time_stamped_data = stamped_sales_data.withColumn("Sys_date", date_format(col("timestamp"), "MM/dd/yyyy HH:mm:ss"))    

# Specify the HDFS path
hdfs_path = "hdfs://127.0.0.1:9000/raghava/data/enriched"

# Write the DataFrame to HDFS in Parquet format
stamped_sales_data.write.parquet(hdfs_path, mode='overwrite')

                                                                                

#### Read Data from HDFS

In [3]:
# Specify the HDFS path
hdfs_path = "hdfs://127.0.0.1:9000/raghava/data/enriched/part-00000-b428cee2-cdcd-4c60-a993-8734878e39ad-c000.snappy.parquet"

hdfs_parq = spark.read.option("inferschema", True).parquet(hdfs_path, header=True)

                                                                                

### 1-The total number of visits for each title:-

In [None]:
# Creating temp table view for querying
hdfs_parq.createOrReplaceTempView("hdfs_parq_view")

# Querying the data
visit_counts = spark.sql("SELECT title, COUNT(visitId) AS num_visits FROM hdfs_parq_view GROUP BY title")

# Number of visits for each title
visit_counts.show(truncate=False)


[Stage 1:>                                                          (0 + 1) / 1]

+------------------------------------------------------+----------+
|title                                                 |num_visits|
+------------------------------------------------------+----------+
|Deal of the Day: Electronics Deals - Best Buy         |994       |
|Clearance Electronics Outlet - Best Buy               |982       |
|Member Deals - Best Buy                               |969       |
|Best Buy | Official Online Store | Shop Now &amp; Save|1007      |
|Cart - Best Buy                                       |1018      |
|Schedule a Service - Best Buy                         |975       |
|Best Buy Top Deals                                    |983       |
|Best Buy Support & Customer Service                   |994       |
|Track Your Repair - Best Buy                          |1042      |
|Remote Support: Geek Squad - Best Buy                 |1036      |
+------------------------------------------------------+----------+



                                                                                

### 2-The Hour of the Day with most visits overall:-


In [51]:
# Querying the data
most_visit_hour = spark.sql("""
    SELECT HOUR(timestamp) AS hour_of_day, COUNT(*) AS num_visits
    FROM hdfs_parq_view
    GROUP BY HOUR(timestamp)
    ORDER BY num_visits DESC
    Limit(1)
""")

# Most number of visits in the hour of the day
most_visit_hour.show()

+-----------+----------+
|hour_of_day|num_visits|
+-----------+----------+
|          4|       474|
+-----------+----------+



#### 3-Find out the User with most visits:-

In [52]:
# Querying the data
most_visited_user = spark.sql("""
    SELECT name, COUNT(name) AS visit_times 
    FROM hdfs_parq_view 
    GROUP BY name
    ORDER BY visit_times DESC
    LIMIT(1)                          
""")


# Most number of visiting user
most_visited_user.show()

+---------+-----------+
|     name|visit_times|
+---------+-----------+
|ANVITAD23|        697|
+---------+-----------+



#### 3-Find out the User with most visits for 'Remote Support: Geek Squad - Best Buy':-

In [53]:
# Querying the data
user_for_remote_support = spark.sql("""
    SELECT name, COUNT(*) as visit_for_emote
    FROM hdfs_parq_view 
    WHERE title = 'Remote Support: Geek Squad - Best Buy'                              
    GROUP BY name
    ORDER BY visit_for_emote DESC
    LIMIT(1) 
""")


# Most number of visiting user for 'Remote Support:
user_for_remote_support.show()

+-----------------+---------------+
|             name|visit_for_emote|
+-----------------+---------------+
|MANIDEEPBOYAPATI9|             84|
+-----------------+---------------+



#### 4-Both 'Best Buy Support & Customer Service' and 'Remote Support: Geek Squad - Best Buy' 

In [54]:
# Querying the data
user_for_remote_bestbuy_support = spark.sql("""
    SELECT name, COUNT(*) as visit_for_both
    FROM hdfs_parq_view 
    WHERE title IN ('Best Buy Support & Customer Service','Remote Support: Geek Squad - Best Buy')
    GROUP BY name
    ORDER BY visit_for_both DESC
    LIMIT(1)
""")


# Most number of visiting user for 'Remote Support:
user_for_remote_bestbuy_support.show(truncate=False)

+---------------+--------------+
|name           |visit_for_both|
+---------------+--------------+
|SRIKANTHPITTA18|153           |
+---------------+--------------+



#### 5-Both 'Best Buy Support & Customer Service' and 'Schedule a Service - Best Buy'

In [55]:
# Querying the data
user_for_remote_bestbuy_support = spark.sql("""
    SELECT name, COUNT(*) as visit_for_both
    FROM hdfs_parq_view 
    WHERE title IN ('Best Buy Support & Customer Service','Schedule a Service - Best Buy')
    GROUP BY name
    ORDER BY visit_for_both DESC
    LIMIT(1)
""")

# Most number of visiting user for 'Remote Support:
user_for_remote_bestbuy_support.show(truncate=False)

+---------------+--------------+
|name           |visit_for_both|
+---------------+--------------+
|SRIKANTHPITTA18|150           |
+---------------+--------------+



#### 6-The User who has the longest time interval between visits

In [6]:
# Querying the data
query = """
WITH visit_diffs AS (
    SELECT 
        name,
        timestamp,
        LAG(timestamp) OVER (PARTITION BY name ORDER BY timestamp) AS prev_timestamp,
        UNIX_TIMESTAMP(timestamp) - UNIX_TIMESTAMP(LAG(timestamp) OVER (PARTITION BY name ORDER BY timestamp)) AS time_diff
    FROM hdfs_parq_view
    )

    SELECT name, ROUND(MAX(time_diff)/3600, 2) AS max_interval_hrs
    FROM visit_diffs
    GROUP BY name
    ORDER BY max_interval_hrs DESC
    LIMIT(1)
"""

long_interval_user = spark.sql(query)

# Most interval time user visiting
long_interval_user.show(truncate=False)

+--------+----------------+
|name    |max_interval_hrs|
+--------+----------------+
|SASI2025|0.35            |
+--------+----------------+



In [5]:
# Querying the data
query = """
WITH visit_diffs AS (
    SELECT 
        name,
        timestamp,
        LAG(timestamp) OVER (PARTITION BY name ORDER BY timestamp) AS prev_timestamp,
        UNIX_TIMESTAMP(timestamp) - UNIX_TIMESTAMP(LAG(timestamp) OVER (PARTITION BY name ORDER BY timestamp)) AS time_diff
    FROM hdfs_parq_view
    )

    SELECT name, ROUND(MIN(time_diff)/3600, 2) AS min_interval_hrs
    FROM visit_diffs
    GROUP BY name
    ORDER BY min_interval_hrs
    LIMIT(1)
"""

less_interval_user = spark.sql(query)

# Most interval time user visiting
less_interval_user.show(truncate=False)

+------+----------------+
|name  |min_interval_hrs|
+------+----------------+
|ABAGAM|0.0             |
+------+----------------+



                                                                                