## Parsing eventlogs

Eventlogs are json like formats with a json like dictionary per row "{k1:v1, k2:v2, ...}"

- https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-LiveListenerBus.html
- https://www.paypal-engineering.com/2016/09/08/spark-in-flames-profiling-spark-applications-using-flame-graphs/

In [113]:
import json
from pprint import pprint

logs = []
for line in open('./spark_eventlogs/local-1527585093053', 'r'):
    logs.append(json.loads(line))
    

In [114]:
len(logs)

6

In [115]:
for j in logs:
    print(j.keys())

dict_keys(['Event', 'Spark Version'])
dict_keys(['Event', 'Timestamp', 'Executor ID', 'Executor Info'])
dict_keys(['Event', 'Block Manager ID', 'Maximum Memory', 'Timestamp', 'Maximum Onheap Memory', 'Maximum Offheap Memory'])
dict_keys(['Event', 'JVM Information', 'Spark Properties', 'System Properties', 'Classpath Entries'])
dict_keys(['Event', 'App Name', 'App ID', 'Timestamp', 'User'])
dict_keys(['Event', 'Timestamp'])


In [116]:
logs[0]

{'Event': 'SparkListenerLogStart', 'Spark Version': '2.3.0'}

In [117]:
logs[1]

{'Event': 'SparkListenerExecutorAdded',
 'Executor ID': 'driver',
 'Executor Info': {'Host': 'localhost', 'Log Urls': {}, 'Total Cores': 4},
 'Timestamp': 1527585093100}

In [118]:
logs[2]

{'Block Manager ID': {'Executor ID': 'driver',
  'Host': 'bsccs436.int.bsc.es',
  'Port': 40427},
 'Event': 'SparkListenerBlockManagerAdded',
 'Maximum Memory': 384093388,
 'Maximum Offheap Memory': 0,
 'Maximum Onheap Memory': 384093388,
 'Timestamp': 1527585093111}

In [119]:
logs[3]

{'Classpath Entries': {'/home/david/spark-2.3.0-bin-hadoop2.7/conf/': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/JavaEWAH-0.3.2.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/RoaringBitmap-0.5.11.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/ST4-4.0.4.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/activation-1.1.1.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/aircompressor-0.8.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/antlr-2.7.7.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/antlr-runtime-3.4.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/antlr4-runtime-4.7.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/aopalliance-1.0.jar': 'System Classpath',
  '/home/david/spark-2.3.0-bin-hadoop2.7/jars/aopalliance-repackaged-2.4.0-b34.jar': 'System Classpath',
  '/home/da

In [120]:
logs[4]

{'App ID': 'local-1527585093053',
 'App Name': 'PySparkShell',
 'Event': 'SparkListenerApplicationStart',
 'Timestamp': 1527585092225,
 'User': 'david'}

In [121]:
logs[5]

{'Event': 'SparkListenerApplicationEnd', 'Timestamp': 1527585106390}

### Another log

In [135]:
logs2 = []
for line in open('./spark_eventlogs/local-1527585270292', 'r'):
    logs2.append(json.loads(line))    

In [123]:
len(logs2)

18

In [124]:
logs2[0]

{'Event': 'SparkListenerLogStart', 'Spark Version': '2.3.0'}

In [125]:
logs2[1]

{'Event': 'SparkListenerExecutorAdded',
 'Executor ID': 'driver',
 'Executor Info': {'Host': 'localhost', 'Log Urls': {}, 'Total Cores': 4},
 'Timestamp': 1527585270331}

In [126]:
logs2[2]

{'Block Manager ID': {'Executor ID': 'driver',
  'Host': 'bsccs436.int.bsc.es',
  'Port': 40054},
 'Event': 'SparkListenerBlockManagerAdded',
 'Maximum Memory': 384093388,
 'Maximum Offheap Memory': 0,
 'Maximum Onheap Memory': 384093388,
 'Timestamp': 1527585270344}

In [127]:
for i,j in enumerate(logs2):
    print(i, "", j.keys())

0  dict_keys(['Event', 'Spark Version'])
1  dict_keys(['Event', 'Timestamp', 'Executor ID', 'Executor Info'])
2  dict_keys(['Event', 'Block Manager ID', 'Maximum Memory', 'Timestamp', 'Maximum Onheap Memory', 'Maximum Offheap Memory'])
3  dict_keys(['Event', 'JVM Information', 'Spark Properties', 'System Properties', 'Classpath Entries'])
4  dict_keys(['Event', 'App Name', 'App ID', 'Timestamp', 'User'])
5  dict_keys(['Event', 'Job ID', 'Submission Time', 'Stage Infos', 'Stage IDs', 'Properties'])
6  dict_keys(['Event', 'Stage Info', 'Properties'])
7  dict_keys(['Event', 'Stage ID', 'Stage Attempt ID', 'Task Info'])
8  dict_keys(['Event', 'Stage ID', 'Stage Attempt ID', 'Task Info'])
9  dict_keys(['Event', 'Stage ID', 'Stage Attempt ID', 'Task Info'])
10  dict_keys(['Event', 'Stage ID', 'Stage Attempt ID', 'Task Info'])
11  dict_keys(['Event', 'Stage ID', 'Stage Attempt ID', 'Task Type', 'Task End Reason', 'Task Info', 'Task Metrics'])
12  dict_keys(['Event', 'Stage ID', 'Stage Attempt

In [128]:
### Operation colled called
logs2[6]["Properties"]

{'callSite.short': 'collect at <stdin>:1',
 'spark.rdd.scope': '{"id":"1","name":"collect"}',
 'spark.rdd.scope.noOverride': 'true'}

In [129]:
logs2[6]["Stage Info"].keys()

dict_keys(['Stage ID', 'Stage Attempt ID', 'Stage Name', 'Number of Tasks', 'RDD Info', 'Parent IDs', 'Details', 'Submission Time', 'Accumulables'])

In [130]:
logs2[6]["Stage Info"]["Number of Tasks"]

4

In [131]:
logs2[6]["Stage Info"]["RDD Info"]

[{'Callsite': 'collect at <stdin>:1',
  'Disk Size': 0,
  'Memory Size': 0,
  'Name': 'PythonRDD',
  'Number of Cached Partitions': 0,
  'Number of Partitions': 4,
  'Parent IDs': [0],
  'RDD ID': 1,
  'Storage Level': {'Deserialized': False,
   'Replication': 1,
   'Use Disk': False,
   'Use Memory': False}},
 {'Callsite': 'parallelize at PythonRDD.scala:175',
  'Disk Size': 0,
  'Memory Size': 0,
  'Name': 'ParallelCollectionRDD',
  'Number of Cached Partitions': 0,
  'Number of Partitions': 4,
  'Parent IDs': [],
  'RDD ID': 0,
  'Scope': '{"id":"0","name":"parallelize"}',
  'Storage Level': {'Deserialized': False,
   'Replication': 1,
   'Use Disk': False,
   'Use Memory': False}}]

In [132]:
logs2[6]["Stage Info"]["Number of Tasks"]

4

In [133]:
logs2[6]["Stage Info"]["Accumulables"]

[]

In [134]:
s = "Task Metrics"
    
for j in logs2: 
    if s in j:
        print(j[s], "\n")

{'Executor Deserialize Time': 36, 'Executor Deserialize CPU Time': 9248442, 'Executor Run Time': 463, 'Executor CPU Time': 7902265, 'Result Size': 9003, 'JVM GC Time': 0, 'Result Serialization Time': 0, 'Memory Bytes Spilled': 0, 'Disk Bytes Spilled': 0, 'Shuffle Read Metrics': {'Remote Blocks Fetched': 0, 'Local Blocks Fetched': 0, 'Fetch Wait Time': 0, 'Remote Bytes Read': 0, 'Remote Bytes Read To Disk': 0, 'Local Bytes Read': 0, 'Total Records Read': 0}, 'Shuffle Write Metrics': {'Shuffle Bytes Written': 0, 'Shuffle Write Time': 0, 'Shuffle Records Written': 0}, 'Input Metrics': {'Bytes Read': 0, 'Records Read': 0}, 'Output Metrics': {'Bytes Written': 0, 'Records Written': 0}, 'Updated Blocks': []} 

{'Executor Deserialize Time': 29, 'Executor Deserialize CPU Time': 4772051, 'Executor Run Time': 469, 'Executor CPU Time': 48758012, 'Result Size': 8790, 'JVM GC Time': 0, 'Result Serialization Time': 1, 'Memory Bytes Spilled': 0, 'Disk Bytes Spilled': 0, 'Shuffle Read Metrics': {'Remot