In [5]:
# Importing necessary libraries and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, expr
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ShortType, ByteType, DateType, LongType

# Create the Spark session for the "kevent" application; getOrCreate() hands back
# an already-running session when one exists instead of starting a second one.
session = (
    SparkSession.builder
    .appName("kevent")
    .getOrCreate()
)

23/07/28 11:49:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, expr
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ShortType, ByteType, DateType, LongType

# Schema for one space-delimited log record: ten columns, in file order.
# Every column is read as a string except "timestamp", which is a long.
small_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in [
        ("alert_flag", StringType()),
        ("timestamp", LongType()),
        ("date", StringType()),
        ("node", StringType()),
        ("datetime", StringType()),
        ("repeated_node", StringType()),
        ("message_type", StringType()),
        ("system_component", StringType()),
        ("level", StringType()),
        ("message_content", StringType()),
    ]
])




In [7]:
# Load the BGL log as a space-delimited, headerless file using small_schema.
# The reader comes from `session` (the SparkSession this notebook creates) rather
# than an implicit `spark` global: the notebook never defines `spark`, so on a
# fresh kernel that name would raise NameError.
# NOTE(review): with sep=' ' and a 10-field schema, tokens past the 10th field are
# not captured, so message_content appears to hold only the first word of each
# message (the .show() preview prints just "instruction") — confirm whether the
# full message text is needed.
small_df = session.read.csv(
    "file:/home/hduser/BGL.log",
    schema=small_schema,
    sep=" ",
    header=False,
)

In [8]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, expr
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ShortType, ByteType, DateType, LongType
# Add "joined_content": the space-joined values of every column from position 9
# onward (null values are skipped by concat_ws).
# The output column itself is excluded from the inputs so the cell is idempotent:
# re-running it would otherwise find "joined_content" inside columns[9:] and fold
# the previous result into the new one.
# Column names are passed straight to concat_ws instead of being formatted into a
# SQL expression string, so no identifier quoting is needed.
# With the 10-column schema read above, columns[9:] is just "message_content".
tail_columns = [name for name in small_df.columns[9:] if name != "joined_content"]
small_df = small_df.withColumn("joined_content", concat_ws(" ", *tail_columns))

In [9]:
# Print each column's name, type and nullability to confirm that
# "joined_content" now sits alongside the ten columns from the schema
small_df.printSchema()

root
 |-- alert_flag: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- date: string (nullable = true)
 |-- node: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- repeated_node: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- system_component: string (nullable = true)
 |-- level: string (nullable = true)
 |-- message_content: string (nullable = true)
 |-- joined_content: string (nullable = false)



In [10]:

# NOTE(review): same call as the previous cell, producing identical output —
# candidate for removal
small_df.printSchema()

root
 |-- alert_flag: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- date: string (nullable = true)
 |-- node: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- repeated_node: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- system_component: string (nullable = true)
 |-- level: string (nullable = true)
 |-- message_content: string (nullable = true)
 |-- joined_content: string (nullable = false)



In [10]:
# Preview the DataFrame as a text table; show() with no arguments prints only the
# leading rows and shortens long cell values (note the "...." in the datetime column)
small_df.show()


[Stage 0:>                                                          (0 + 1) / 1]

+----------+----------+----------+-------------------+--------------------+-------------------+------------+----------------+-----+---------------+--------------+
|alert_flag| timestamp|      date|               node|            datetime|      repeated_node|message_type|system_component|level|message_content|joined_content|
+----------+----------+----------+-------------------+--------------------+-------------------+------------+----------------+-----+---------------+--------------+
|         -|1117838570|2005.06.03|R02-M1-N0-C:J12-U11|2005-06-03-15.42....|R02-M1-N0-C:J12-U11|         RAS|          KERNEL| INFO|    instruction|   instruction|
|         -|1117838570|2005.06.03|R02-M1-N0-C:J12-U11|2005-06-03-15.42....|R02-M1-N0-C:J12-U11|         RAS|          KERNEL| INFO|    instruction|   instruction|
|         -|1117838570|2005.06.03|R02-M1-N0-C:J12-U11|2005-06-03-15.42....|R02-M1-N0-C:J12-U11|         RAS|          KERNEL| INFO|    instruction|   instruction|
|         -|1117838570


                                                                                

In [11]:
# Register the DataFrame as the temporary view "kevent" (replacing any existing
# view of that name) so the SQL query below can refer to it by name
small_df.createOrReplaceTempView("kevent")

In [24]:

# SQL query: list the node(s) with the fewest rows whose alert_flag is 'KERNRTSP'.
#   - inner subquery: counts matching rows per node, then takes the minimum count
#   - outer query:    counts matching rows per node and keeps every node whose
#                     count equals that minimum (so ties are all returned)
# The alert_flag filter appears in both levels and must be kept in sync.
SQLQuery = """
    SELECT node, COUNT(*) AS event_count
    FROM
        kevent
    WHERE 
        alert_flag = 'KERNRTSP'
    GROUP BY node
    HAVING COUNT(*) = (
        SELECT MIN(event_count)
        FROM (
            SELECT COUNT(*) AS event_count
            FROM kevent
            WHERE 
            alert_flag = 'KERNRTSP'
            GROUP BY node
        ) AS subquery
    )
"""

In [25]:
# Run SQLQuery against the "kevent" view through the Spark session and print the
# resulting node / event_count table
session.sql(SQLQuery).show()

                                                                                

+-------------------+-----------+
|               node|event_count|
+-------------------+-----------+
|R32-M0-NC-C:J05-U01|          1|
|R26-M0-NB-C:J14-U11|          1|
|R26-M0-N3-C:J11-U01|          1|
|R27-M1-N9-C:J06-U01|          1|
|R26-M1-N2-C:J06-U01|          1|
|R26-M1-N1-C:J12-U01|          1|
|R26-M1-NB-C:J17-U11|          1|
|R26-M1-N0-C:J14-U11|          1|
|R25-M0-N4-C:J13-U11|          1|
|R37-M1-NB-C:J12-U01|          1|
|R26-M0-ND-C:J17-U01|          1|
|R26-M0-N5-C:J12-U01|          1|
|R26-M0-N5-C:J02-U01|          1|
|R27-M1-N7-C:J02-U11|          1|
|R27-M0-N6-C:J05-U01|          1|
|R27-M1-NA-C:J02-U01|          1|
|R26-M1-NA-C:J04-U01|          1|
|R26-M1-N7-C:J14-U01|          1|
|R27-M1-N1-C:J17-U01|          1|
|R26-M0-N7-C:J14-U11|          1|
+-------------------+-----------+
only showing top 20 rows

