<a href="https://colab.research.google.com/github/ankits2001/SJSU-DATA201-Project/blob/main/Sharma_Log_File_Handling_in_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Download the 7 log files (no install step needed: the Colab runtime already provides PySpark)

In [1]:
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_1.log.gz
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_2.log.gz
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_3.log.gz
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_4.log.gz
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_5.log.gz
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_6.log.gz
!wget -nc https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_7.log.gz

--2025-04-24 20:50:11--  https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_1.log.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10277393 (9.8M) [application/octet-stream]
Saving to: ‘sample_web_log_1.log.gz’


2025-04-24 20:50:12 (72.3 MB/s) - ‘sample_web_log_1.log.gz’ saved [10277393/10277393]

--2025-04-24 20:50:12--  https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data/sample_web_log_2.log.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
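The seven `wget` lines above differ only in the file index. If you prefer plain Python to shell escapes, here is a minimal sketch that builds the same URLs in a loop and skips files that already exist, mimicking `wget -nc`. The helper names `log_file_urls` and `download_missing` are hypothetical and exist only for this illustration.

```python
import os
import urllib.request

# Base URL copied from the wget cell above; files are named sample_web_log_<i>.log.gz
BASE_URL = "https://raw.githubusercontent.com/keeyong/sjsu-data226-SP25/refs/heads/main/week13/data"


def log_file_urls(count=7):
    """Return the download URLs for sample_web_log_1 .. sample_web_log_<count>."""
    return [f"{BASE_URL}/sample_web_log_{i}.log.gz" for i in range(1, count + 1)]


def download_missing(urls, dest_dir="."):
    """Download each URL unless the target file already exists (like `wget -nc`)."""
    saved = []
    for url in urls:
        path = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
        if os.path.exists(path):
            continue  # no-clobber: keep the copy already on disk
        urllib.request.urlretrieve(url, path)
        saved.append(path)
    return saved
```

In a Colab cell, `download_missing(log_file_urls())` fetches whichever of the seven files are not yet present and returns their local paths.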

## Add the Snowflake JDBC driver jar and set up the SparkSession

In [2]:
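# Put the driver in PySpark's jars folder before the SparkSession starts (path assumes Colab's Python 3.11); -nc skips re-downloading
!cd /usr/local/lib/python3.11/dist-packages/pyspark/jars && wget -nc https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.19.0/snowflake-jdbc-3.19.0.jar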

--2025-04-24 20:50:38--  https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.19.0/snowflake-jdbc-3.19.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 70986770 (68M) [application/java-archive]
Saving to: ‘snowflake-jdbc-3.19.0.jar’


2025-04-24 20:50:39 (221 MB/s) - ‘snowflake-jdbc-3.19.0.jar’ saved [70986770/70986770]



In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("HandleLogFiles").getOrCreate()

## Create an input dataframe

In [4]:
# Load every .gz file in the working directory: Spark decompresses gzip by file extension and returns one row per line in a single `value` column
df = spark.read.text("*.gz")

In [5]:
# Check the number of partitions (a gzip file cannot be split, so each one is read whole by a single task)
print(df.rdd.getNumPartitions())

df.show(truncate=False)

3
+-----------------------------------------------------------------------------------+
|value                                                                              |
+-----------------------------------------------------------------------------------+
|123.45.67.89 - - [05/Nov/2024:02:08:16 +0000] "DELETE /cart HTTP/1.1" 500 242      |
|192.168.1.1 - - [04/Nov/2024:21:23:39 +0000] "POST /checkout HTTP/1.1" 404 2781    |
|234.56.78.90 - - [05/Nov/2024:07:06:19 +0000] "GET /api/data HTTP/1.1" 301 3758    |
|192.168.1.1 - - [04/Nov/2024:20:03:56 +0000] "POST /home HTTP/1.1" 200 1837        |
|192.168.1.1 - - [04/Nov/2024:21:25:05 +0000] "GET /products/123 HTTP/1.1" 200 3430 |
|234.56.78.90 - - [04/Nov/2024:07:38:10 +0000] "GET /api/data HTTP/1.1" 404 3729    |
|123.45.67.89 - - [04/Nov/2024:12:33:22 +0000] "PUT /api/data HTTP/1.1" 404 799     |
|192.168.1.1 - - [04/Nov/2024:07:37:46 +0000] "GET /api/data HTTP/1.1" 500 309      |
|123.45.67.89 - - [04/Nov/2024:21:52:36 +0000] "POST
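Why 3 partitions for 7 files? Gzip files are not splittable, so Spark assigns each file whole to one partition, but it also packs several small files into the same partition up to a target size. The sketch below is a simplified Python model of that packing, based on Spark 3.x's file-partitioning logic: the target is `min(maxPartitionBytes, max(openCostInBytes, totalBytes / parallelism))`, every file is charged its length plus an "open cost", and files are packed largest first. It assumes the default `spark.sql.files.maxPartitionBytes` (128 MB) and `spark.sql.files.openCostInBytes` (4 MB), a default parallelism of 2 (a two-core Colab VM), and that all seven files are about the size of `sample_web_log_1.log.gz` (10,277,393 bytes, the only size shown in the download log). Under those assumptions the model yields 3 partitions, consistent with the output above; treat it as an approximation, not Spark's exact code.

```python
MB = 1024 * 1024


def max_split_bytes(file_sizes, parallelism,
                    max_partition_bytes=128 * MB, open_cost_in_bytes=4 * MB):
    """Target bytes per partition (simplified model of Spark 3.x file splitting)."""
    # Each file is charged its length plus a fixed "open cost"
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))


def pack_unsplittable_files(file_sizes, parallelism,
                            max_partition_bytes=128 * MB, open_cost_in_bytes=4 * MB):
    """Group whole (unsplittable) files into partitions, largest file first."""
    target = max_split_bytes(file_sizes, parallelism,
                             max_partition_bytes, open_cost_in_bytes)
    partitions, current, current_size = [], [], 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current partition if adding this file would exceed the target
        if current and current_size + size > target:
            partitions.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size + open_cost_in_bytes
    if current:
        partitions.append(current)
    return partitions


# Assumption: all 7 files are about as large as sample_web_log_1.log.gz, on 2 cores
groups = pack_unsplittable_files([10277393] * 7, parallelism=2)
print(len(groups))  # 3
```

With these inputs the three groups hold 3, 3 and 1 files. On a machine with more cores the per-core share shrinks, and the same model puts each file in its own partition.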

## Create a parsed dataframe (log_df)

In [6]:
# Extract the necessary information from log data using regular expressions
pattern = r'(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] "(.*?) (.*?) HTTP.*" (\d+) (\d+)'

log_df = df.select(
    F.regexp_extract("value", pattern, 1).alias("ip"),
    F.regexp_extract("value", pattern, 2).alias("timestamp"),
    F.regexp_extract("value", pattern, 3).alias("method"),
    F.regexp_extract("value", pattern, 4).alias("url"),
    # Cast first, then alias, so the output column name is set explicitly
    F.regexp_extract("value", pattern, 5).cast("integer").alias("status"),
    F.regexp_extract("value", pattern, 6).cast("integer").alias("size")
)

In [7]:
log_df.show()

+------------+--------------------+------+-------------+------+----+
|          ip|           timestamp|method|          url|status|size|
+------------+--------------------+------+-------------+------+----+
|123.45.67.89|05/Nov/2024:02:08...|DELETE|        /cart|   500| 242|
| 192.168.1.1|04/Nov/2024:21:23...|  POST|    /checkout|   404|2781|
|234.56.78.90|05/Nov/2024:07:06...|   GET|    /api/data|   301|3758|
| 192.168.1.1|04/Nov/2024:20:03...|  POST|        /home|   200|1837|
| 192.168.1.1|04/Nov/2024:21:25...|   GET|/products/123|   200|3430|
|234.56.78.90|04/Nov/2024:07:38...|   GET|    /api/data|   404|3729|
|123.45.67.89|04/Nov/2024:12:33...|   PUT|    /api/data|   404| 799|
| 192.168.1.1|04/Nov/2024:07:37...|   GET|    /api/data|   500| 309|
|123.45.67.89|04/Nov/2024:21:52...|  POST|    /checkout|   301|2375|
|123.45.67.89|04/Nov/2024:08:36...|DELETE|    /api/data|   404|3449|
| 192.168.1.1|05/Nov/2024:03:15...|   GET|    /api/data|   200|2319|
|234.56.78.90|05/Nov/2024:01:26...
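Before running the regex over millions of rows, it can help to check it on a single line locally. `regexp_extract` applies a Java regex with search (find) semantics and returns an empty string when the pattern does not match; this particular pattern uses only constructs that behave the same in Python's `re`, so the minimal sketch below mirrors the `select()` above. `parse_line` is a hypothetical helper. The `None` returned for unparseable lines mirrors what the integer casts produce with ANSI mode off (the Spark 3.x default); with ANSI mode on, casting an empty string to integer raises an error instead.

```python
import re

# Same pattern as the Spark cell above
LOG_PATTERN = r'(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] "(.*?) (.*?) HTTP.*" (\d+) (\d+)'


def parse_line(line):
    """Return the six fields the select() above extracts from one raw log line."""
    m = re.search(LOG_PATTERN, line)
    if m is None:
        # regexp_extract yields "" for every group; casting "" to integer gives null (ANSI off)
        return {"ip": "", "timestamp": "", "method": "", "url": "",
                "status": None, "size": None}
    ip, timestamp, method, url, status, size = m.groups()
    return {"ip": ip, "timestamp": timestamp, "method": method, "url": url,
            "status": int(status), "size": int(size)}


row = parse_line('123.45.67.89 - - [05/Nov/2024:02:08:16 +0000] "DELETE /cart HTTP/1.1" 500 242')
```

Here `row` holds `ip="123.45.67.89"`, `method="DELETE"`, `url="/cart"`, `status=500` and `size=242`. In Spark, the same idea lets you count lines that failed to parse, for example `log_df.filter(F.col("ip") == "").count()`.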

## Let's compute the top 404 URLs

In [8]:
# Keep only 404 error logs
error_404_logs = log_df.filter(log_df.status == 404)

In [9]:
# Group by URL and then count, and sort by count in descending order
url_404_count = error_404_logs.groupBy("url").count().orderBy(F.desc("count"))

In [10]:
# print the outcome
url_404_count.show()

+-------------+------+
|          url| count|
+-------------+------+
|/products/123|350970|
|        /cart|349830|
|    /checkout|349604|
|    /api/data|349498|
|        /home|349492|
+-------------+------+
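The three DataFrame steps (filter on `status == 404`, `groupBy("url").count()`, `orderBy(F.desc("count"))`) amount to a frequency count. A plain-Python equivalent using `collections.Counter`, run on the eleven `(url, status)` pairs visible in the `log_df.show()` output above, is a quick way to sanity-check the logic on a small sample. `top_404_urls` is a hypothetical helper name.

```python
from collections import Counter

# (url, status) pairs copied from the first 11 rows of log_df.show() above
sample_rows = [
    ("/cart", 500), ("/checkout", 404), ("/api/data", 301), ("/home", 200),
    ("/products/123", 200), ("/api/data", 404), ("/api/data", 404),
    ("/api/data", 500), ("/checkout", 301), ("/api/data", 404), ("/api/data", 200),
]


def top_404_urls(rows):
    """filter(status == 404) -> groupBy(url).count() -> orderBy(count desc)."""
    counts = Counter(url for url, status in rows if status == 404)
    return counts.most_common()  # list of (url, count), highest count first


print(top_404_urls(sample_rows))  # [('/api/data', 3), ('/checkout', 1)]
```

One difference: `most_common` breaks ties by first appearance, while ordering on `count` alone leaves the order of tied URLs unspecified in Spark. Adding the URL as a secondary key, e.g. `.orderBy(F.desc("count"), "url")`, makes the output order deterministic.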



## Now let's do the same in Spark SQL

In [11]:
# Register the DataFrame as a temporary SQL table
log_df.createOrReplaceTempView("logs")

In [12]:
# Use SparkSQL to count URLs with 404 status
url_404_count = spark.sql("""
    SELECT url, COUNT(1) as count
    FROM logs
    WHERE status = 404
    GROUP BY url
    ORDER BY count DESC
""")

In [13]:
url_404_count.show()

+-------------+------+
|          url| count|
+-------------+------+
|/products/123|350970|
|        /cart|349830|
|    /checkout|349604|
|    /api/data|349498|
|        /home|349492|
+-------------+------+
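The SQL text above uses only standard constructs (`WHERE`, `GROUP BY`, `COUNT`, `ORDER BY` on a column alias), so one way to test the query logic without a cluster is to run the same statement against a tiny in-memory table. The sketch below swaps in Python's built-in `sqlite3` purely as a stand-in engine; it is not how Spark executes the query, and `run_404_query` is a hypothetical helper.

```python
import sqlite3

# Same query text as the spark.sql() cell above
QUERY = """
    SELECT url, COUNT(1) as count
    FROM logs
    WHERE status = 404
    GROUP BY url
    ORDER BY count DESC
"""


def run_404_query(rows):
    """Load (url, status) rows into an in-memory SQLite table and run QUERY."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE logs (url TEXT, status INTEGER)")
        conn.executemany("INSERT INTO logs VALUES (?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


rows = [("/cart", 404), ("/home", 404), ("/cart", 404), ("/home", 200), ("/api/data", 500)]
print(run_404_query(rows))  # [('/cart', 2), ('/home', 1)]
```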



## Let's save this DF (url_404_count) as a table in Snowflake

In [14]:
from google.colab import userdata

account = userdata.get('snowflake_account')
user = userdata.get('snowflake_userid')
password = userdata.get('snowflake_password')
database = "dev"
schema = "analytics"

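url = f"jdbc:snowflake://{account}.snowflakecomputing.com/?db={database}&schema={schema}"

In [16]:
# Pass the credentials as JDBC connection properties instead of embedding them in the URL:
# the password never appears in the URL string and needs no URL-encoding
url_404_count.write \
    .format("jdbc") \
    .option("driver", "net.snowflake.client.jdbc.SnowflakeDriver") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "url_404_count") \
    .mode("overwrite") \
    .save()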

In [None]:
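# Now go back to Snowflake and check the table (analytics.url_404_count).
# Spark's generic JDBC dialect double-quotes column names when creating the table, so query them as "url" and "count" (lowercase, quoted).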