# 🔍 Log Analysis with PySpark CLI
This notebook analyzes log data stored in HDFS and extracts meaningful insights using PySpark.

In [None]:
# NOTE(review): this looks like the shell command that launches the PySpark
# REPL (which presumably predefines `spark` and `sc`) rather than Python code —
# confirm. Evaluated as Python, it is a bare name lookup of `pyspark`.
pyspark

## 📥 Load Data from HDFS

In [None]:

# Read logs from HDFS using the correct full path.
# `spark` must already exist when this cell runs (presumably the session
# predefined by the pyspark shell — confirm for other kernels).
# The trailing `*` glob matches every entry under the dated directory.
logs = spark.sparkContext.textFile("hdfs://localhost:9000/tmp/logGenED/25-07-19/*")

# Display the first 10 lines of the logs
logs.take(10)

## Analysis Operation 🔍

In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one, otherwise create one named "LogAnalysis".
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

# Bind the SparkContext explicitly so this cell does not rely on an `sc`
# variable being defined elsewhere.
sc = spark.sparkContext

# Read the log files as an RDD of text lines; the trailing `*` glob matches
# every entry under the dated directory.
rdd = sc.textFile("hdfs://localhost:9000/tmp/logGenED/25-07-19/*")

# Set log level to avoid unnecessary output
sc.setLogLevel("WARN")

# Define the parser function
def parse_log(line):
    """Split one log line into (timestamp, level, service, response_time).

    The line is split on the " - " separator. The first three fields are
    returned stripped of surrounding whitespace. The response time is read
    from the last whitespace-separated token of the fourth field, with any
    "ms" text removed, and converted to int.

    Malformed input never raises: fields that are missing come back as empty
    strings, and a response time that is absent or not an integer comes back
    as 0.

    Args:
        line: A single raw log line (str).

    Returns:
        A 4-tuple (timestamp: str, level: str, service: str, response_time: int).
    """
    parts = [part.strip() for part in line.split(" - ")]
    # Pad short lines to four fields so one malformed record cannot raise
    # IndexError and abort the whole job. A negative multiplier yields an
    # empty list, so longer lines are left untouched.
    parts += [""] * (4 - len(parts))
    timestamp, level, service = parts[:3]

    tokens = parts[3].split()
    try:
        response_time = int(tokens[-1].replace("ms", ""))
    except (IndexError, ValueError):
        # IndexError: the fourth field is empty; ValueError: the last token is
        # not an integer. Both mean "no usable response time".
        response_time = 0
    return (timestamp, level, service, response_time)

# Parse every line, keep only the records whose response time is positive,
# and cache the result because each report below reads it again.
parsed_rdd = (
    rdd.map(parse_log)
    .filter(lambda record: record[3] > 0)
    .cache()
)

separator = "=" * 50

# --- Report header and raw line count ---
print("\n" + separator)
print(f"Total log entries: {rdd.count()}")

# --- Number of parsed records per log level (index 1) ---
print("\nLog level distribution:")
level_counts = (
    parsed_rdd.map(lambda record: (record[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
for level, count in sorted(level_counts):
    print(f"{level}: {count}")

# --- Number of parsed records per service (index 2) ---
print("\nService distribution:")
service_counts = (
    parsed_rdd.map(lambda record: (record[2], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
for service, count in sorted(service_counts):
    print(f"{service}: {count}")

# --- Mean response time per service ---
# Each record becomes (service, (response_time, 1)) so that one reduce yields
# both the sum and the count needed for the average.
print("\nAverage response time by service:")
service_times = parsed_rdd.map(lambda record: (record[2], (record[3], 1)))
service_totals = service_times.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
service_avg = service_totals.mapValues(
    lambda total_and_count: total_and_count[0] / total_and_count[1]
).collect()
for service, avg in sorted(service_avg):
    print(f"{service}: {avg:.2f}ms")

# --- Five largest response times ---
# The response time goes first in the tuple so that top() orders by it.
print("\nTop 5 slowest responses:")
timed_entries = parsed_rdd.map(
    lambda record: (record[3], f"{record[0]} | {record[1]} | {record[2]}")
)
slowest = timed_entries.top(5)
for time, entry in slowest:
    print(f"{time}ms: {entry}")

# Unpersist cached data
parsed_rdd.unpersist()
print(separator)
spark.stop()