In [None]:
!pip install -r requirements.txt

In [None]:
import os
import re
import urllib.request

import geoip2.database
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession
# One SparkSession per kernel: getOrCreate() returns the existing session on re-run.
spark = (
    SparkSession.builder
    .appName("LogAnalysis")
    .getOrCreate()
)
sc = spark.sparkContext  # RDD entry point used by the cells below

In [None]:
log_path = "/data/apache_logs.txt"
log_rdd = sc.textFile(log_path)

In [None]:
# Matches the prefix of an Apache access-log line:
#   host ident authuser [timestamp] "request" status bytes
# Only the start of the line is anchored, so any trailing fields
# (e.g. referer / user-agent) are ignored.
log_pattern = re.compile(
    r'^(\S+) (\S+) (\S+) \[(.*?)\] "(.*?)" (\d{3}) (\S+)'
)

def parse_log(line):
    """Parse one access-log line into a dict of string fields.

    Returns a dict with keys "ip", "timestamp", "request", "status" and
    "bytes" (all raw strings; "bytes" may be a non-numeric token such as
    "-"), or None when the line does not match ``log_pattern``.
    """
    found = log_pattern.match(line)
    if not found:
        return None
    # Groups 2 and 3 (ident, authuser) are intentionally dropped.
    ip, timestamp, request, status, size = found.group(1, 4, 5, 6, 7)
    return {
        "ip": ip,
        "timestamp": timestamp,
        "request": request,
        "status": status,
        "bytes": size,
    }

# Drop lines that don't match the log format. Cached because the cells below
# run several separate actions on this RDD; without cache() every action would
# re-read and re-parse the entire log file.
parsed_logs = log_rdd.map(parse_log).filter(lambda x: x is not None).cache()

In [None]:
# Top 10 IPs by number of requests.
# Ties on count are broken by IP string so the ranking is identical across runs.
top_ips = (
    parsed_logs.map(lambda log: (log["ip"], 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, key=lambda x: (-x[1], x[0]))
)

print("Top 10 IPs:")
for ip, count in top_ips:
    print(f"{ip}: {count}")

# Count of status codes as (status, count) pairs, sorted by status code.
# collect() order depends on partitioning, so sort for a stable breakdown.
status_counts = sorted(
    parsed_logs.map(lambda log: (log["status"], 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

print("\nHTTP Status Code Breakdown:")
for status, count in status_counts:
    print(f"{status}: {count}")


In [None]:
# Heuristic thresholds for flagging IPs; tune them here rather than in the code below.
SUSPICIOUS_REQUEST_THRESHOLD = 100
FAILURE_THRESHOLD = 10
# Status codes counted as "failures". NOTE(review): 401/403 are auth failures,
# but 400/404 are generic client errors — confirm they are meant to be included.
FAILURE_STATUSES = frozenset({"400", "401", "403", "404"})

# IPs with more than SUSPICIOUS_REQUEST_THRESHOLD requests, busiest first.
suspicious_ips = sorted(
    parsed_logs.map(lambda log: (log["ip"], 1))
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda x: x[1] > SUSPICIOUS_REQUEST_THRESHOLD)
    .collect(),
    key=lambda x: (-x[1], x[0]),
)

print(f"\nSuspicious IPs (more than {SUSPICIOUS_REQUEST_THRESHOLD} requests):")
for ip, count in suspicious_ips:
    print(f"{ip}: {count}")

# IPs with more than FAILURE_THRESHOLD responses in FAILURE_STATUSES, most first.
# The name auth_failures is kept because later cells reference it.
auth_failures = sorted(
    parsed_logs.filter(lambda log: log["status"] in FAILURE_STATUSES)
               .map(lambda log: (log["ip"], 1))
               .reduceByKey(lambda a, b: a + b)
               .filter(lambda x: x[1] > FAILURE_THRESHOLD)
               .collect(),
    key=lambda x: (-x[1], x[0]),
)

failure_codes = "/".join(sorted(FAILURE_STATUSES))
print(f"\nIPs with more than {FAILURE_THRESHOLD} failed requests ({failure_codes}):")
for ip, count in auth_failures:
    print(f"{ip}: {count}")

In [None]:
# GeoIP lookup (requires the MaxMind GeoLite2 ASN database).
geoip_db_path = "/data/GeoLite2-ASN.mmdb"

# reader stays None when the database is missing, so ip_to_country can return
# "Unknown" directly instead of failing with a NameError on every lookup.
reader = None
if not os.path.exists(geoip_db_path):
    print(
        "\nGeoIP database not found. Please download the GeoLite2-ASN.mmdb "
        f"file from MaxMind and upload it to {geoip_db_path}."
    )
else:
    # Left open for the rest of the notebook so every lookup reuses it.
    reader = geoip2.database.Reader(geoip_db_path)


def ip_to_country(ip):
    """Return a printable ASN description for ``ip``.

    Despite its name, this does not return a country: it queries the
    GeoLite2 ASN database and formats the autonomous system number and
    organisation as "ASN: <number>, Org: <organisation>".

    Returns "Unknown" when the database was not loaded or the lookup raises;
    in the latter case the failure reason is printed.
    """
    if reader is None:
        return "Unknown"
    try:
        response = reader.asn(ip)
        return f"ASN: {response.autonomous_system_number}, Org: {response.autonomous_system_organization}"
    except Exception as e:  # broad on purpose: any per-IP failure degrades to "Unknown"
        print(f"Lookup failed for {ip}: {e}")
        return "Unknown"

In [None]:
# Example: annotate each suspicious IP with its ASN / organisation
# (ip_to_country returns ASN info, not a country).
print("\nSuspicious IPs with GeoIP Lookup:")
for addr, hits in suspicious_ips:
    asn_info = ip_to_country(addr)
    print(f"{addr}: {hits} requests - {asn_info}")

In [None]:
# Tabulate the driver-side Spark results so they can be plotted and displayed.
# These frames were previously referenced without ever being defined.
df_top_ips = pd.DataFrame(top_ips, columns=["IP", "Requests"])
df_status = pd.DataFrame(status_counts, columns=["Status", "Count"])
df_suspicious = pd.DataFrame(
    [(ip, count, ip_to_country(ip)) for ip, count in suspicious_ips],
    columns=["IP", "Requests", "ASN"],
)
df_auth_failures = pd.DataFrame(auth_failures, columns=["IP", "Failures"])

# Plot Top IPs (pandas cannot bar-plot an empty frame, so guard it)
if df_top_ips.empty:
    print("No parsed requests; skipping Top IPs chart.")
else:
    fig, ax = plt.subplots(figsize=(8, 4))
    # ax= makes pandas draw into this figure instead of opening a second one.
    df_top_ips.plot(kind="bar", x="IP", y="Requests", legend=False, color="orange", ax=ax)
    ax.set_title("Top 10 IPs by Request Count")
    ax.set_ylabel("Requests")
    ax.set_xlabel("IP Address")
    ax.tick_params(axis="x", labelrotation=45)
    fig.tight_layout()
    plt.show()

# Status Code Pie Chart
if df_status.empty:
    print("No status codes to plot.")
else:
    fig, ax = plt.subplots(figsize=(6, 6))
    ax.pie(df_status["Count"], labels=df_status["Status"], autopct="%1.1f%%", startangle=90)
    ax.set_title("HTTP Status Code Distribution")
    ax.axis("equal")
    plt.show()

# Display Tables (rich HTML rendering in Jupyter instead of plain print)
print("\n🔍 Suspicious IPs (w/ ASN info if available):")
display(df_suspicious)

print("\n🔐 Repeated failed requests (400/401/403/404):")
display(df_auth_failures)
