In [1]:
import os
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, count, rand
# import pandas as pd

# Setup: input text files are read (recursively) from INPUT_DIR; every result
# file (counts CSV, sample CSV, summary text) is written under OUTPUT_DIR.
INPUT_DIR = "data/sample"
OUTPUT_DIR = "data/output"

# pandas' to_csv() and open() do not create missing parent directories, so
# make sure the output directory exists before anything is written to it.
os.makedirs(OUTPUT_DIR, exist_ok=True)


# Create (or reuse) the Spark session for this job. No .master() is set here,
# so the master comes from the launch configuration/environment.
spark = (
    SparkSession.builder
    .appName("Problem1_Local")
    .getOrCreate()
)

# Start the wall-clock timer for the analysis (Spark start-up above is not timed)
start_time = time.time()

# Read every text file under INPUT_DIR (recursively); each line of input
# becomes one row in the single string column 'value'.
logs_df = spark.read.option("recursiveFileLookup", "true").text(INPUT_DIR)

# Log levels to capture. The \b word boundaries stop the pattern from matching
# inside longer tokens (e.g. "WARNING", "INFORMATION", "DEBUGGING"). When a line
# contains several levels, regexp_extract returns the first match.
pattern = r"\b(INFO|WARN|ERROR|DEBUG)\b"

# Extract the log level from each line into a new column; regexp_extract
# yields "" for lines where the pattern does not match.
logs_parsed = logs_df.withColumn(
    'log_level', regexp_extract('value', pattern, 1)
)

# Keep only rows that contain a recognised log level. This DataFrame feeds
# several Spark actions later (counts, random sample, totals), so cache it to
# avoid re-reading and re-parsing the input for each one.
valid_df = logs_parsed.filter(col("log_level") != "").cache()

# Count how many times each log level appears. Sort most-frequent first (ties
# broken by level name) so the CSV and summary have a deterministic order;
# groupBy alone gives no ordering guarantee.
counts_df = (
    valid_df.groupBy("log_level")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc(), col("log_level"))
)

# Output file locations for the two CSV artifacts.
counts_csv = f'{OUTPUT_DIR}/problem1_counts.csv'
sample_csv = f'{OUTPUT_DIR}/problem1_sample.csv'

# Persist the per-level counts first.
counts_df.toPandas().to_csv(counts_csv, index=False)

# Then persist a 10-row random sample of the matched lines, with the raw line
# text exposed as 'log_entry' next to the extracted 'log_level'.
sample_df = (
    valid_df
    .orderBy(rand())
    .limit(10)
    .withColumnRenamed('value', 'log_entry')
)
sample_df.toPandas().to_csv(sample_csv, index=False)

# Gather the figures for the summary report. The per-level counts are tiny
# (one row per level), so collect them to the driver once and derive both the
# number of distinct levels and the matched-line total from that result,
# instead of launching a separate Spark job for each figure.
total_lines = logs_df.count()
counts_pdf = counts_df.toPandas()
unique_levels = len(counts_pdf)
# Every matched line lands in exactly one level group, so the group counts sum
# to the number of matched lines (the same value valid_df.count() returns).
total_valid = int(counts_pdf["count"].sum())

# Create a summary report including totals and log level distribution
summary_text = (
    f"Total log lines processed: {total_lines}\n"
    f"Total lines with log levels: {total_valid}\n"
    f"Unique log levels found: {unique_levels}\n"
)

# Write the totals, then each level's count and its share of matched lines.
# Explicit UTF-8 keeps the output independent of the platform's default encoding.
with open(f'{OUTPUT_DIR}/problem1_summary.txt', "w", encoding="utf-8") as f:
    f.write(summary_text)
    f.write("Log level distribution:\n")
    for level, raw_cnt in zip(counts_pdf["log_level"], counts_pdf["count"]):
        cnt = int(raw_cnt)
        # Guard against division by zero when no line matched a level.
        pct = (cnt / total_valid) * 100 if total_valid > 0 else 0.0
        f.write(f"  {level:<5}: {cnt:10,} ({pct:6.2f}%)\n")



# Report how long the analysis took (since start_time), then shut Spark down.
execution_time = time.time() - start_time
banner = "=" * 70
print(
    "\n" + banner,
    "ANALYSIS COMPLETED - Summary Statistics",
    banner,
    f"Execution time: {execution_time:.2f} seconds",
    banner,
    sep="\n",
)

spark.stop()



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/23 20:16:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                


ANALYSIS COMPLETED - Summary Statistics
Execution time: 8.29 seconds
