In [1]:
import os
# Kafka source connector for Structured Streaming, pinned to versions matching Spark 3.0.1
# (Scala 2.12 build). This must be set before the SparkSession is created in the next cell.
SPARK_SUBMIT_ARGS = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.kafka:kafka-clients:2.4.1 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = SPARK_SUBMIT_ARGS


In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, window, count, current_timestamp
from pyspark.sql.types import StringType, IntegerType

# Create the notebook's SparkSession, or return the existing one if it was already built.
# NOTE(review): spark.streaming.stopGracefullyOnShutdown is a DStream (spark.streaming.*)
# setting; confirm it has any effect on the Structured Streaming queries used here.
spark = (
    SparkSession.builder
    .appName("ApacheLogAnalysis60SecIntervals")
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .getOrCreate()
)

# Only WARN and above from Spark's own logging, so query output is not buried in log noise.
spark.sparkContext.setLogLevel("WARN")

In [None]:


# Kafka connection settings for the raw Apache log stream.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "apache_logs"

# Subscribe to the log topic as a streaming source. startingOffsets="latest" means
# only records that arrive after a query starts are read.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "latest")
    .load()
)

# Keep the raw log line (the Kafka record value decoded as a string) together with
# the Kafka record timestamp.
logs_df = kafka_df.select(
    col("value").cast("string").alias("log_line"),
    col("timestamp").cast("timestamp").alias("kafka_timestamp"),
)

# The windowed aggregations below are keyed on this processing (wall-clock) time,
# not on the timestamp written inside the log line itself.
logs_with_time = logs_df.withColumn("processing_time", current_timestamp())

# Parse Apache access-log fields (Common / Combined Log Format) with regexes, e.g.:
#   127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
# regexp_extract yields "" when a pattern does not match; the integer casts then give
# null (this is also how a "-" byte count ends up as null).
parsed_logs = (
    logs_with_time
    .withColumn("ip", regexp_extract("log_line", r"^(\S+)", 1))
    # Bracketed timestamp such as "10/Oct/2000:13:55:36 -0700". The brackets must be
    # escaped; a "$$...$$" pattern would be end-of-input anchors and never match.
    .withColumn("timestamp", regexp_extract("log_line", r"\[(\S+ [+\-]\d{4})\]", 1))
    .withColumn("method", regexp_extract("log_line", r"\"(\S+) ", 1))
    .withColumn("endpoint", regexp_extract("log_line", r"\"(?:\S+) (\S+) ", 1))
    .withColumn("protocol", regexp_extract("log_line", r"\"(?:\S+) (?:\S+) (\S+)\"", 1))
    # Status and byte count directly follow the request line's closing quote. Anchoring
    # on that quote keeps the " - " ident field (or stray digits) from matching first.
    .withColumn("status", regexp_extract("log_line", r"\"\s+(\d{3})(?:\s|$)", 1).cast(IntegerType()))
    # NOTE(review): IntegerType overflows (-> null) for responses above 2^31-1 bytes;
    # consider LongType if very large transfers are expected.
    .withColumn("bytes", regexp_extract("log_line", r"\"\s+\d{3}\s+(\d+|-)", 1).cast(IntegerType()))
)

# Display schema in notebook
parsed_logs.printSchema()

# Every aggregation below buckets rows into windows of this length on processing_time.
WINDOW_DURATION = "60 seconds"


def _count_per_window(frame, *group_cols):
    """Group ``frame`` by a WINDOW_DURATION window on processing_time plus ``group_cols``; count rows.

    The result has a ``window`` column, one column per entry of ``group_cols``, and ``count``.
    """
    return frame.groupBy(window(col("processing_time"), WINDOW_DURATION), *group_cols).count()


# Total number of requests per window.
windowed_logs = _count_per_window(parsed_logs).withColumnRenamed("count", "total_requests")

# Status-code counts per window, ordered by window then status code.
status_distribution = _count_per_window(parsed_logs, "status").orderBy("window", "status")

# Per-window breakdowns, busiest first within each window. ip_traffic holds every IP;
# the console sink only prints the first numRows rows of it.
endpoint_traffic = _count_per_window(parsed_logs, "endpoint").orderBy("window", col("count").desc())
method_distribution = _count_per_window(parsed_logs, "method").orderBy("window", col("count").desc())
ip_traffic = _count_per_window(parsed_logs, "ip").orderBy("window", col("count").desc())

# Client/server errors (status >= 400) per window and status code.
error_rates = _count_per_window(
    parsed_logs.filter(col("status") >= 400), "status"
).orderBy("window", "status")

# Define a function to start streaming queries in notebook
def start_query(df, name, num_rows=20, output_mode="complete", trigger_interval="60 seconds"):
    """Start a console-sink streaming query for ``df`` and return its handle.

    Parameters
    ----------
    df : DataFrame
        Streaming DataFrame to write out. With the default ``output_mode="complete"``
        it must be an aggregation (every DataFrame passed in this notebook is).
    name : str
        Query name passed to ``queryName``; this is what ``query.name`` reports.
    num_rows : int, default 20
        Maximum number of rows the console sink prints per trigger.
    output_mode : str, default "complete"
        Streaming output mode, e.g. "complete", "update" or "append".
    trigger_interval : str, default "60 seconds"
        Processing-time trigger interval; by default it matches the 60-second windows.

    Returns
    -------
    StreamingQuery
        The started query; call ``.stop()`` on it when finished.
    """
    return (
        df.writeStream
        .outputMode(output_mode)
        .format("console")
        .trigger(processingTime=trigger_interval)
        .queryName(name)
        # Print full cell values instead of truncating them.
        .option("truncate", "false")
        .option("numRows", num_rows)
        .start()
    )

# One console streaming query per aggregation: (DataFrame, query name) pairs.
_streams_to_start = [
    (windowed_logs, "total_requests"),
    (status_distribution, "status_distribution"),
    (endpoint_traffic, "endpoint_traffic"),
    (method_distribution, "method_distribution"),
    (ip_traffic, "ip_traffic"),
    (error_rates, "error_rates"),
]
queries = [start_query(frame, query_name) for frame, query_name in _streams_to_start]

# Report the names of the queries that were started.
for query in queries:
    print(f"Query '{query.name}' is active.")

# When finished, call .stop() on each entry of `queries` from a separate cell.

root
 |-- log_line: string (nullable = true)
 |-- kafka_timestamp: timestamp (nullable = true)
 |-- processing_time: timestamp (nullable = false)
 |-- ip: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- bytes: integer (nullable = true)

Query 'total_requests' is active.
Query 'status_distribution' is active.
Query 'method_distribution' is active.
Query 'ip_traffic' is active.
Query 'error_rates' is active.


In [5]:
# Stop all streaming queries (run in a separate cell when ready).
# The second loop also stops any query still active in this session whose handle is not
# in `queries`. This happens if the start cell was re-run: `queries = []` rebinds the list,
# and start_query then fails because a query with that name is already active.
for query in queries:
    query.stop()
for query in spark.streams.active:
    query.stop()
print("All streaming queries stopped.")

All streaming queries stopped.
