In [25]:
# Import necessary PySpark modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, when, current_timestamp, to_timestamp, col, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [21]:
# Start (or reuse, via getOrCreate) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("Log Transformation")
    .getOrCreate()
)

# Intended output layout of the log transformation: every field is nullable,
# the two time columns are timestamps and everything else is a string.
# Column order below is the order of the StructType.
_TIMESTAMP_FIELDS = ("timestamp", "processed_at")
schema = StructType([
    StructField(field_name, TimestampType() if field_name in _TIMESTAMP_FIELDS else StringType(), True)
    for field_name in (
        "timestamp",
        "hostname",
        "process",
        "pid",
        "event_type",
        "severity",
        "message",
        "log_source",
        "processed_at",
    )
])

In [22]:
# Sample syslog lines (one string per line), wrapped below into a list of
# Python dicts of the form {"message": <line>} for spark.createDataFrame.
# NOTE(review): contains a real hostname/username ("anis") — redact before sharing.
_sample_lines = [
    "2025-04-07T12:21:07.533748+01:00 anis-virtualbox systemd-logind[841]: Session 1 logged out. Waiting for processes to exit.",
    "2025-04-07T12:21:07.536974+01:00 anis-virtualbox systemd-logind[841]: Removed session 1.",
    "2025-04-07T12:21:07.779391+01:00 anis-virtualbox sddm-helper: gkr-pam: unlocked login keyring",
    "2025-04-07T12:21:09.340317+01:00 anis-virtualbox polkitd[826]: Registered Authentication Agent for unix-session:3 (system bus name :1.51 [/usr/bin/lxqt-policykit-agent], object path /org/lxqt/PolicyKit1/AuthenticationAgent, locale en_US.UTF-8)",
    "2025-04-07T12:21:20.870258+01:00 anis-virtualbox sudo:     anis : TTY=pts/0 ; PWD=/home/anis ; USER=root ; COMMAND=/usr/local/bin/fluentd -c /etc/fluentd.conf -v",
    "2025-04-07T12:21:20.871847+01:00 anis-virtualbox sudo: pam_unix(sudo:session): session opened for user root(uid=0) by anis(uid=1000)",
    "2025-04-07T12:21:31.609513+01:00 anis-virtualbox sudo:     anis : TTY=pts/2 ; PWD=/home/anis ; USER=root ; COMMAND=/usr/bin/su",
    "2025-04-07T12:21:31.609831+01:00 anis-virtualbox sudo: pam_unix(sudo:session): session opened for user root(uid=0) by anis(uid=1000)",
    "2025-04-07T12:21:31.620219+01:00 anis-virtualbox su[1719]: (to root) root on pts/3",
    "2025-04-07T12:21:31.620730+01:00 anis-virtualbox su[1719]: pam_unix(su:session): session opened for user root(uid=0) by anis(uid=0)",
    "2025-04-07T12:21:33.769836+01:00 anis-virtualbox su[1719]: pam_unix(su:session): session closed for user root",
    "2025-04-07T12:21:33.772637+01:00 anis-virtualbox sudo: pam_unix(sudo:session): session closed for user root",
    "2025-04-07T12:21:34.862691+01:00 anis-virtualbox dbus-daemon[804]: [system] Failed to activate service 'org.bluez': timed out (service_start_timeout=25000ms)",
    "2025-04-07T12:25:01.129908+01:00 anis-virtualbox CRON[1766]: pam_unix(cron:session): session opened for user root(uid=0) by root(uid=0)",
    "2025-04-07T12:25:01.138020+01:00 anis-virtualbox CRON[1766]: pam_unix(cron:session): session closed for user root",
    "2025-04-07T12:30:01.145120+01:00 anis-virtualbox CRON[1836]: pam_unix(cron:session): session opened for user root(uid=0) by root(uid=0)",
    "2025-04-07T12:30:01.150387+01:00 anis-virtualbox CRON[1836]: pam_unix(cron:session): session closed for user root",
    "2025-04-07T12:35:01.161847+01:00 anis-virtualbox CRON[1909]: pam_unix(cron:session): session opened for user root(uid=0) by root(uid=0)",
    "2025-04-07T12:35:01.170794+01:00 anis-virtualbox CRON[1909]: pam_unix(cron:session): session closed for user root",
    "2025-04-07T12:36:47.879075+01:00 anis-virtualbox sudo: pam_unix(sudo:auth): authentication failure; logname=anis uid=1000 euid=0 tty=/dev/pts/2 ruser=anis rhost=  user=anis",
    "2025-04-07T12:36:54.388158+01:00 anis-virtualbox dbus-daemon[804]: [system] Failed to activate service 'org.bluez': timed out (service_start_timeout=25000ms)",
    "2025-04-07T12:36:56.593690+01:00 anis-virtualbox sudo:     anis : TTY=pts/2 ; PWD=/home/anis ; USER=root ; COMMAND=/usr/bin/su",
    "2025-04-07T12:36:56.595061+01:00 anis-virtualbox sudo: pam_unix(sudo:session): session opened for user root(uid=0) by anis(uid=1000)",
    "2025-04-07T12:36:56.602089+01:00 anis-virtualbox su[1974]: (to root) root on pts/3",
    "2025-04-07T12:36:56.604534+01:00 anis-virtualbox su[1974]: pam_unix(su:session): session opened for user root(uid=0) by anis(uid=0)",
    "2025-04-07T12:36:58.170036+01:00 anis-virtualbox su[1974]: pam_unix(su:session): session closed for user root",
    "2025-04-07T12:36:58.175962+01:00 anis-virtualbox sudo: pam_unix(sudo:session): session closed for user root",
]
logs = [{"message": line} for line in _sample_lines]
del _sample_lines  # keep only `logs` in the notebook namespace

In [23]:
# Create a single-column DataFrame (`message: string`) from the sample logs.
# An explicit schema is given instead of inferring it from the dicts: the
# column type is then fixed up front, and an empty `logs` list still yields
# an (empty) DataFrame instead of failing schema inference.
raw_df = spark.createDataFrame(logs, schema="message string")

In [26]:
# Split each raw syslog line in `raw_df.message` into the columns declared in
# `schema`, as one DataFrame `select`. Line shape seen in the sample data:
#   <ISO-8601 timestamp> <hostname> <tag>: <text>
# where <tag> is either "process" or "process[pid]".

# Leading ISO-8601 timestamp: optional fractional seconds, then either a
# numeric offset (+hh:mm or -hh:mm) or "Z".
TIMESTAMP_REGEX = r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:[+-]\d{2}:\d{2}|Z))"
# Spark datetime pattern for the captured text: `[.SSSSSS]` is an optional
# 1-6 digit fraction; `XXX` parses offsets such as "+01:00", "-05:00" or "Z".
TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss[.SSSSSS]XXX"
# Syslog tag: the first whitespace-preceded token ending in ":" that is itself
# followed by whitespace, e.g. "sudo" or "su[1719]".
TAG_REGEX = r"\s+(\S+(?:\[\d+\])?):\s+"

message_col = col("message")
timestamp_text_col = regexp_extract(message_col, TIMESTAMP_REGEX, 1)
# Extracted once, then split into process name and pid below.
tag_col = regexp_extract(message_col, TAG_REGEX, 1)

event_type_col = (
    when(message_col.contains("session opened for user"), "session_open")
    .when(message_col.contains("session closed for user"), "session_close")
    .when(message_col.contains("COMMAND="), "sudo_command")
    # NOTE(review): su/cron "session opened/closed" lines are already caught by
    # the two rules above, so these only fire for other pam_unix(su|cron:session)
    # lines — confirm this precedence is intended.
    .when(message_col.contains("pam_unix(su:session)"), "su_session")
    .when(message_col.contains("pam_unix(cron:session)"), "cron_session")
    # Must precede the generic systemd-logind rule: "Removed session ..." is
    # emitted by systemd-logind (see sample data), so with the opposite order
    # this branch could never match those lines.
    .when(message_col.contains("New session") | message_col.contains("Removed session"), "session_event")
    .when(message_col.contains("systemd-logind"), "logind_event")
    .when(message_col.contains("Failed to activate") | message_col.contains("unable to locate"), "error")
    .otherwise("auth_misc")
)

# NOTE(review): "authentication failure" lines currently fall through to
# event_type "auth_misc" / severity "info" — consider a dedicated rule.
severity_col = (
    when(message_col.contains("Failed to activate") | message_col.contains("unable to locate"), "high")
    .when(message_col.contains("COMMAND=") | message_col.contains("su:session"), "medium")
    .when(message_col.contains("session opened") | message_col.contains("session closed"), "low")
    .otherwise("info")
)

transformed_df = raw_df.select(
    # regexp_extract returns "" when nothing matches; only parse non-empty text
    # so unmatched lines give a null timestamp rather than a parse error
    # (which Spark raises for unparseable input under ANSI mode).
    when(
        timestamp_text_col != "",
        to_timestamp(timestamp_text_col, TIMESTAMP_FORMAT),
    ).alias("timestamp"),
    regexp_extract(message_col, r"^\S+\s+(\S+)", 1).alias("hostname"),
    regexp_extract(tag_col, r"^(\S+?)(?:\[\d+\])?$", 1).alias("process"),
    regexp_extract(tag_col, r"\[(\d+)\]", 1).alias("pid"),
    event_type_col.alias("event_type"),
    severity_col.alias("severity"),
    # Everything after the tag and its trailing whitespace (group 2).
    regexp_extract(message_col, TAG_REGEX + r"(.*)", 2).alias("message"),
    lit("system").alias("log_source"),
    current_timestamp().alias("processed_at"),
)

In [27]:
# Output settings: set SAVE_CSV = False to skip the (optional) CSV export.
SAVE_CSV = True
OUTPUT_DIR = "transformed_logs_spark"

# Check that the transformation produced the declared columns, in order, with
# the declared types. Nullability is deliberately not compared: Spark may mark
# derived columns (e.g. lit(...)) non-nullable while `schema` declares every
# field nullable.
# NOTE(review): assumes the default spark.sql.timestampType (TIMESTAMP_LTZ);
# a session configured for TIMESTAMP_NTZ would change to_timestamp's type.
actual_columns = [(field.name, field.dataType) for field in transformed_df.schema.fields]
expected_columns = [(field.name, field.dataType) for field in schema.fields]
assert actual_columns == expected_columns, f"schema mismatch: {actual_columns} != {expected_columns}"

# Show the first rows of the transformed DataFrame
print("Transformed Logs DataFrame:")
transformed_df.show(5, truncate=False)

# Display schema
print("\nDataFrame Schema:")
transformed_df.printSchema()

if SAVE_CSV:
    transformed_df.write.csv(OUTPUT_DIR, header=True, mode="overwrite")
    print(f"\nSaved to '{OUTPUT_DIR}' directory")

# Stop the Spark session. raw_df / transformed_df belong to this session, so
# re-run the notebook from the session cell before using them again.
spark.stop()

Transformed Logs DataFrame:


                                                                                

+--------------------------+---------------+--------------+---+------------+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------+
|timestamp                 |hostname       |process       |pid|event_type  |severity|message                                                                                                                                                                             |log_source|processed_at             |
+--------------------------+---------------+--------------+---+------------+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------------+
|2025-04-07 12:21:07.533748|anis-virtualbox|systemd-logind|841|logind_event|info    |Ses

                                                                                


Saved to 'transformed_logs_spark' directory
