In [0]:
# -------------------------------------------------------------------
# 1) Imports and setup
# -------------------------------------------------------------------
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    input_file_name, monotonically_increasing_id, to_timestamp,
    sum as _sum, when, lit
)
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
from pyspark.sql.window import Window
import re

# Regex for the timestamp prefix ('dd/MM/yy HH:mm:ss'-shaped digits) that
# marks the first line of a log entry.
log_entry_pattern = re.compile(r'^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}')

# Glob selecting every .log file in the driver log folder.
logs_path = "/Volumes/eastus/default/logs/0425-120643-4kmoay69/driver/*.log"

# -------------------------------------------------------------------
# 2) Read the log lines and tag lines starting a new log entry
# -------------------------------------------------------------------
# Load every log line as one row (column `value`) and attach two helper
# columns: the originating file and an increasing id that is used later as
# the sort key for lines within a file.
df_raw = spark.read.text(logs_path).select(
    "*",
    input_file_name().alias("filename"),
    monotonically_increasing_id().alias("line_id"),
)

# UDF: does a line open a new log entry (i.e. begin with a timestamp)?
@F.udf(returnType=BooleanType())
def starts_with_timestamp(line):
    """Return True when ``line`` begins with the timestamp pattern.

    Surrounding whitespace is ignored before matching. A ``None`` or empty
    line is treated as an empty string and therefore yields False.
    """
    candidate = line.strip() if line else ""
    return log_entry_pattern.match(candidate) is not None

# Flag entry-start lines: is_new = 1 when the line begins with a timestamp,
# 0 for continuation lines (stack traces, wrapped messages, ...).
df_tagged = df_raw.select(
    "*",
    F.when(starts_with_timestamp(df_raw["value"]), 1).otherwise(0).alias("is_new"),
)

# -------------------------------------------------------------------
# 3) Create cumulative group IDs based on new entry detection
# -------------------------------------------------------------------
# Running total of the is_new flags, computed per file in line_id order.
# Each row receives the number of entry-start lines seen so far (itself
# included), so an entry's first line and its continuation lines share one
# group_id. Lines that precede the first flagged line of a file get 0.
window_spec = Window.partitionBy("filename").orderBy("line_id").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)

df_grouped = df_tagged.withColumn("group_id", F.sum("is_new").over(window_spec))

# -------------------------------------------------------------------
# 4) Group by 'filename' and 'group_id' and collect lines into arrays
# -------------------------------------------------------------------
# Gather the lines of each (filename, group_id) entry into one array.
#
# collect_list() makes no promise about element order after the groupBy
# shuffle, so each line is collected together with its line_id and the array
# is sorted on that id before the text is extracted. This keeps the entry's
# first line at index 0 and the continuation lines in file order, which the
# downstream parser relies on.
df_collected = (
    df_grouped
    .groupBy("filename", "group_id")
    .agg(
        # Structs sort field by field, so line_id (listed first) is the key.
        F.sort_array(
            F.collect_list(F.struct("line_id", "value"))
        ).alias("ordered_lines")
    )
    .select(
        "filename",
        "group_id",
        # Extracting a field from array<struct> yields an array of that field.
        F.col("ordered_lines").getField("value").alias("lines"),
    )
)

# -------------------------------------------------------------------
# 5) Define UDF to parse collected lines into timestamp, log level, message
# -------------------------------------------------------------------
# Return type of the parsing UDF: three string fields, in this order.
parse_schema = (
    StructType()
    .add("timestamp_str", StringType())
    .add("log_level", StringType())
    .add("message", StringType())
)

@F.udf(returnType=parse_schema)
def parse_log_entry(lines):
    """Split one grouped log entry into (timestamp_str, log_level, message).

    Args:
        lines: The entry's raw lines; the first element is expected to be the
            entry's first line, the rest are continuation lines.

    Returns:
        A 3-tuple of strings matching ``parse_schema``:
        * ``("", "", "")`` when ``lines`` is empty or None.
        * ``("", "", <all lines joined with newlines>)`` when the first line
          does not start with a timestamp or has fewer than three
          whitespace-separated tokens.
        * Otherwise the first two tokens joined by a space, the third token,
          and the remainder of the first line followed by any continuation
          lines, newline-separated.
    """
    if not lines:
        return ("", "", "")

    first_line = lines[0].strip()
    parts = first_line.split(maxsplit=3)

    # Only split the header when the entry really opens with a timestamp.
    # Lines that precede a file's first timestamped entry are grouped too;
    # without this check their first three words would be reported as date,
    # time and log level.
    if len(parts) < 3 or not log_entry_pattern.match(first_line):
        return ("", "", "\n".join(lines))

    timestamp_str = parts[0] + " " + parts[1]
    log_level = parts[2]
    header_message = parts[3] if len(parts) > 3 else ""

    # Remainder of the header line first, then continuation lines (if any).
    message = "\n".join([header_message, *lines[1:]])

    return (timestamp_str, log_level, message)

# -------------------------------------------------------------------
# 6) Apply parsing function to grouped lines
# -------------------------------------------------------------------
# Run the parsing UDF on each entry's line array; the result is kept as a
# struct column named "parsed" next to the existing columns.
df_parsed = df_collected.select("*", parse_log_entry("lines").alias("parsed"))

# -------------------------------------------------------------------
# 7) Extract final columns and convert timestamp
# -------------------------------------------------------------------
# Flatten the parsed struct, convert the timestamp text to a timestamp
# column, order entries by file and position, and keep only the three
# output columns.
# NOTE(review): the format string reads the first field as day-of-month and
# the last as year — confirm this matches the actual layout of the log files.
final_df = (
    df_parsed
    .select(
        "filename",
        "group_id",
        F.col("parsed.timestamp_str").alias("timestamp_str"),
        F.col("parsed.log_level").alias("log_level"),
        F.col("parsed.message").alias("message"),
    )
    .select(
        "*",
        F.to_timestamp("timestamp_str", "dd/MM/yy HH:mm:ss").alias("timestamp"),
    )
    .sort("filename", "group_id")
    .select("timestamp", "log_level", "message")
)

# -------------------------------------------------------------------
# 8) Show the final structured logs
# -------------------------------------------------------------------
# `display` is not defined in this cell — presumably the notebook
# environment's built-in renderer; confirm before running elsewhere.
display(final_df)
