In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import concat_ws, col

# Reuse the active Spark session if there is one (see the warning below);
# otherwise start a new one with the application name "bgl".
session = (
    SparkSession.builder
    .appName("bgl")
    .getOrCreate()
)

24/07/23 16:37:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Each BGL log line is split on single spaces. From inspecting the log we
# assume a line never has more than `max_columns` tokens.
max_columns = 20

# The first nine tokens are fixed fields; the tenth starts the free-text message.
base_columns = [
    ("Alert_message_flag", StringType()),
    ("Timestamp", IntegerType()),
    ("Date", StringType()),
    ("Node", StringType()),
    ("DateTime", StringType()),
    ("Node2", StringType()),
    ("MessageT", StringType()),
    ("SystemC", StringType()),
    ("Level", StringType()),
    ("Message", StringType()),
]

# Overflow columns receive the remaining words of the message, one per column.
overflow_columns = [
    (f"ExtraColumn_{i}", StringType())
    for i in range(max_columns - len(base_columns))
]

# Every column is nullable, since short lines leave trailing columns empty.
fields = [
    StructField(name, data_type, True)
    for name, data_type in base_columns + overflow_columns
]

bgl_schema = StructType(fields)


In [3]:
# Load BGL.log as space-delimited text using `bgl_schema`.
#
# Reader options:
# - quote="" disables CSV quote handling. Raw log messages may contain a `"`,
#   which the default quote character would treat as the start of a quoted
#   field, shifting every following token into the wrong column.
# - No dateFormat option: the schema has no DateType column, so a date pattern
#   has no effect (and "dd-MMM-yyyy HH:mm" matched neither date field anyway).
#
# Caveat: in the default PERMISSIVE mode the CSV reader fills missing trailing
# fields with null but silently DROPS tokens beyond the schema width, so any
# line with more than `max_columns` tokens loses the tail of its message.
# TODO(review): confirm no line in BGL.log exceeds `max_columns` tokens.
bgl_df = session.read.option("delimiter", " ").csv(
    "BGL.log",
    schema=bgl_schema,
    header=False,  # BGL.log does not have a header
    quote="",  # empty string turns quoting off (None would keep the default `"`)
)

In [4]:
# Re-assemble the free-text message. Splitting on spaces spread the message
# words over "Message" (position 9) and the overflow columns after it
# (position 10 onwards).
total_col = bgl_df.columns
first10, additional_col = total_col[:10], total_col[10:]

# concat_ws skips null inputs, so the unused overflow columns of short lines
# do not add trailing separators.
merged_message = concat_ws(
    " ", col("Message"), *(col(name) for name in additional_col)
)

# Keep the nine fixed fields followed by the rebuilt "Message" column.
final_columns = first10[:9] + ["Message"]

bgl_df = (
    bgl_df
    .withColumn("Message", merged_message)
    .select(*final_columns)
)

bgl_df.show(truncate=False)


                                                                                

+------------------+----------+----------+-------------------+--------------------------+-------------------+--------+-------+-----+----------------------------------------+
|Alert_message_flag|Timestamp |Date      |Node               |DateTime                  |Node2              |MessageT|SystemC|Level|Message                                 |
+------------------+----------+----------+-------------------+--------------------------+-------------------+--------+-------+-----+----------------------------------------+
|-                 |1117838570|2005.06.03|R02-M1-N0-C:J12-U11|2005-06-03-15.42.50.363779|R02-M1-N0-C:J12-U11|RAS     |KERNEL |INFO |instruction cache parity error corrected|
|-                 |1117838570|2005.06.03|R02-M1-N0-C:J12-U11|2005-06-03-15.42.50.527847|R02-M1-N0-C:J12-U11|RAS     |KERNEL |INFO |instruction cache parity error corrected|
|-                 |1117838570|2005.06.03|R02-M1-N0-C:J12-U11|2005-06-03-15.42.50.675872|R02-M1-N0-C:J12-U11|RAS     |KERNEL |INFO

In [11]:
# Expose the parsed log to Spark SQL under the name used in the query.
bgl_df.createOrReplaceTempView("bgl_logs")

# EventCounts:   number of rows flagged 'APPUNAV' per Node.
# MaxEventCount: the highest of those per-node counts.
# Final SELECT:  how many nodes are tied at that maximum. With no APPUNAV rows,
#                MAX is NULL, the join is empty and the count is 0.
SQLQuery = """
WITH EventCounts AS (
    SELECT Node, COUNT(*) AS Event_Count
    FROM bgl_logs
    WHERE Alert_message_flag = 'APPUNAV'
    GROUP BY Node
),
MaxEventCount AS (
    SELECT MAX(Event_Count) AS Max_Count
    FROM EventCounts
)
SELECT COUNT(*) AS Node_Count
FROM EventCounts
JOIN MaxEventCount
ON EventCounts.Event_Count = MaxEventCount.Max_Count
"""

result = session.sql(SQLQuery).collect()
# A COUNT(*) with no GROUP BY always yields exactly one row.
(summary_row,) = result
print("Number of nodes with the largest number of events:", summary_row["Node_Count"])


                                                                                

Number of nodes with the largest number of events: 512


In [3]:
# Shut down the Spark session once the analysis is finished; `bgl_df` and the
# `bgl_logs` view belong to this session and cannot be queried after this.
session.stop()