<a href="https://colab.research.google.com/github/MSaiTeja01/BDS_Assignment2/blob/main/BDS_Assignment2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [31]:
# 1. Setup Apache Spark in Google Colab


# Install Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and extract Spark (here we use Spark 3.1.2 with Hadoop 3.2)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

# Set environment variables for Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

# Install findspark to make Spark available in the Python environment
!pip install -q findspark

import findspark
findspark.init()

# Create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("WebLogAnalysis").getOrCreate()

In [33]:
# 2. Load the Data

# Load each compressed log file as a DataFrame
df_jul = spark.read.text("/content/Access_log_Aug1995.gz")
df_aug = spark.read.text("/content/Access_log_Jul1995.gz")

# Combine both months into one DataFrame
df_all = df_jul.union(df_aug)

In [34]:
# 3. Data Wrangling

# a. Parse the Log Files

from pyspark.sql.functions import regexp_extract, col

# Regular expression pattern based on the Common Log Format
regex_pattern = r'(\S+) (\S+) (\S+) \[(.*?)\] "(.*?)" (\d{3}) (\S+)'

logs_df = df_all.select(
    regexp_extract('value', regex_pattern, 1).alias('remotehost'),
    regexp_extract('value', regex_pattern, 2).alias('rfc931'),
    regexp_extract('value', regex_pattern, 3).alias('authuser'),
    regexp_extract('value', regex_pattern, 4).alias('date'),
    regexp_extract('value', regex_pattern, 5).alias('request'),
    regexp_extract('value', regex_pattern, 6).alias('status'),
    regexp_extract('value', regex_pattern, 7).alias('bytes')
)


In [35]:
# b. Handle Missing or Malformed Data
from pyspark.sql.functions import when

logs_df = logs_df.withColumn("rfc931", when(col("rfc931")=="-", None).otherwise(col("rfc931")))
logs_df = logs_df.withColumn("authuser", when(col("authuser")=="-", None).otherwise(col("authuser")))
logs_df = logs_df.withColumn("bytes", when(col("bytes")=="-", None).otherwise(col("bytes")))
logs_df = logs_df.withColumn("status", col("status").cast("int"))
logs_df = logs_df.withColumn("bytes", when(col("bytes").isNotNull(), col("bytes").cast("int")).otherwise(None))


In [36]:
# c. Convert and Format Date
from pyspark.sql.functions import to_timestamp, date_format

# The date field in the log is in the format: dd/MMM/yyyy:HH:mm:ss (ignoring zone)
# In Spark 3+, the timestamp parser is stricter. We'll handle timezone separately.
logs_df = logs_df.withColumn("timestamp", to_timestamp(col("date").substr(1, 20), "dd/MMM/yyyy:HH:mm:ss")) # Using substring to remove brackets and timezone
logs_df = logs_df.withColumn("date_formatted", date_format(col("timestamp"), "dd-MMM-yyyy"))

In [37]:
# d. Extract the Endpoint from the Request
from pyspark.sql.functions import split

# The request field is typically: "GET /path/resource HTTP/1.0"
logs_df = logs_df.withColumn("endpoint", split(col("request"), " ").getItem(1))


In [38]:
# 4. Analytics

# i. Total Log Records
total_logs = logs_df.count()
print("Total Log Records:", total_logs)

Total Log Records: 3461613


In [39]:
# ii. Count of Unique Hosts
unique_hosts = logs_df.select("remotehost").distinct().count()
print("Unique Hosts:", unique_hosts)

Unique Hosts: 137979


In [40]:
# iii. Date-wise Unique Host Counts
from pyspark.sql.functions import countDistinct

date_unique_hosts = logs_df.groupBy("date_formatted") \
                           .agg(countDistinct("remotehost").alias("unique_hosts")) \
                           .orderBy("date_formatted")
date_unique_hosts.show(truncate=False)

+--------------+------------+
|date_formatted|unique_hosts|
+--------------+------------+
|null          |1           |
|01-Aug-1995   |2582        |
|01-Jul-1995   |5192        |
|02-Jul-1995   |4859        |
|03-Aug-1995   |3222        |
|03-Jul-1995   |7336        |
|04-Aug-1995   |4191        |
|04-Jul-1995   |5524        |
|05-Aug-1995   |2502        |
|05-Jul-1995   |7383        |
|06-Aug-1995   |2538        |
|06-Jul-1995   |7820        |
|07-Aug-1995   |4108        |
|07-Jul-1995   |6474        |
|08-Aug-1995   |4406        |
|08-Jul-1995   |2898        |
|09-Aug-1995   |4317        |
|09-Jul-1995   |2554        |
|10-Aug-1995   |4523        |
|10-Jul-1995   |4464        |
+--------------+------------+
only showing top 20 rows



In [41]:
# iv. Average Requests per Host per Day
from pyspark.sql.functions import count

# Total requests per day
daily_requests = logs_df.groupBy("date_formatted") \
                        .agg(count("request").alias("total_requests"))

# Unique hosts per day (already calculated)
daily_unique_hosts = logs_df.groupBy("date_formatted") \
                            .agg(countDistinct("remotehost").alias("unique_hosts"))

# Join and calculate average
daily_avg = daily_requests.join(daily_unique_hosts, "date_formatted") \
                          .withColumn("avg_requests_per_host", col("total_requests") / col("unique_hosts")) \
                          .orderBy("date_formatted")
daily_avg.show(truncate=False)

+--------------+--------------+------------+---------------------+
|date_formatted|total_requests|unique_hosts|avg_requests_per_host|
+--------------+--------------+------------+---------------------+
|01-Aug-1995   |33996         |2582        |13.166537567776917   |
|01-Jul-1995   |64714         |5192        |12.464175654853621   |
|02-Jul-1995   |60265         |4859        |12.40275776908829    |
|03-Aug-1995   |41388         |3222        |12.845437616387336   |
|03-Jul-1995   |89584         |7336        |12.211559432933479   |
|04-Aug-1995   |59557         |4191        |14.210689572894298   |
|04-Jul-1995   |70452         |5524        |12.753801593048516   |
|05-Aug-1995   |31893         |2502        |12.747002398081534   |
|05-Jul-1995   |94575         |7383        |12.809833401056482   |
|06-Aug-1995   |32420         |2538        |12.77383766745469    |
|06-Jul-1995   |100960        |7820        |12.910485933503836   |
|07-Aug-1995   |57362         |4108        |13.9634858812074  

In [42]:
# v. Number of 404 Response Codes
errors_404 = logs_df.filter(col("status") == 404).count()
print("Total 404 Errors:", errors_404)

Total 404 Errors: 20901


In [43]:
# vi. Top 15 Endpoints with 404 Responses
from pyspark.sql.functions import desc

endpoints_404 = logs_df.filter(col("status") == 404) \
                       .groupBy("endpoint") \
                       .agg(count("endpoint").alias("error_count")) \
                       .orderBy(desc("error_count")) \
                       .limit(15)
endpoints_404.show(truncate=False)

+-----------------------------------------------------------------+-----------+
|endpoint                                                         |error_count|
+-----------------------------------------------------------------+-----------+
|/pub/winvn/readme.txt                                            |2004       |
|/pub/winvn/release.txt                                           |1732       |
|/shuttle/missions/STS-69/mission-STS-69.html                     |683        |
|/shuttle/missions/sts-68/ksc-upclose.gif                         |428        |
|/history/apollo/a-001/a-001-patch-small.gif                      |384        |
|/history/apollo/sa-1/sa-1-patch-small.gif                        |383        |
|/://spacelink.msfc.nasa.gov                                      |381        |
|/images/crawlerway-logo.gif                                      |374        |
|/elv/DELTA/uncons.htm                                            |372        |
|/history/apollo/pad-abort-test-1/pad-ab

In [44]:
# vii. Top 15 Hosts with 404 Responses
hosts_404 = logs_df.filter(col("status") == 404) \
                   .groupBy("remotehost") \
                   .agg(count("remotehost").alias("error_count")) \
                   .orderBy(desc("error_count")) \
                   .limit(15)
hosts_404.show(truncate=False)

+---------------------------+-----------+
|remotehost                 |error_count|
+---------------------------+-----------+
|hoohoo.ncsa.uiuc.edu       |251        |
|piweba3y.prodigy.com       |157        |
|jbiagioni.npt.nuwc.navy.mil|132        |
|piweba1y.prodigy.com       |114        |
|www-d4.proxy.aol.com       |91         |
|piweba4y.prodigy.com       |86         |
|scooter.pa-x.dec.com       |69         |
|www-d1.proxy.aol.com       |64         |
|phaelon.ksc.nasa.gov       |64         |
|dialip-217.den.mmc.com     |62         |
|www-b4.proxy.aol.com       |62         |
|www-b3.proxy.aol.com       |61         |
|www-a2.proxy.aol.com       |60         |
|titan02f                   |59         |
|piweba2y.prodigy.com       |59         |
+---------------------------+-----------+

