# Question 6: For each day of the week, what is the average number of seconds over which ”re-synch state events” occurred?

# Section 1: Data Preparation and Preprocessing

**IMPORTING NECESSARY LIBRARIES**

In [1]:
#https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
# Importinng 'SparkSession' module which is the main entry point for DataFrame and SQL functionality. SparkSession can be used create DataFrame, register DataFrame as tables, execute SQL over tables and cache tables.
from pyspark.sql import SparkSession

# Importing 'Date (datetime.date)', 'Timestamp (datetime.datetime)' , 'Double' data type from 'Data Types' module  
from pyspark.sql.types import DateType, TimestampType, DoubleType, IntegerType

# importing built-in functions available for DataFrame.
from pyspark.sql.functions import to_date, to_timestamp, col, month, date_format, regexp_extract, dayofweek, avg

# Importing 'matplotlib.pyplot' module for creating and customizing plots and visualizations. 
# 'pyplot' is a part of the Matplotlib library and provides a user-friendly interface to create different types of plots.
import matplotlib.pyplot as plt

# Importing'time' module for measuring execution time
import time

# Import re module which provides regular expression matching operations in Python.
# https://docs.python.org/3/library/re.html
import re

**EXECUTION TIME COUNTER START**

In [2]:
# Recording the starting time of the execution using the 'time.perf_counter()' function.It provides a high-resolution timer that measures the time elapsed in seconds to measure its execution time.
executiontimestart= time.perf_counter()

In [3]:
# Initializing SparkSession named 'ScpProject' using the SparkSession builder.
session = SparkSession.builder.appName('ScpProject').getOrCreate()

23/08/11 21:26:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Reading data from CSV file into a DataFrame 'bgl_log_df'
bgl_log_df = session.read.csv("BGLnew.log", sep=',', inferSchema=True, header=False)

                                                                                

In [5]:
# Renaming the columns 'toDF()' method for better understanding into our dataframe.
bgl_log_df = bgl_log_df.toDF('alert_message_flag', 'timestamp', 'date', 'node', 'date_and_time', 'node_repeated',
                             'message_type', 'system_component', 'level', 'message_content')


In [6]:
# Parsing 'date' column to standard DateType
bgl_log_df = bgl_log_df.withColumn('date', to_date('date', 'yyyy.MM.dd'))

In [7]:
# Parsing 'date_and_time' column to standard TimestampType
bgl_log_df = bgl_log_df.withColumn('date_and_time', to_timestamp('date_and_time', 'yyyy-MM-dd-HH.mm.ss.SSSSSS'))

In [8]:
# Casting columns to appropriate data types
bgl_log_df = bgl_log_df.withColumn("timestamp", bgl_log_df["timestamp"].cast(TimestampType())) \
    .withColumn("date", bgl_log_df["date"].cast(DateType())) \
    .withColumn("date_and_time", bgl_log_df["date_and_time"].cast(TimestampType()))

In [9]:
# Using 'printSchema()' method to display the updated schema of the DataFrame 'bgl_log_df'.
bgl_log_df.printSchema()

root
 |-- alert_message_flag: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- node: string (nullable = true)
 |-- date_and_time: timestamp (nullable = true)
 |-- node_repeated: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- system_component: string (nullable = true)
 |-- level: string (nullable = true)
 |-- message_content: string (nullable = true)



In [10]:
# bgl_log_df: The DataFrame on which the operation is performed. This DataFrame contains log data with various columns like "date_and_time".
# bgl_log_df.withColumn(): A method used to add a new column to the DataFrame. It takes two arguments: the name of the new column and the expression to calculate its values.
# "day": The name of the new column that will be added to the DataFrame. This column will store the day of the week corresponding to the "date_and_time" column.
# date_format(col("date_and_time"), "EEEE"): The expression to calculate the values of the new "day" column. It uses the "date_format" function from PySpark SQL to extract the day of the week (full name) from the "date_and_time" column. The "EEEE" format string specifies the full name of the day (e.g., Monday, Tuesday, etc.).
# The result of the "date_format" function is applied to each row of the DataFrame, and the corresponding day of the week is stored in the new "day" column.
bgl_log_df = bgl_log_df.withColumn("day", date_format(col("date_and_time"), "EEEE"))

**DATA PREPRATION AND PREPROCESSING COMPLETED**

In [11]:
# We are using the 'show()' method to display the DataFrame 'bgl_log_df' in a vertical format.
# The 'vertical=True' argument is used to display the DataFrame vertically, showing each row in a single line.
# The 'truncate=False' argument is used to prevent truncation of column values, ensuring complete visibility of the data.
bgl_log_df.show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------
 alert_message_flag | -                                        
 timestamp          | 2005-06-03 22:42:50                      
 date               | 2005-06-03                               
 node               | R02-M1-N0-C:J12-U11                      
 date_and_time      | 2005-06-03 15:42:50.363779               
 node_repeated      | R02-M1-N0-C:J12-U11                      
 message_type       | RAS                                      
 system_component   | KERNEL                                   
 level              | INFO                                     
 message_content    | instruction cache parity error corrected 
 day                | Friday                                   
-RECORD 1------------------------------------------------------
 alert_message_flag | -                                        
 timestamp          | 2005-06-03 22:42:50                      
 date               | 2005-06-03        

# Section 2: Log Data Analysis : Spark RDD

**TOTAL NUMBER OF LOG ENTRIES IN THE LOG DATASET COUNT**

In [12]:
# Count total logs in the DataFrame
total_logs_count = bgl_log_df.count()

# Display the total number of logs
total_logs_count

4747963

In [13]:
# Filter the DataFrame to extract rows containing the phrase "re-synch state event" in the message_content column.
# The result will be a new DataFrame containing only the rows with the specified phrase.
extracted_logs_df = bgl_log_df.filter(col("message_content").contains("re-synch state event"))

# The `show()` method is used to display the content of the DataFrame
extracted_logs_df.show(truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------
 alert_message_flag | -                                                                   
 timestamp          | 2005-06-04 01:21:45                                                 
 date               | 2005-06-03                                                          
 node               | R06-M1-N6-C:J15-U01                                                 
 date_and_time      | 2005-06-03 18:21:45.321124                                          
 node_repeated      | R06-M1-N6-C:J15-U01                                                 
 message_type       | RAS                                                                 
 system_component   | KERNEL                                                              
 level              | INFO                                                                
 message_content    | 1 tree receiver 2 in re-synch state event(s) (dcr 0x019a) detected  

In [14]:
# https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/functions/regexp_extract
# Defined regular expression pattern to extract the seconds from the message_content
#pattern =r"re-synch state event(?:\(s\))? \(dcr 0x019a\) detected over (\d+) seconds"

# The pattern matches logs with the following structure:
# - "re-synch state event" followed by an optional "s" inside non-capturing group "(?:...)".
# - "(dcr 0x" followed by one or more hexadecimal digits "[0-9A-Fa-f]+" and then a closing parenthesis ")".
# - Any non-digit characters "[^0-9]+" to allow for spaces or other characters between the hexadecimal number and "seconds".
# - "(\d+)" to capture one or more digits representing the number of seconds.

pattern= r"re-synch state event(?:\(s\))? \(dcr 0x[0-9A-Fa-f]+\) detected over (\d+) seconds"

# Using the regular expression pattern to extract the number of seconds from the message_content.
# The extracted value will be stored in a new column named "seconds" in the DataFrame "extracted_logs_df".
# The extracted value is cast to an IntegerType to ensure it is represented as an integer.
extracted_logs_df = extracted_logs_df.withColumn("seconds", regexp_extract(col("message_content"), pattern, 1).cast(IntegerType()))
extracted_logs_df.show(10)

+------------------+-------------------+----------+-------------------+--------------------+-------------------+------------+----------------+-----+--------------------+------+-------+
|alert_message_flag|          timestamp|      date|               node|       date_and_time|      node_repeated|message_type|system_component|level|     message_content|   day|seconds|
+------------------+-------------------+----------+-------------------+--------------------+-------------------+------------+----------------+-----+--------------------+------+-------+
|                 -|2005-06-04 01:21:45|2005-06-03|R06-M1-N6-C:J15-U01|2005-06-03 18:21:...|R06-M1-N6-C:J15-U01|         RAS|          KERNEL| INFO|1 tree receiver 2...|Friday|   null|
|                 -|2005-06-04 02:13:26|2005-06-03|R06-M1-N6-C:J15-U01|2005-06-03 19:13:...|R06-M1-N6-C:J15-U01|         RAS|          KERNEL| INFO|1 tree receiver 2...|Friday|   null|
|                 -|2005-06-05 08:55:12|2005-06-05|R06-M1-N6-C:J15-U01|2005

In [15]:
#filters out rows with 'null' values in the 'seconds' column of the DataFrame 'extracted_logs_df'. 'na.drop' method is used to remove rows containing 'null' values in the specified column 'seconds'.
extracted_logs_df = extracted_logs_df.na.drop(subset=["seconds"])
extracted_logs_df.show(truncate=False, vertical=True)



-RECORD 0--------------------------------------------------------------------------------------------------
 alert_message_flag | -                                                                                    
 timestamp          | 2005-11-15 13:18:52                                                                  
 date               | 2005-11-15                                                                           
 node               | R02-M0-NE-C:J02-U11                                                                  
 date_and_time      | 2005-11-15 05:18:52.775116                                                           
 node_repeated      | R02-M0-NE-C:J02-U11                                                                  
 message_type       | RAS                                                                                  
 system_component   | KERNEL                                                                               
 level              | INFO  

                                                                                

<span style="color:blue">***THE AVERAGE NUMBER OF SECONDS OVER WHICH "RE-SYNCH STATE EVENTS" OCCURRED FOR EACH DAY OF THE WEEK***<span style="color:blue">

In [16]:
# Calculate the average number of seconds per day for "re-synch state events" in the DataFrame 'extracted_logs_df'.
# Group the DataFrame by the 'day' column and calculate the average of the 'seconds' column for each group.
# The result will be a new DataFrame with two columns: 'day' and 'average_seconds'.
average_seconds_per_day=extracted_logs_df.groupBy("day").agg(avg("seconds").alias("average_seconds"))
average_seconds_per_day.show(truncate=False, vertical=True)

[Stage 10:>                                                         (0 + 6) / 6]

-RECORD 0-----------------------------
 day             | Tuesday            
 average_seconds | 8538.424347826087  
-RECORD 1-----------------------------
 day             | Friday             
 average_seconds | 6954.137299771167  
-RECORD 2-----------------------------
 day             | Thursday           
 average_seconds | 8412.631868131868  
-RECORD 3-----------------------------
 day             | Monday             
 average_seconds | 14511.80625        
-RECORD 4-----------------------------
 day             | Wednesday          
 average_seconds | 11170.689243027888 
-RECORD 5-----------------------------
 day             | Saturday           
 average_seconds | 17872.340482573727 
-RECORD 6-----------------------------
 day             | Sunday             
 average_seconds | 16091.967088607595 



                                                                                

**EXECUTION TIME COUNTER STOP- TOTAL EXECUTION TIME GENERATED BELOW**

In [17]:
#https://docs.python.org/3/library/time.html
#returns the value (in fractional seconds) of a performance counter.It does include time elapsed during sleep and is system-wide.
executiontimeend= time.perf_counter()

In [18]:
# Calculating the total execution time as  time.perf_counter() returns the absolute value of the counter
# https://stackoverflow.com/questions/25785243/understanding-time-perf-counter-and-time-process-time
totalexecution_time= executiontimeend - executiontimestart
totalexecution_time

9.01429712800018

In [19]:
# all the information on the 'spark runtime environment', executors sumamry, jobs and its stages can be found on 'Spark UI'
spark

In [20]:
average_seconds_per_day.show()

[Stage 13:>                                                         (0 + 6) / 6]

+---------+------------------+
|      day|   average_seconds|
+---------+------------------+
|  Tuesday| 8538.424347826087|
|   Friday| 6954.137299771167|
| Thursday| 8412.631868131868|
|   Monday|       14511.80625|
|Wednesday|11170.689243027888|
| Saturday|17872.340482573727|
|   Sunday|16091.967088607595|
+---------+------------------+



                                                                                