Add fold_logs.py script to summarize workflow execution by parsing logs (flyteorg#434)

* resurrected fold-logs.py script

Signed-off-by: Daniel Rammer <daniel@union.ai>

* printing more info

Signed-off-by: Daniel Rammer <daniel@union.ai>

* formatting output

Signed-off-by: Daniel Rammer <daniel@union.ai>

* removed queue tracking

Signed-off-by: Daniel Rammer <daniel@union.ai>

* added cache logs

Signed-off-by: Daniel Rammer <daniel@union.ai>

* cleaning up block definitions for uniformity

Signed-off-by: Daniel Rammer <daniel@union.ai>

* added comments

Signed-off-by: Daniel Rammer <daniel@union.ai>

* added argparse

Signed-off-by: Daniel Rammer <daniel@union.ai>

* tracking workflow enqueues on node updates

Signed-off-by: Daniel Rammer <daniel@union.ai>

* moved fold-logs.py to a script directory

Signed-off-by: Daniel Rammer <daniel@union.ai>

* parse gcp formatted logs (flyteorg#435)

Co-authored-by: Babis Kiosidis <babisk@spotify.com>
Signed-off-by: Daniel Rammer <daniel@union.ai>

Co-authored-by: Babis Kiosidis <ckiosidis@gmail.com>
Co-authored-by: Babis Kiosidis <babisk@spotify.com>
3 people committed Jul 13, 2022
1 parent 6cd3279 commit d7fb99b
Showing 1 changed file with 267 additions and 0 deletions.
flytepropeller/script/fold-logs.py
@@ -0,0 +1,267 @@
#!/usr/bin/env python

from abc import ABC, abstractmethod
from datetime import datetime

import argparse
import json
import re

header = """Timestamp   Line    Duration    Hierarchical Log Layout
----------------------------------------------------------------------------------------------------"""
printfmt = "%-11s %-7d %-11s %s"

# define propeller log blocks
class Block(ABC):
    def __init__(self, line_number):
        self.children = []
        self.start_time = None
        self.end_time = None
        self.line_number = line_number

    def get_id(self):
        return self.__class__.__name__

    @abstractmethod
    def handle_log(self, log, line_number):
        pass

    def parse(self):
        while True:
            log, line_number = parser.next()
            if not log:
                return

            if msg_key not in log:
                continue

            # match gcp   {"data":{"src":"controller.go:156"},"message":"==\u003e Enqueueing workflow [flytesnacks-development/a4f8dxddvnfrfs6jr9nx-n0-0]","severity":"INFO","timestamp":"2022-05-03T09:28:01Z"}
            # match other {"level":"debug", "msg":"Enqueueing owner for updated object [flytesnacks-development/a4f8dxddvnfrfs6jr9nx-n0-0]"}
            if "Enqueueing owner for updated object" in log[msg_key]:
                enqueue_msgs.append((line_number, log[ts_key], True))

            # match {"level":"info", "msg":"==\u003e Enqueueing workflow [NAMESPACE/ID]"}
            if "Enqueueing workflow" in log[msg_key]:
                enqueue_msgs.append((line_number, log[ts_key], False))

            self.handle_log(log, line_number)
            if self.end_time:
                return

class Workflow(Block):
    def __init__(self):
        super().__init__(-1)

    def handle_log(self, log, line_number):
        # match {"level":"info", "msg":"==\u003e Enqueueing workflow [NAMESPACE/ID]"}
        if "Enqueueing workflow" in log[msg_key]:
            if not self.start_time:
                self.start_time = log[ts_key]
                self.line_number = line_number

        # match {"level":"info", "msg":"Processing Workflow"}
        if "Processing Workflow" in log[msg_key]:
            block = Processing(log[ts_key], line_number)
            block.parse()
            self.children.append(block)

class IDBlock(Block):
    def __init__(self, id, start_time, end_time, line_number):
        super().__init__(line_number)
        self.id = id
        self.start_time = start_time
        self.end_time = end_time

    def handle_log(self, log, line_number):
        pass

    def get_id(self):
        return self.id

class Processing(Block):
    def __init__(self, start_time, line_number):
        super().__init__(line_number)
        self.start_time = start_time
        self.last_recorded_time = start_time

    def handle_log(self, log, line_number):
        # match {"level":"info", "msg":"Completed processing workflow"}
        if "Completed processing workflow" in log[msg_key]:
            self.end_time = log[ts_key]

        # match {"level":"info", "msg":"Handling Workflow [ID], id: [project:\"PROJECT\" domain:\"DOMAIN\" name:\"ID\" ], p [STATUS]"}
        if "Handling Workflow" in log[msg_key]:
            match = re.search(r'p \[([\w]+)\]', log[msg_key])
            if match:
                block = StreakRound(match.group(1), log[ts_key], line_number)
                block.parse()
                self.children.append(block)

class StreakRound(Block):
    def __init__(self, phase, start_time, line_number):
        super().__init__(line_number)
        self.phase = phase
        self.start_time = start_time
        self.last_recorded_time = start_time

    def get_id(self):
        return f"{self.__class__.__name__}({self.phase})"

    def handle_log(self, log, line_number):
        # match {"level":"info", "msg":"Catalog CacheEnabled. recording execution [PROJECT/DOMAIN/TASK_ID/TASK_VERSION]"}
        if "Catalog CacheEnabled. recording execution" in log[msg_key]:
            id = "CacheWrite(" + log[blob_key]["node"] + ")"
            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

        # match {"level":"info", "msg":"Catalog CacheHit: for task [PROJECT/DOMAIN/TASK_ID/TASK_VERSION]"}
        if "Catalog CacheHit" in log[msg_key]:
            id = "CacheHit(" + log[blob_key]["node"] + ")"
            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

        # match {"level":"info", "msg":"Catalog CacheMiss: Artifact not found in Catalog. Executing Task."}
        if "Catalog CacheMiss" in log[msg_key]:
            id = "CacheMiss(" + log[blob_key]["node"] + ")"
            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

        # match {"level":"info", "msg":"Change in node state detected from [STATUS] -\u003e [STATUS], (handler phase [STATUS])"}
        # or    {"level":"info", "msg":"Change in node state detected from [STATUS] -\u003e [STATUS]"}
        if "Change in node state detected" in log[msg_key]:
            id = "UpdateNodePhase(" + log[blob_key]["node"]

            match = re.search(r'\[([\w]+)\] -> \[([\w]+)\]', log[msg_key])
            if match:
                id += f",{match.group(1)},{match.group(2)})"

            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

        # match {"level":"info", "msg":"Handling Workflow [ID] done"}
        if "Handling Workflow" in log[msg_key]:
            self.end_time = log[ts_key]

        # match {"level":"debug", "msg":"node succeeding"}
        if "node succeeding" in log[msg_key]:
            id = "UpdateNodePhase(" + log[blob_key]["node"] + ",Succeeding,Succeeded)"
            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

        # match {"level":"debug", "msg":"Sending transition event for plugin phase [STATUS]"}
        if "Sending transition event for plugin phase" in log[msg_key]:
            id = "UpdatePluginPhase(" + log[blob_key]["node"]

            match = re.search(r'\[([\w]+)\]', log[msg_key])
            if match:
                id += f",{match.group(1)})"

            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

        # match {"level":"info", "msg":"Transitioning/Recording event for workflow state transition [STATUS] -\u003e [STATUS]"}
        if "Transitioning/Recording event for workflow state transition" in log[msg_key]:
            id = "UpdateWorkflowPhase("

            match = re.search(r'\[([\w]+)\] -> \[([\w]+)\]', log[msg_key])
            if match:
                id += f"{match.group(1)},{match.group(2)})"

            self.children.append(IDBlock(id, self.last_recorded_time, log[ts_key], line_number))
            self.last_recorded_time = log[ts_key]

# define JsonLogParser class
class JsonLogParser:
    def __init__(self, file, workflow_id):
        self.file = file
        self.workflow_id = workflow_id
        self.line_number = 0

    def next(self):
        while True:
            # read next line
            line = self.file.readline()
            if not line:
                return None, -1
            self.line_number += 1

            try:
                # only return logs where {blob_key:{"exec_id":"WORKFLOW_ID"}} or the message contains WORKFLOW_ID
                log = json.loads(line)
                if ("exec_id" in log[blob_key] and log[blob_key]["exec_id"] == self.workflow_id) \
                        or (msg_key in log and self.workflow_id in log[msg_key]):
                    return log, self.line_number
            except (json.JSONDecodeError, KeyError, TypeError):
                # skip lines that are not JSON objects or lack the expected keys
                pass

def print_block(block, prefix, print_enqueue):
    # print all enqueue messages that occurred before this block's line number
    if print_enqueue:
        while len(enqueue_msgs) > 0 and enqueue_msgs[0][0] <= block.line_number:
            enqueue_msg = enqueue_msgs.pop(0)
            enqueue_time = datetime.strptime(enqueue_msg[1], '%Y-%m-%dT%H:%M:%S%z').strftime("%H:%M:%S")
            id = "EnqueueWorkflow"
            if enqueue_msg[2]:
                id += "OnNodeUpdate"

            print(printfmt % (enqueue_time, enqueue_msg[0], "-", id))

    # compute block elapsed time
    elapsed_time = 0
    if block.end_time and block.start_time:
        elapsed_time = datetime.strptime(block.end_time, '%Y-%m-%dT%H:%M:%S%z').timestamp() \
            - datetime.strptime(block.start_time, '%Y-%m-%dT%H:%M:%S%z').timestamp()

    # compute formatted id
    start_time = datetime.strptime(block.start_time, '%Y-%m-%dT%H:%M:%S%z').strftime("%H:%M:%S")
    id = prefix + " " + block.get_id()

    print(printfmt % (start_time, block.line_number, str(elapsed_time) + "s", id))

    # process all children
    for count, child in enumerate(block.children, start=1):
        print_block(child, f" {prefix}.{count}", print_enqueue)

if __name__ == "__main__":
    # parse arguments
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("path", help="path to FlytePropeller log dump")
    arg_parser.add_argument("workflow_id", help="workflow ID to analyze")
    arg_parser.add_argument("-e", "--print-enqueue", action="store_true", help="print enqueue workflow messages")
    arg_parser.add_argument("-gcp", "--gcp", action="store_true", default=False, help="enable for gcp formatted logs")
    args = arg_parser.parse_args()

    # select log keys for the configured format
    if args.gcp:
        blob_key = "data"
        msg_key = "message"
        ts_key = "timestamp"
    else:
        msg_key = "msg"
        ts_key = "ts"
        blob_key = "json"

    # parse workflow
    workflow = Workflow()
    with open(args.path, "r") as file:
        parser = JsonLogParser(file, args.workflow_id)
        enqueue_msgs = []

        workflow.parse()

    workflow.end_time = workflow.children[-1].end_time

    # print workflow
    print(header)
    print_block(workflow, "1", args.print_enqueue)
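
For reference, a hypothetical invocation (the log file name is illustrative, and the workflow ID is whatever appears in the logs' exec_id field, e.g. the a4f8dxddvnfrfs6jr9nx execution from the comments above):

    # plain JSON logs ("msg"/"ts"/"json" keys), also printing enqueue messages
    python flytepropeller/script/fold-logs.py propeller.log a4f8dxddvnfrfs6jr9nx --print-enqueue

    # gcp formatted logs ("message"/"timestamp"/"data" keys)
    python flytepropeller/script/fold-logs.py propeller.log a4f8dxddvnfrfs6jr9nx --gcp

Each block prints as one row of printfmt: start timestamp, log line number, elapsed seconds, and a dotted prefix (1, 1.1, 1.1.2, ...) that nests each streak round and node/cache event under its parent.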
