# Question 3. How many fatal log entries in the months of December or January resulted from an “invalid or missing program image” error?

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, month
import time

# Start the wall-clock timer before the Spark session is built, so the time
# reported for this question includes session start-up.
start_time = time.time()

# Local Spark session with one worker thread (local[1]) and the driver
# bound to localhost.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")
    .master("local[1]")
    .appName("FatalProgramImageError")
    .getOrCreate()
)

# Load the BGL log as an RDD of text lines.
log_file_path = "BGL"
log_rdd = spark.sparkContext.textFile(log_file_path)

# Extract the fields needed for Question 3 from one raw log line
def extract_fatal_program_image(line):
    """Return ``(date_time_str, timestamp_str)`` for a matching log line.

    A line matches when it has at least 10 whitespace-separated columns,
    its level column (index 8) is ``FATAL``, and its message (columns 9
    onwards, re-joined with single spaces) contains
    ``"invalid or missing program image"``.

    Args:
        line: One raw line of the log file.

    Returns:
        ``(columns[4], columns[1])`` (the date-time string and the timestamp
        string) for a matching line, otherwise ``None``.
    """
    columns = line.split()
    if len(columns) < 10:
        return None
    # Check the cheap single-column condition first so the message is only
    # re-joined for FATAL lines.
    if columns[8] != "FATAL":
        return None
    message_content = ' '.join(columns[9:])
    if "invalid or missing program image" not in message_content:
        return None
    return (columns[4], columns[1])

# Drop the lines the extractor rejected (it returns None for them).
filtered_log_rdd = log_rdd.map(extract_fatal_program_image).filter(
    lambda row: row is not None
)

# Build the DataFrame. The timestamp column is cast to long. The
# "yyyy-MM-dd-HH.mm.ss.SSSSSS" date-time string is parsed with
# unix_timestamp (whole epoch seconds) and cast back to a timestamp.
log_df = (
    filtered_log_rdd.toDF(["DateTime", "Timestamp"])
    .withColumn("Timestamp", col("Timestamp").cast("long"))
    .withColumn(
        "DateTime",
        unix_timestamp(col("DateTime"), 'yyyy-MM-dd-HH.mm.ss.SSSSSS').cast("timestamp"),
    )
)

# Keep only entries whose month is December (12) or January (1).
event_month = month(col("DateTime"))
filtered_df = log_df.filter((event_month == 12) | (event_month == 1))

fatal_count = filtered_df.count()
print(f"Number of fatal log entries in December or January resulting from 'invalid or missing program image': {fatal_count}")

# Report wall-clock time since start_time, then release the session.
end_time = time.time()
total_time = end_time - start_time
print(f"Total execution time: {total_time:.2f} seconds")

spark.stop()


                                                                                

Number of fatal log entries in December or January resulting from 'invalid or missing program image': 18616
Total execution time: 21.19 seconds


# Question 12. What are the top 6 most frequently occurring hours in the log?

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

# Local Spark session with one worker thread (local[1]) and the driver
# bound to localhost.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")
    .master("local[1]")
    .appName("TopHours")
    .getOrCreate()
)

# Load the BGL log as an RDD of text lines.
log_file_path = "BGL"
log_rdd = spark.sparkContext.textFile(log_file_path)

# Extract the hour of day from one raw log line
def extract_hour(line):
    """Return ``(hour, 1)`` for a log line, or ``None`` if it can't be parsed.

    The hour is read from column 4, which is split on ``-`` and then on ``.``
    (e.g. ``2005-06-03-15.42.50.675872`` -> ``"15"``). It is returned as a
    string, so grouping is on the string value.

    Lines with fewer than 10 whitespace-separated columns, or whose column 4
    has no fourth ``-``-separated field, yield ``None`` instead of raising
    ``IndexError``. This stops one malformed record from aborting the whole
    Spark job.

    Args:
        line: One raw line of the log file.

    Returns:
        ``(hour, 1)`` or ``None``.
    """
    columns = line.split()
    if len(columns) < 10:
        return None
    date_time_parts = columns[4].split('-')
    if len(date_time_parts) < 4:
        return None
    hour = date_time_parts[3].split('.')[0]
    return (hour, 1)

# Time only the query itself; the Spark session was created above.
start_time = time.time()

# (hour, 1) pairs for every line extract_hour accepts; rejected lines come
# back as None and are dropped.
filtered_log_rdd = log_rdd.map(extract_hour).filter(lambda pair: pair is not None)
log_df = filtered_log_rdd.toDF(["Hour", "Count"])

# Total occurrences per hour, then the six largest totals.
hour_counts_df = (
    log_df.groupBy("Hour")
    .sum("Count")
    .withColumnRenamed("sum(Count)", "TotalCount")
)
top_6_hours_df = hour_counts_df.orderBy(col("TotalCount").desc()).limit(6)
top_6_hours_df.show()

# Report elapsed wall-clock time, then release the session.
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution Time: {execution_time} seconds")

spark.stop()


                                                                                

+----+----------+
|Hour|TotalCount|
+----+----------+
|  11|    385826|
|  10|    373509|
|  12|    351982|
|  08|    315017|
|  09|    303681|
|  17|    299275|
+----+----------+

Execution Time: 24.364806175231934 seconds


# Question 14. Which node generated the largest number of KERNRTSP events?

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

# Spark session for this question. No master URL or driver host is set
# here, so whatever the environment configures applies.
session_builder = SparkSession.builder.appName("LogParser")
spark = session_builder.getOrCreate()

# Load the BGL log as an RDD of text lines.
log_file_path = "BGL"
logs_rdd = spark.sparkContext.textFile(log_file_path)

# Parse one raw log line into its ten fields
def parse_line(line):
    """Split one log line into a 10-tuple of string fields.

    Field order: alert_message_flag, timestamp, date, node, date_time,
    node_repeated, ras_message_type, system_component, level,
    message_content. The first nine are single whitespace-separated
    tokens. message_content is all remaining tokens re-joined with single
    spaces ("" if there are none).

    Lines with fewer than nine tokens (e.g. a blank line) are padded with
    empty strings instead of raising IndexError, which would otherwise
    abort the whole Spark job. A blank line therefore becomes ten empty
    strings.

    Args:
        line: One raw line of the log file.

    Returns:
        A tuple of 10 strings.
    """
    parts = line.split()
    # Pad short lines so every record has the same 10-field shape. This is a
    # no-op for well-formed lines, which have at least 9 tokens.
    parts.extend([""] * (9 - len(parts)))
    (alert_message_flag, timestamp, date, node, date_time,
     node_repeated, ras_message_type, system_component, level) = parts[:9]
    message_content = " ".join(parts[9:])
    return (alert_message_flag, timestamp, date, node, date_time, node_repeated, ras_message_type, system_component, level, message_content)

# Time only the query itself; the Spark session was created above.
start_time = time.time()

# Split every line into its ten fields.
parsed_logs_rdd = logs_rdd.map(parse_line)

# Column names, in the order parse_line returns its fields.
columns = ["alert_message_flag", "timestamp", "date", "node", "date_time", "node_repeated", "ras_message_type", "system_component", "level", "message_content"]
logs_df = parsed_logs_rdd.toDF(columns)

# Count KERNRTSP events (identified by the alert flag) per node.
kernrtsp_logs_df = logs_df.filter(col("alert_message_flag") == "KERNRTSP")
node_counts_df = kernrtsp_logs_df.groupBy("node").count()

# Highest count first. Ties are broken by node name so the reported node is
# reproducible. first() returns None when there are no KERNRTSP events at all.
max_node_count = node_counts_df.orderBy(col("count").desc(), col("node").asc()).first()

end_time = time.time()
execution_time = end_time - start_time

if max_node_count is None:
    # Avoid a TypeError from subscripting None on an empty result.
    print("Node with the largest number of KERNRTSP events: none (no KERNRTSP events found)")
else:
    print(f"Node with the largest number of KERNRTSP events: {max_node_count['node']} with {max_node_count['count']} events")
print(f"Execution Time: {execution_time} seconds")

spark.stop()


                                                                                

Node with the largest number of KERNRTSP events: R63-M0-NE-C:J12-U01 with 22 events
Execution Time: 21.74234700202942 seconds


# The questions below are answered using MapReduce (mrjob).

# Question 6. For each day of the week, what is the average number of seconds during which “torus receiver z+ input pipe errors” were detected and corrected?

In [None]:
#!/usr/bin/env python3

import datetime
import re
from mrjob.job import MRJob
from mrjob.step import MRStep
import time

# MapReduce job for Question 6: per weekday, the average length (in seconds)
# of the window over which torus receiver z+ input pipe errors were
# detected and corrected.
class MRTorusReceiverErrors(MRJob):
    """Average "detected and corrected over N seconds" duration per weekday.

    The mapper keeps lines matching ``log_pattern`` and emits
    ``(weekday_name, N)``, where ``N`` comes from the message's
    "detected and corrected over N seconds" clause. The reducer averages
    those values per weekday. Matching lines without such a clause carry
    no duration and are skipped.
    """

    # Used with .match(), so it is anchored at line start: a "-" alert flag,
    # a number, a dotted date, ..., then the date-time field captured as
    # group 1 without its fractional part, a RAS record, and the z+ input
    # pipe error text.
    log_pattern = re.compile(r'- \d+ \d{4}\.\d{2}\.\d{2} .+ (\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}\.\d{2})\.\d+ .+ RAS .+ torus receiver z\+ input pipe error')
    # Matches text of the form "detected and corrected over <N> second(s)".
    duration_pattern = re.compile(r'detected and corrected over (\d+) seconds?')

    def mapper_extract_errors(self, _, line):
        """Yield ``(weekday_name, seconds)`` for a matching line that reports a duration.

        The weekday comes from the line's date-time field (``%A``, e.g.
        ``"Monday"``). ``seconds`` is the reported correction window as a
        float.
        """
        match = self.log_pattern.match(line)
        if not match:
            return
        # Look for the duration only after the matched error text, so it is
        # taken from this error message.
        duration = self.duration_pattern.search(line, match.end())
        if not duration:
            return
        timestamp = datetime.datetime.strptime(match.group(1), '%Y-%m-%d-%H.%M.%S')
        yield timestamp.strftime('%A'), float(duration.group(1))

    def reducer_average_errors(self, day, counts):
        """Yield ``(day, mean)`` of the durations emitted for ``day``.

        Streams the values instead of materializing them in a list. Every
        key reaching the reducer was emitted at least once by the mapper, so
        the count is never zero.
        """
        total = 0.0
        n = 0
        for value in counts:
            total += value
            n += 1
        yield day, total / n

    def steps(self):
        """Single step: extract durations, then average them per weekday."""
        return [
            MRStep(mapper=self.mapper_extract_errors,
                   reducer=self.reducer_average_errors),
        ]

if __name__ == '__main__':
    import sys

    start_time = time.time()
    MRTorusReceiverErrors.run()

    # Report timing on stderr. mrjob may re-run this script as a task or a
    # --steps query (local/Hadoop runners), and then stdout is the data
    # channel, so an extra stdout line would corrupt the job's data.
    end_time = time.time()
    total_time = end_time - start_time
    print(f"Total execution time: {total_time} seconds", file=sys.stderr)


# Question 19. On which date and time did the latest fatal app error whose message contains “message prefix on control stream” occur?

In [None]:
#!/usr/bin/env python

from mrjob.job import MRJob
from mrjob.step import MRStep
import time

# MapReduce job for Question 19: the latest FATAL APP error whose message
# mentions "message prefix on control stream".
class MRLatestFatalAppError(MRJob):
    """Find the most recent FATAL ``APP`` log line about a control-stream message prefix."""

    def mapper_extract_fatal_app_error(self, _, line):
        """Yield ``(None, (date_time_str, timestamp_str))`` for each matching line.

        A line matches when it has at least 10 whitespace-separated columns,
        its component (column 7) is ``APP``, its level (column 8) is
        ``FATAL``, and its message (columns 9 onwards, re-joined with single
        spaces) contains ``"message prefix on control stream"``. Every match
        uses the key ``None``, so a single reducer call sees all of them.
        """
        columns = line.split()
        if len(columns) < 10:
            return
        # The question asks for *app* errors, so the component must be APP.
        if columns[7] != "APP" or columns[8] != "FATAL":
            return
        message_content = ' '.join(columns[9:])
        if "message prefix on control stream" in message_content:
            yield None, (columns[4], columns[1])

    def reducer_find_latest(self, _, date_time_pairs):
        """Yield the pair whose timestamp (element 1, parsed as int) is largest."""
        latest_date_time = max(date_time_pairs, key=lambda pair: int(pair[1]))
        yield "Latest Fatal App Error", latest_date_time

    def steps(self):
        """Single step: extract matching errors, then keep the latest one."""
        return [
            MRStep(mapper=self.mapper_extract_fatal_app_error,
                   reducer=self.reducer_find_latest)
        ]

if __name__ == '__main__':
    import sys

    start_time = time.time()
    MRLatestFatalAppError.run()

    # Report timing on stderr. mrjob may re-run this script as a task or a
    # --steps query (local/Hadoop runners), and then stdout is the data
    # channel, so an extra stdout line would corrupt the job's data.
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Execution Time: {execution_time} seconds", file=sys.stderr)
