# Web Server Log Analytics with Spark Optimizations


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Session tunables. 50 shuffle partitions (Spark's default is 200) avoids scheduling
# hundreds of near-empty tasks for this tiny demo dataset.
APP_NAME = "WebLogAnalytics"
SHUFFLE_PARTITIONS = 50

# getOrCreate() returns the session already running in this kernel if there is one.
# NOTE(review): static settings such as spark.serializer only take effect when the
# session is first created, so restart the kernel after changing them.
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .config("spark.sql.shuffle.partitions", str(SHUFFLE_PARTITIONS))
    # Adaptive Query Execution: re-plans shuffles/joins using runtime statistics.
    .config("spark.sql.adaptive.enabled", "true")
    # Kryo is faster and more compact than the default Java serialization.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)


## Sample Raw Log Data (stand-in for a real log file)

In [2]:
# Stand-in for parsed access-log lines: (client IP, request time, URL path, HTTP status).
log_data = [
    ("192.168.1.1", "2023-07-01 10:00:00", "/home", 200),
    ("192.168.1.2", "2023-07-01 10:01:00", "/login", 200),
    ("192.168.1.3", "2023-07-01 10:02:00", "/cart", 500),
    ("192.168.1.1", "2023-07-01 10:03:00", "/checkout", 404),
    ("192.168.1.2", "2023-07-01 10:04:00", "/home", 200),
    ("192.168.1.3", "2023-07-01 10:05:00", "/home", 200),
]

# Explicit DDL schema instead of inferring types from the Python tuples: no inference
# pass, and `status` is INT rather than the inferred BIGINT. `timestamp` is loaded as
# text here and parsed below.
LOG_SCHEMA = "ip STRING, timestamp STRING, url STRING, status INT"
df_logs_raw = spark.createDataFrame(log_data, LOG_SCHEMA)

# Parse the text timestamps with an explicit pattern. Values that do not match become
# null (or raise, when ANSI mode is on); the assertion below catches the null case.
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"
df_logs = df_logs_raw.withColumn("timestamp", to_timestamp("timestamp", TIMESTAMP_FORMAT))

assert df_logs.filter(col("timestamp").isNull()).count() == 0, "unparseable timestamps in log data"

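## Projection and Predicate Pushdown (simulated with `select` / `filter`)

Selecting only the needed columns and filtering early lets Spark prune columns and push predicates toward the data source. With an in-memory DataFrame there is no source to push into, so this cell only demonstrates the pattern.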

In [3]:
# Project down to the columns later cells use, then keep only rows with
# status 400 and above (client and server errors).
df_filtered = (
    df_logs
    .select("ip", "url", "status", "timestamp")
    .filter(col("status") >= 400)
)

## Cache the error rows for reuse

In [4]:
# The error subset feeds several actions in the next cell, so mark it for caching.
# persist() with no argument uses the same default storage level as cache(). This is
# lazy: nothing is stored until the first action. The returned DataFrame is
# df_filtered itself, so both names refer to the cached frame.
cached_errors = df_filtered.persist()

In [5]:
# Output locations and tuning knobs for this cell.
FINAL_OUTPUT_PATH = "/tmp/final_logs"
OPTIMIZED_OUTPUT_PATH = "/tmp/optimized_logs"
SALT_BUCKETS = 10
SALT_SEED = 42
MAX_RECORDS_PER_FILE = 100_000

# Most common error URLs. The secondary sort on `url` breaks count ties so the
# displayed order is the same on every run.
error_counts = cached_errors.groupBy("url").count().orderBy(desc("count"), "url")

# Broadcast join: the small IP-to-country table is shipped to every executor, so
# the error rows are joined without shuffling them.
ip_country = [
    ("192.168.1.1", "US"),
    ("192.168.1.2", "UK"),
    ("192.168.1.3", "PK"),
]
df_country = spark.createDataFrame(ip_country, ["ip", "country"])
df_joined = cached_errors.join(broadcast(df_country), "ip")

# Skew-join salting: append a bucket id in [0, SALT_BUCKETS) to the key so a hot IP
# can spread across partitions. The "_" separator keeps salted keys unambiguous
# (without it, "192.168.1.1" + "3" equals the real IP "192.168.1.13"). The fixed seed
# makes the salt reproducible for the same input partitioning.
salt = (rand(SALT_SEED) * SALT_BUCKETS).cast("int").cast("string")
salted_logs = df_logs.withColumn("salted_ip", concat_ws("_", col("ip"), salt))

# Hash-partition by URL into 4 partitions: all rows for a URL land in the same partition.
reparted = df_logs.repartition(4, "url")

# Single-file output. Coalesce df_logs directly: coalescing `reparted` would pay for
# the URL shuffle and then immediately merge its partitions back into one.
df_logs.coalesce(1).write.mode("overwrite").parquet(FINAL_OUTPUT_PATH)

# URL-partitioned output, capped at MAX_RECORDS_PER_FILE rows per file. mode("overwrite")
# keeps the cell re-runnable; the default save mode fails if the path already exists.
(
    reparted.write
    .mode("overwrite")
    .option("maxRecordsPerFile", MAX_RECORDS_PER_FILE)
    .parquet(OPTIMIZED_OUTPUT_PATH)
)

# Show final analysis
print("--- Most Common Error URLs ---")
error_counts.show()

print("--- Joined With Country Info ---")
df_joined.show()

# Spark UI for this session (while it is running): http://localhost:4040
# Release the cached error rows, then stop the session; later cells cannot use `spark`.
cached_errors.unpersist()
spark.stop()

--- Most Common Error URLs ---
+---------+-----+
|      url|count|
+---------+-----+
|    /cart|    1|
|/checkout|    1|
+---------+-----+

--- Joined With Country Info ---
+-----------+---------+------+-------------------+-------+
|         ip|      url|status|          timestamp|country|
+-----------+---------+------+-------------------+-------+
|192.168.1.3|    /cart|   500|2023-07-01 10:02:00|     PK|
|192.168.1.1|/checkout|   404|2023-07-01 10:03:00|     US|
+-----------+---------+------+-------------------+-------+

