In [None]:
# Installing the dependencies
pip install matplotlib
pip install pyspark

In [None]:
# Test data: build a tiny synthetic Apache access-log table and persist it as Parquet

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Session used only for the data-generation step (stopped at the end of this cell)
spark = (
    SparkSession.builder
    .appName("Generate Sample Apache Logs")
    .getOrCreate()
)

# Column names, in the same order as the fields of each sample row
schema = ["timestamp", "request", "response", "ip", "size"]

# One tuple per log line: (timestamp, request line, response code, client ip, size)
sample_data = [
    ("2024-05-19T08:00:00", "GET /api/endpoint1 HTTP/1.1", "200", "192.168.1.1", 1024),
    ("2024-05-19T08:01:00", "POST /api/endpoint2 HTTP/1.1", "404", "192.168.1.2", 2048),
    ("2024-05-19T08:02:00", "GET /api/endpoint1 HTTP/1.1", "200", "192.168.1.3", 4096),
    # Extend with further rows as needed
]

logs_df = spark.createDataFrame(sample_data, schema=schema)

# Replace any previous output so the cell can be re-run safely
logs_df.write.parquet("data/apache_logs.parquet", mode="overwrite")

# Release the session; the ingestion step creates its own
spark.stop()


In [None]:
# Print the kernel's current working directory via the shell.
# NOTE(review): presumably a sanity check for where the relative
# "data/apache_logs.parquet" path used by this notebook resolves — confirm it is still needed.
!pwd

In [None]:
# Data ingestion
# Read the Apache access logs back from the Parquet files on the local machine

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("API Log Anomaly Detection")
    .getOrCreate()
)

# Load the dataset and preview the first rows
logs_df = spark.read.format("parquet").load("data/apache_logs.parquet")
logs_df.show(n=5)


In [None]:
# Data transformation: add typed / parsed columns derived from the raw fields

from pyspark.sql.functions import col, split

logs_df = (
    logs_df
    # Numeric copies of the response code and size columns
    .withColumn("response_code", col("response").cast("int"))
    .withColumn("content_size", col("size").cast("int"))
    # The request line is split on spaces; element 1 is kept as the endpoint
    .withColumn("endpoint", split(col("request"), " ").getItem(1))
)
logs_df.show(n=5)


In [None]:
# Data analysis: number of requests per response code, most frequent first
response_code_counts = (
    logs_df
    .groupBy("response_code")
    .count()
    .sort("count", ascending=False)
)
response_code_counts.show()

In [None]:
# Traffic analysis: request count per endpoint, busiest first (top 10 displayed)
top_endpoints = (
    logs_df
    .groupBy("endpoint")
    .count()
    .sort("count", ascending=False)
)
top_endpoints.show(n=10)

In [None]:
# Frequent visitors: client IPs with more than 10 requests, most active first

frequent_visitors = (
    logs_df
    .groupBy("ip")
    .count()
    .where(col("count") > 10)
    .sort("count", ascending=False)
)
frequent_visitors.show(n=10)

In [None]:
# Content size statistics: minimum, maximum and average of `content_size`
#
# The Spark functions are accessed through the `F` alias on purpose.
# `from pyspark.sql.functions import min, max` would rebind Python's built-in
# min()/max() in the notebook namespace, so any later cell calling them on plain
# Python values would silently get the Spark column functions instead.
from pyspark.sql import functions as F

content_size_stats = logs_df.select(
    F.min("content_size"),
    F.max("content_size"),
    F.avg("content_size"),
)
content_size_stats.show()


In [None]:
# Anomaly detection
# Flag client IPs that issued more than 10 requests

anomalous_ips = (
    logs_df
    .groupBy("ip")
    .count()
    .where(col("count") > 10)
)
anomalous_ips.show()


In [None]:
# 404 requests: timestamp and endpoint of up to 10 requests answered with 404,
# ordered by the `timestamp` column, descending

latest_404_requests = (
    logs_df
    .where(col("response_code") == 404)
    .sort(col("timestamp").desc())
    .select("timestamp", "endpoint")
    .limit(10)
)
latest_404_requests.show()



In [None]:
# Visualization: response code distribution

import matplotlib.pyplot as plt

# Bring the aggregated counts to the driver as a pandas DataFrame for plotting
response_code_pandas = response_code_counts.toPandas()

# Plot the codes as categorical labels rather than as numbers. With the raw
# integers the bars sit at x = 200, 404, ... on a continuous axis, where the
# default bar width (0.8) makes them almost invisible.
code_labels = response_code_pandas['response_code'].astype(str)

plt.figure(figsize=(10,6))
plt.bar(code_labels, response_code_pandas['count'])
plt.xlabel('Response Code')
plt.ylabel('Count')
plt.title('Response Code Distribution')
plt.show()
