In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ShortType, ByteType, DateType, TimestampType, LongType
session = SparkSession.builder.appName("logs").getOrCreate()

23/08/11 10:40:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
logs_schema = StructType([
    StructField("alert", StringType()),
    StructField("timestamp", LongType()),
    StructField("date", DateType()),
    StructField("node", StringType()),
    StructField("timestamp2", TimestampType()),
    StructField("node2", StringType()),
    StructField("ras", StringType()),
    StructField("kernel", StringType()),
    StructField("info", StringType()),
    StructField("message", StringType())
])

In [3]:
logs_df = session.read.csv(
    "BGLnew.log",
    schema=logs_schema,
    header=True,
    dateFormat="yyyy.MM.dd",
    timestampFormat = "yyyy-MM-dd-HH.mm.ss.SSSSSS"
)

In [4]:
logs_df.printSchema()

root
 |-- alert: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- date: date (nullable = true)
 |-- node: string (nullable = true)
 |-- timestamp2: timestamp (nullable = true)
 |-- node2: string (nullable = true)
 |-- ras: string (nullable = true)
 |-- kernel: string (nullable = true)
 |-- info: string (nullable = true)
 |-- message: string (nullable = true)



In [5]:
logs_df.show()

[Stage 0:>                                                          (0 + 1) / 1]23/08/11 10:41:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: -, 1117838570, 2005.06.03, R02-M1-N0-C:J12-U11, 2005-06-03-15.42.50.363779, R02-M1-N0-C:J12-U11, RAS, KERNEL, INFO, instruction cache parity error corrected
 Schema: alert, timestamp, date, node, timestamp2, node2, ras, kernel, info, message
Expected: alert but found: -
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log


+-----+----------+----------+-------------------+--------------------+-------------------+---+------+----+--------------------+
|alert| timestamp|      date|               node|          timestamp2|              node2|ras|kernel|info|             message|
+-----+----------+----------+-------------------+--------------------+-------------------+---+------+----+--------------------+
|    -|1117838570|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:...|R02-M1-N0-C:J12-U11|RAS|KERNEL|INFO|instruction cache...|
|    -|1117838570|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:...|R02-M1-N0-C:J12-U11|RAS|KERNEL|INFO|instruction cache...|
|    -|1117838570|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:...|R02-M1-N0-C:J12-U11|RAS|KERNEL|INFO|instruction cache...|
|    -|1117838570|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:...|R02-M1-N0-C:J12-U11|RAS|KERNEL|INFO|instruction cache...|
|    -|1117838571|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:...|R02-M1-N0-C:J12-U11|RAS|KERNEL|INF



In [9]:
SQLQuery1 = """
SELECT 
   COUNT(*) AS no_of_fatallogs
FROM 
    logs
WHERE 
    MONTH(date) = 12 -- December
    AND message LIKE '%invalid or missing program image%'
"""

In [10]:
session.sql(SQLQuery1).show()

23/08/11 10:43:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2005.06.03, instruction cache parity error corrected
 Schema: date, message
Expected: date but found: 2005.06.03
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log
                                                                                

+---------------+
|no_of_fatallogs|
+---------------+
|          18584|
+---------------+



In [8]:
logs_df.createOrReplaceTempView("logs")

In [11]:
SQLQuery5 = """SELECT 
DATE_TRUNC('month', date) AS month,
AVG(error_duration_seconds) AS avg_error_duration_per_month
FROM 
  (
    SELECT 
      date, 
      COUNT(*) AS error_duration_seconds
    FROM logs
    WHERE message LIKE '%EDRAM%' 
    GROUP BY date
  ) 
GROUP BY
  month
"""

In [12]:
session.sql(SQLQuery5).show()

23/08/11 10:44:59 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2005.06.03, instruction cache parity error corrected
 Schema: date, message
Expected: date but found: 2005.06.03
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log

+-------------------+----------------------------+
|              month|avg_error_duration_per_month|
+-------------------+----------------------------+
|2005-12-01 00:00:00|          42.166666666666664|
|2005-11-01 00:00:00|          28.133333333333333|
|2006-01-01 00:00:00|                        25.0|
|2005-10-01 00:00:00|          28.258064516129032|
|2005-08-01 00:00:00|          23.285714285714285|
|2005-09-01 00:00:00|          22.620689655172413|
|2005-06-01 00:00:00|          49.607142857142854|
|2005-07-01 00:00:00|          17.741935483870968|
+-------------------+----------------------------+



                                                                                

In [13]:
SQLQuery9 = """
    SELECT date, COUNT(*) as frequency
    FROM logs
    GROUP BY date
    ORDER BY frequency DESC
    LIMIT 5;
"""

In [14]:
session.sql(SQLQuery9).show()

23/08/11 10:46:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2005.06.03
 Schema: date
Expected: date but found: 2005.06.03
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log

+----------+---------+
|      date|frequency|
+----------+---------+
|2005-07-09|   381827|
|2005-06-14|   381561|
|2005-12-01|   271341|
|2005-11-03|   200937|
|2005-07-23|   200654|
+----------+---------+



                                                                                

In [15]:
SQLQuery15 = """
SELECT node, COUNT(*) as appbusy_events_count, COUNT(*) OVER() as total_entries
FROM logs
WHERE alert like '%KERNRTSP%'
GROUP BY node
ORDER BY appbusy_events_count DESC
LIMIT 1;
"""

In [16]:
session.sql(SQLQuery15).show()

23/08/11 10:46:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/08/11 10:46:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/08/11 10:46:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/08/11 10:46:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: -, R02-M1-N0-C:J12-U11
 Schema: alert, node
Expected: alert but found: -
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log
23/08/11 10:46:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/08/11 10:46:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, 

+-------------------+--------------------+-------------+
|               node|appbusy_events_count|total_entries|
+-------------------+--------------------+-------------+
|R63-M0-NE-C:J12-U01|                  22|         3394|
+-------------------+--------------------+-------------+



                                                                                

In [17]:
SQLQuery18 = """
SELECT 
    MIN(date) AS earliest_error_date
FROM 
    logs
WHERE 
    message LIKE '%Power Good signal deactivated%'
"""

In [18]:
session.sql(SQLQuery18).show()

23/08/11 10:47:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2005.06.03, instruction cache parity error corrected
 Schema: date, message
Expected: date but found: 2005.06.03
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log

+-------------------+
|earliest_error_date|
+-------------------+
|         2005-11-17|
+-------------------+



                                                                                

In [19]:
SQLQuery2 = """
SELECT 
   COUNT(*) AS no_of_fatallogs
FROM 
    logs
WHERE 
    MONTH(date) = 9 -- September
    AND message LIKE '%major internal error%'
"""

In [20]:
session.sql(SQLQuery2).show()

23/08/11 10:48:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2005.06.03, instruction cache parity error corrected
 Schema: date, message
Expected: date but found: 2005.06.03
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log

+---------------+
|no_of_fatallogs|
+---------------+
|             10|
+---------------+



                                                                                

In [21]:
SQLQuery11 = """
    SELECT node, COUNT(*) as frequency
    FROM logs
    GROUP BY node
    ORDER BY frequency DESC
    LIMIT 5;
"""

In [22]:
session.sql(SQLQuery11).show()

23/08/11 10:49:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: R02-M1-N0-C:J12-U11
 Schema: node
Expected: node but found: R02-M1-N0-C:J12-U11
CSV file: file:///home/anuhya/ScalableProject/BGLnew.log

+-------------------+---------+
|               node|frequency|
+-------------------+---------+
|R30-M0-N9-C:J16-U01|   152329|
|               NULL|    89296|
|R02-M1-N0-C:J12-U11|    64650|
|R37-M1-NC-C:J02-U11|    35288|
|   UNKNOWN_LOCATION|    27039|
+-------------------+---------+



23/08/11 10:50:33 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/spark-7a454bd9-abe0-47ee-b10d-4a1cae37ba16/userFiles-087dcaf7-eef5-4b9e-8072-abe4c30f8340. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/spark-7a454bd9-abe0-47ee-b10d-4a1cae37ba16/userFiles-087dcaf7-eef5-4b9e-8072-abe4c30f8340
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:177)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:113)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
	at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
	at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.fo