In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Reading file

In [None]:
spark = SparkSession.builder.appName("Spark Data Analysis").getOrCreate()
logs_df = spark.read.csv("web_server_logs.csv", header=True, inferSchema=True)

Checking schema of the table:

In [26]:
logs_df.show(1)
logs_df.printSchema()

+-------------+-------------------+------+--------+-------------+-------------+
|           ip|          timestamp|method|     url|response_code|response_size|
+-------------+-------------------+------+--------+-------------+-------------+
|59.120.253.50|2024-04-01 23:02:08|DELETE|category|          500|         3224|
+-------------+-------------------+------+--------+-------------+-------------+
only showing top 1 row

root
 |-- ip: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- method: string (nullable = true)
 |-- url: string (nullable = true)
 |-- response_code: integer (nullable = true)
 |-- response_size: integer (nullable = true)



Top 10 active IP addresses:

In [63]:
active_ip_df = (logs_df.groupBy(col("ip"))
                .agg(count("ip").alias("request_count"))
                .orderBy(col("request_count").desc()).limit(10))
active_ip_df.show()

+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
|214.118.224.200|            2|
| 179.212.53.235|            1|
|   146.33.77.45|            1|
| 56.167.163.123|            1|
|152.203.115.177|            1|
| 30.190.251.242|            1|
|208.144.202.211|            1|
|  149.39.84.148|            1|
|  166.186.6.249|            1|
|   86.74.92.200|            1|
|  202.78.244.99|            1|
|  140.255.154.3|            1|
|   16.89.96.163|            1|
|   74.75.97.164|            1|
|    53.202.64.6|            1|
|   17.26.250.59|            1|
|  88.197.64.155|            1|
|  64.154.183.70|            1|
|    90.3.250.99|            1|
|   188.6.194.24|            1|
+---------------+-------------+
only showing top 20 rows



Request count by HTTP method:

In [64]:
http_count_df = (logs_df.groupBy("method")
                 .agg(count("method").alias("method_count"))
                 .orderBy(col("method_count").desc()))
http_count_df.show()

+------+------------+
|method|method_count|
+------+------------+
|  POST|       25176|
|DELETE|       25052|
|   GET|       24986|
|   PUT|       24786|
+------+------------+



Number of 404 response codes:

In [61]:
error_code_df = (logs_df.filter(col("response_code") == 404)
                 .agg(count("response_code").alias("404_count")))
print(f"Number of 404 responses: {error_code_df.collect()[0][0]}")


Number of 404 responses: 24850


Total response size by day:

In [60]:
response_size_df = (logs_df.withColumn("timestamp", to_date(col("timestamp")))
                    .groupBy("timestamp")
                    .agg(sum("response_size").alias("response_size_count"))
                    .orderBy("timestamp"))
response_size_df.show()

+----------+-----------------------------------------+
| timestamp|sum(response_size AS response_size_count)|
+----------+-----------------------------------------+
|2024-01-01|                                  2052392|
|2024-01-02|                                  2108796|
|2024-01-03|                                  2186464|
|2024-01-04|                                  1970542|
|2024-01-05|                                  1852021|
|2024-01-06|                                  1933700|
|2024-01-07|                                  2150467|
|2024-01-08|                                  2058076|
|2024-01-09|                                  2302898|
|2024-01-10|                                  2062825|
|2024-01-11|                                  2082929|
|2024-01-12|                                  2000169|
|2024-01-13|                                  2241225|
|2024-01-14|                                  1892717|
|2024-01-15|                                  1865733|
|2024-01-1