In [0]:
from pyspark.sql import SparkSession
# Obtain the notebook's SparkSession; getOrCreate() hands back an already
# active session when there is one instead of building a second.
spark = (
    SparkSession.builder
    .appName("usecase")
    .master("local[*]")
    .getOrCreate()
)
# Bare expression so the session object is shown as the cell output.
spark

In [0]:
# Load the sample log records from a JSON file, with the reader's "multiline"
# option switched on.
df = (
    spark.read
    .option("multiline", "True")
    .json("/FileStore/tables/sample_logs.json")
)

In [0]:
# Render the freshly loaded log records as a table for a first look at the data.
df.display()

log_level,message,server_id,timestamp
ERROR,Disk space low.,Server_18,2024-12-10T14:39:01
WARN,Memory usage exceeded 80%.,Server_05,2024-12-07T11:30:20
INFO,Backup completed successfully.,Server_20,2024-12-12T04:17:29
ERROR,Critical security vulnerability detected.,Server_10,2024-12-06T01:34:52
ERROR,Application crashed due to an unknown error.,Server_11,2024-12-11T06:43:10
WARN,High memory usage detected.,Server_08,2024-12-11T10:28:16
ERROR,Disk write failure.,Server_04,2024-12-06T00:16:07
INFO,Server started successfully.,Server_01,2024-12-09T00:44:49
INFO,System maintenance completed.,Server_01,2024-12-06T02:32:51
WARN,High memory usage detected.,Server_10,2024-12-07T15:03:02


In [0]:
# Print column names and types; in the captured output all four columns
# (including timestamp) are inferred as strings.
df.printSchema()

root
 |-- log_level: string (nullable = true)
 |-- message: string (nullable = true)
 |-- server_id: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
from pyspark.sql.functions import to_date

# Convert the string "timestamp" values to dates (the time of day is dropped).
# The result is written under the name "Timestamp"; in the captured output it
# takes the place of the original string column rather than being added next to it.
df_date = df.withColumn(
    "Timestamp",
    to_date("timestamp"),
)
df_date.display()

log_level,message,server_id,Timestamp
ERROR,Disk space low.,Server_18,2024-12-10
WARN,Memory usage exceeded 80%.,Server_05,2024-12-07
INFO,Backup completed successfully.,Server_20,2024-12-12
ERROR,Critical security vulnerability detected.,Server_10,2024-12-06
ERROR,Application crashed due to an unknown error.,Server_11,2024-12-11
WARN,High memory usage detected.,Server_08,2024-12-11
ERROR,Disk write failure.,Server_04,2024-12-06
INFO,Server started successfully.,Server_01,2024-12-09
INFO,System maintenance completed.,Server_01,2024-12-06
WARN,High memory usage detected.,Server_10,2024-12-07


In [0]:
# Narrow the dated log records down to ERROR-level entries only.
df_filter_error = df_date.where("log_level == 'ERROR'")
df_filter_error.display()

log_level,message,server_id,Timestamp
ERROR,Disk space low.,Server_18,2024-12-10
ERROR,Critical security vulnerability detected.,Server_10,2024-12-06
ERROR,Application crashed due to an unknown error.,Server_11,2024-12-11
ERROR,Disk write failure.,Server_04,2024-12-06
ERROR,Failed to connect to the database.,Server_15,2024-12-07
ERROR,Failed to connect to the database.,Server_06,2024-12-07
ERROR,Disk write failure.,Server_08,2024-12-09
ERROR,Disk space low.,Server_08,2024-12-05
ERROR,Disk write failure.,Server_15,2024-12-08
ERROR,Failed to connect to the database.,Server_20,2024-12-07


In [0]:
from pyspark.sql.functions import current_date,date_sub,col
# Keep ERROR rows dated on or after the day that lies 7 days before today.
# NOTE(review): the cutoff comes from current_date(), so the rows selected
# depend on the day the notebook is executed.
cutoff_date = date_sub(current_date(), 7)
df_filter_date = df_filter_error.where(col("timestamp") >= cutoff_date)
df_filter_date.display()

log_level,message,server_id,Timestamp
ERROR,Disk space low.,Server_18,2024-12-10
ERROR,Application crashed due to an unknown error.,Server_11,2024-12-11
ERROR,Disk write failure.,Server_08,2024-12-09
ERROR,Failed to connect to the database.,Server_06,2024-12-09
ERROR,Critical security vulnerability detected.,Server_03,2024-12-10
ERROR,Application crashed due to an unknown error.,Server_04,2024-12-09
ERROR,Critical security vulnerability detected.,Server_06,2024-12-10
ERROR,Application crashed due to an unknown error.,Server_07,2024-12-11
ERROR,Failed to connect to the database.,Server_01,2024-12-11
ERROR,Disk write failure.,Server_13,2024-12-12


In [0]:
# Count the recent ERROR rows for each server.
df_group = (
    df_filter_date
    .groupBy("server_id")
    .count()
)
df_group.display()

server_id,count
Server_16,136
Server_12,182
Server_04,174
Server_17,155
Server_06,151
Server_13,155
Server_09,165
Server_02,153
Server_05,185
Server_01,145


In [0]:
from pyspark.sql.functions import desc
# Order servers by recent error count, highest first.
# server_id is added as an ascending secondary key: without it, servers with
# equal counts have no defined relative order, so the listing (and any
# limit() applied to it afterwards) could differ from one run to the next.
df_order = df_group.orderBy(desc("count"), "server_id")
df_order.display()

server_id,count
Server_05,185
Server_12,182
Server_04,174
Server_09,165
Server_19,164
Server_18,158
Server_07,157
Server_17,155
Server_13,155
Server_02,153


In [0]:
# Take the first three rows of the count-ordered frame, i.e. the servers with
# the most recent errors.
top_n = 3
df_top_3 = df_order.limit(top_n)
df_top_3.display()

server_id,count
Server_05,185
Server_12,182
Server_04,174


In [0]:
# Count recent ERROR rows per server per day, listed newest day first and by
# ascending server_id within each day.
df_order2 = (
    df_filter_date
    .groupBy("server_id", "timestamp")
    .count()
    .orderBy(col("timestamp").desc(), col("server_id"))
)
df_order2.display()

server_id,timestamp,count
Server_01,2024-12-12,13
Server_02,2024-12-12,17
Server_03,2024-12-12,10
Server_04,2024-12-12,12
Server_05,2024-12-12,16
Server_06,2024-12-12,10
Server_07,2024-12-12,14
Server_08,2024-12-12,9
Server_09,2024-12-12,7
Server_10,2024-12-12,10


In [0]:
from pyspark.sql.functions import avg
# Average the per-day error counts for each server, listed by server_id.
# NOTE(review): only (server, day) pairs that exist in df_order2 contribute,
# so a day on which a server logged no errors is not averaged in as zero.
avg_daily_errors = avg("count").alias("avg_count")
df_average = (
    df_order2
    .groupBy("server_id")
    .agg(avg_daily_errors)
    .orderBy(col("server_id"))
)
df_average.display()

server_id,avg_count
Server_01,36.25
Server_02,38.25
Server_03,35.25
Server_04,43.5
Server_05,46.25
Server_06,37.75
Server_07,39.25
Server_08,37.75
Server_09,41.25
Server_10,36.75


In [0]:
# Count how often each message occurs within each log level, across the full
# (unfiltered) log data.
df_order3 = (
    df
    .groupBy("log_level", "message")
    .count()
)
df_order3.display()

log_level,message,count
INFO,Server started successfully.,1602
WARN,Memory usage exceeded 80%.,2293
ERROR,Failed to connect to the database.,1332
INFO,Backup completed successfully.,1653
ERROR,Disk write failure.,1385
WARN,High CPU usage detected.,2203
ERROR,Application crashed due to an unknown error.,1361
ERROR,Critical security vulnerability detected.,1324
ERROR,Disk space low.,1264
INFO,System maintenance completed.,1709


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Window over each log level, most frequent message first.
df_window = (
    Window
    .partitionBy("log_level")
    .orderBy(col("count").desc())
)
# rank() gives tied counts the same rank, so a level can have more than one
# message at a given rank.
df_rank = rank().over(df_window)
# Attach the within-level rank to the per-message counts.
df_summary = df_order3.withColumn("rank", df_rank)
df_summary.display()

log_level,message,count,rank
ERROR,Disk write failure.,1385,1
ERROR,Application crashed due to an unknown error.,1361,2
ERROR,Failed to connect to the database.,1332,3
ERROR,Critical security vulnerability detected.,1324,4
ERROR,Disk space low.,1264,5
INFO,System maintenance completed.,1709,1
INFO,Backup completed successfully.,1653,2
INFO,Server started successfully.,1602,3
INFO,Service restarted successfully.,1598,4
WARN,Memory usage exceeded 80%.,2293,1


In [0]:
# Keep the rank-1 (most frequent) message of every log level and print the
# result without truncating the message text.
is_top_ranked = col("rank") == 1
top_messages = (
    df_summary
    .where(is_top_ranked)
    .select("log_level", "message", "count")
)
top_messages.show(truncate=False)

+---------+-----------------------------+-----+
|log_level|message                      |count|
+---------+-----------------------------+-----+
|ERROR    |Disk write failure.          |1385 |
|INFO     |System maintenance completed.|1709 |
|WARN     |Memory usage exceeded 80%.   |2293 |
+---------+-----------------------------+-----+

