<h1 style="color:green">POC - Sorting log files</h1>
<p>With the introduction of asyncio in Conway, log lines no longer appear in the order of algorithmic declaration,
but in the order of execution. This can make the logs hard to read, for example when following a test case.</p>
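
<p>A minimal illustration of the problem, independent of Conway: two coroutines are declared in the order A, B, but A waits longer, so its log record is written second. The asyncio.sleep delays are arbitrary stand-ins for I/O such as the GitHub API calls in the logs below.</p>

```python
import asyncio

emitted = []  # log records, in the order they are actually written

async def step(name, delay):
    # Stand-in for I/O (e.g., an HTTP call); the delay is arbitrary
    await asyncio.sleep(delay)
    emitted.append(name)

async def main():
    # Declaration order: "A" first, then "B"
    await asyncio.gather(step("A", 0.05), step("B", 0.01))

asyncio.run(main())
print(emitted)  # ['B', 'A']: execution order, not declaration order
```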

<p>To address that, in November 2024 a number of "labels" (to borrow the Grafana term) were added to the logs to record the ancestry of the calls that led to each asyncio call. The resulting logs are still "unsorted" (i.e., not in the order of algorithmic declaration), but they carry enough context (the "labels") to allow re-sorting them.</p>
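
<p>The sketch below shows one way such labels could be propagated. It is hypothetical and not Conway's actual implementation: it assumes a contextvars.ContextVar (here _scheduling_context) holding the labels of the code that scheduled the current task, and relies on asyncio.create_task copying the current context when a task is created. The label keys mirror those in the log file; the log helper, the records list and the task names are made up for illustration.</p>

```python
import asyncio
import contextvars
import time

# Hypothetical: labels of the code that scheduled the current task (None at top level)
_scheduling_context = contextvars.ContextVar("scheduling_context", default=None)
_T0 = time.perf_counter()
records = []  # stand-in for the JSON Lines log file

def _current_task_name():
    try:
        task = asyncio.current_task()
    except RuntimeError:  # no running event loop
        return "Not using an event loop"
    return task.get_name() if task is not None else "Not using an event loop"

def log(message):
    labels = {
        "timestamp": f"{time.perf_counter() - _T0:.3f} sec",
        "task": _current_task_name(),
    }
    parent = _scheduling_context.get()
    if parent is not None:
        labels["scheduling_context"] = parent  # link to the scheduler's labels
    records.append({"message": message, "labels": labels})
    return labels

async def child(name):
    await asyncio.sleep(0)
    log(f"work done by {name}")

async def main():
    here = log("scheduling children")
    # create_task() copies the current context, so each child sees `here`
    token = _scheduling_context.set(here)
    try:
        tasks = [asyncio.create_task(child(n), name=n) for n in ("child-a", "child-b")]
    finally:
        _scheduling_context.reset(token)  # main's own later log lines are unaffected
    await asyncio.gather(*tasks)

log("before the event loop")
asyncio.run(main())
```

<p>Each child's labels then carry a scheduling_context entry pointing at the labels of the line that scheduled it, which is the kind of nesting seen in the log records below.</p>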

<p>This notebook was added to prototype such sorting.</p>

In [1]:
import os                 as _os
import sys
sys.path.append(_os.path.dirname(_os.getcwd()))  # So we can import chassis_nb_utils
import chassis_nb_utils
NBU                       = chassis_nb_utils.Chassis_NB_Utils()
DFU                       = NBU.DFU
T0                        = NBU.time.perf_counter()

CONWAY installation:         conway_fork
Jupyter using repo[branch]:  conway.ops[afin-dev]
Installation path:           /home/alex/consultant1@CCL/dev/conway_fork
Application:                 <class 'chassis_nb_application.Chassis_NB_Application'>


In [2]:
LOGS_FOLDER                = "/var/log/ccl/consultant1@CCL/dev/conway_fork/ConwayTestApp"
LOG_FILE                   = "241102.222513_ConwayTestApp.log"
LOG_PATH                   = f"{LOGS_FOLDER}/{LOG_FILE}"
SCHEDULING_CONTEXT         = "scheduling_context"

In [3]:
import json as _json
import re as _re

In [4]:
# The log file has one JSON object per line (JSON Lines), so parse it line by line
data_l = []
with open(LOG_PATH, 'r') as file:
    for line in file:
        if line.strip():  # skip blank lines, if any
            data_l.append(_json.loads(line))
len(data_l)

40

In [9]:
data_l[10], type(data_l[10])

({'message': "Created 'integration' branch in 'scenario_8002.scenarios' with URL https://api.github.com/repos/testrobot-ccl/scenario_8002.scenarios/git/refs/heads/integration",
  'labels': {'scheduling_context': {'timestamp': '2.278 sec',
    'thread': 'MainThread',
    'task': 'Task-36',
    'source': 'repo_manipulation_test_case:100'},
   'timestamp': '4.186 sec',
   'thread': 'MainThread',
   'task': 'Task-38',
   'source': 'repo_manipulation_test_case:195'}},
 dict)
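
<p>The scheduling_context entries form a chain from the logging task up to the outermost scheduler. A hypothetical helper (label_chain, not used by the cells below) that flattens the chain root-first, applied to the record above with its message abbreviated:</p>

```python
SCHEDULING_CONTEXT = "scheduling_context"

def label_chain(labels):
    """Return the labels dicts from the outermost scheduler down to `labels` itself."""
    chain = []
    while labels is not None:
        chain.append(labels)
        labels = labels.get(SCHEDULING_CONTEXT)
    return list(reversed(chain))

record = {
    "message": "Created 'integration' branch in 'scenario_8002.scenarios' ...",  # abbreviated
    "labels": {
        "scheduling_context": {"timestamp": "2.278 sec", "thread": "MainThread",
                               "task": "Task-36", "source": "repo_manipulation_test_case:100"},
        "timestamp": "4.186 sec", "thread": "MainThread",
        "task": "Task-38", "source": "repo_manipulation_test_case:195",
    },
}
print([(lab["task"], lab["timestamp"]) for lab in label_chain(record["labels"])])
# [('Task-36', '2.278 sec'), ('Task-38', '4.186 sec')]
```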

In [31]:

def _extract_task_timestamps(data_l):
    '''
    Creates and returns a dictionary with metadata about the log lines, keyed by task id. For example, "Task-36"
    could be a key.

    Each value is a subdictionary with two entries:
    * An entry "timestamps" whose value is the list of timestamps (such as "2.101 sec") that appear in the log file
      for that task, deduplicated and sorted in ascending order.

    * An entry "ancestors" whose value is a (possibly empty) list of the other tasks that are ancestors of this one,
      ordered from immediate parent to parent's parent, and so on.

    This metadata is a preliminary step towards sorting the log lines correctly.

    For example, the log lines for two "leaf" tasks (i.e., tasks that don't trigger other sub-tasks) should be sorted so that
    the lines for each task are adjacent, with the lines of the task that "first appeared" listed before those of the other task.
    Here, "first appeared" is determined by the minimum of that task's "timestamps" list in the returned dictionary.
    '''

    result_dict = {}

    def _extract_recursively(labels):
        '''
        Inner function that records not just the timestamp of the log line in question, but also moves up the
        scheduling context hierarchy, i.e., records the timestamps of the prior log lines that led to this one.
        '''
        task = labels['task']
        timestamp = labels['timestamp']
        if task not in result_dict:
            result_dict[task] = {"timestamps": [], "ancestors": []}
        result_dict[task]["timestamps"].append(timestamp)

        # Recurse into the parent's labels, if any
        if SCHEDULING_CONTEXT in labels:
            parent_labels = labels[SCHEDULING_CONTEXT]
            _extract_recursively(parent_labels)
            parent_task = parent_labels['task']
            parent_ancestors = result_dict[parent_task]['ancestors']
            ancestors_so_far = result_dict[task]['ancestors']
            # dict.fromkeys() removes duplicates while keeping the order (immediate parent first);
            # a set would lose the ordering promised in the docstring
            ancestors = list(dict.fromkeys([parent_task] + parent_ancestors + ancestors_so_far))
            result_dict[task]['ancestors'] = ancestors

    for datum in data_l:
        _extract_recursively(datum['labels'])

    # Remove duplicate timestamps and sort them numerically
    for task_info in result_dict.values():
        task_info["timestamps"] = sorted(set(task_info["timestamps"]), key=_timestamp_key)

    return result_dict

def _timestamp_key(ts):
    '''
    Used as the key when sorting lists of timestamps, such as those produced by _extract_task_timestamps for each task.
    It ensures that a timestamp like "11.344 sec" sorts after a timestamp like "2.550 sec", and not before it, as would
    happen if the timestamps were sorted lexicographically as strings.

    :param ts: A timestamp produced by Conway logs, such as "2.550 sec"
    :type ts: str
    :returns: a float obtained by parsing the `ts` parameter. For example, if `ts` is "2.550 sec", then this function
        returns the number 2.550
    :rtype: float
    '''
    REGEX = r"(\d+\.\d+) sec"  # the "." is escaped so it only matches a literal decimal point
    m = _re.match(REGEX, ts)
    if m is None:
        raise ValueError(f"Unrecognized timestamp format: {ts!r}")
    return float(m[1])
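
<p>A quick standalone check of why a numeric key is needed. The variant below (timestamp_key, a hypothetical name) follows the same idea as _timestamp_key but uses re.fullmatch, so anything other than the expected "N.NNN sec" format raises ValueError.</p>

```python
import re

def timestamp_key(ts):
    # Same idea as _timestamp_key above: "2.550 sec" -> 2.55
    m = re.fullmatch(r"(\d+\.\d+) sec", ts)
    if m is None:
        raise ValueError(f"Unrecognized timestamp format: {ts!r}")
    return float(m[1])

stamps = ["11.344 sec", "2.550 sec", "5.052 sec"]
print(sorted(stamps))                     # ['11.344 sec', '2.550 sec', '5.052 sec'] (lexicographic)
print(sorted(stamps, key=timestamp_key))  # ['2.550 sec', '5.052 sec', '11.344 sec'] (numeric)
```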
    

In [32]:
ts_dict = _extract_task_timestamps(data_l)
ts_dict

{'Not using an event loop': {'timestamps': ['0.612 sec',
   '1.787 sec',
   '11.266 sec'],
  'ancestors': []},
 'Task-38': {'timestamps': ['2.623 sec', '3.641 sec', '4.186 sec'],
  'ancestors': ['Task-36']},
 'Task-36': {'timestamps': ['2.266 sec',
   '2.273 sec',
   '2.278 sec',
   '2.284 sec',
   '2.288 sec',
   '5.052 sec'],
  'ancestors': []},
 'Task-42': {'timestamps': ['2.752 sec', '4.078 sec', '4.530 sec'],
  'ancestors': ['Task-36']},
 'Task-39': {'timestamps': ['2.772 sec', '4.303 sec', '4.806 sec'],
  'ancestors': ['Task-36']},
 'Task-40': {'timestamps': ['2.797 sec', '3.752 sec', '4.242 sec'],
  'ancestors': ['Task-36']},
 'Task-41': {'timestamps': ['2.822 sec', '4.543 sec', '5.029 sec'],
  'ancestors': ['Task-36']},
 'Task-49': {'timestamps': ['5.080 sec',
   '5.101 sec',
   '5.120 sec',
   '5.135 sec',
   '5.145 sec',
   '5.159 sec'],
  'ancestors': []},
 'Task-52': {'timestamps': ['6.042 sec',
   '9.493 sec',
   '9.746 sec',
   '9.758 sec'],
  'ancestors': ['Task-49']},
 

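<p>The "first appeared" rule from the _extract_task_timestamps docstring, shown in isolation on the five sub-tasks of Task-36 (timestamps copied from the output above). Note that _log_line_key compares the parent's labels first, so in the final output these sub-tasks follow the order in which Task-36 scheduled them; the "first appeared" rule decides the order of top-level tasks and of sub-tasks scheduled from the same parent line.</p>

```python
# Sub-tasks of Task-36, with timestamps copied from the ts_dict output above
leaf_timestamps = {
    "Task-38": ["2.623 sec", "3.641 sec", "4.186 sec"],
    "Task-42": ["2.752 sec", "4.078 sec", "4.530 sec"],
    "Task-39": ["2.772 sec", "4.303 sec", "4.806 sec"],
    "Task-40": ["2.797 sec", "3.752 sec", "4.242 sec"],
    "Task-41": ["2.822 sec", "4.543 sec", "5.029 sec"],
}

def first_seen(ts_list):
    # Lists are already sorted ascending, so the first entry is the minimum
    return float(ts_list[0].split()[0])

order = sorted(leaf_timestamps, key=lambda t: first_seen(leaf_timestamps[t]))
print(order)  # ['Task-38', 'Task-42', 'Task-39', 'Task-40', 'Task-41']

# Keep each task's lines adjacent, tasks in first-seen order
for task in order:
    for ts in leaf_timestamps[task]:
        print(f"[{ts} {task}]")
```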
In [47]:
def _log_line_key(ts_dict, log_line):
    '''
    Returns a sorting key for lines created by a Conway Logger, so that the lines are re-sorted in the order in which
    the code that triggers them is written, as opposed to the order in which that code is executed. The two may differ
    because, under asyncio, code may submit tasks to the event loop in the order in which they are written, but those
    tasks then execute in a different, non-deterministic order.

    :param ts_dict: dictionary, as returned by _extract_task_timestamps, whose keys are asyncio task identifiers and whose
        values are subdictionaries with a "timestamps" entry (the timestamps at which that task appears, sorted from
        earliest to latest) and an "ancestors" entry (the task's ancestors, from immediate parent upwards).

        Example:

        .. code::

            {'Not using an event loop': {'timestamps': ['0.612 sec', '1.787 sec', '11.266 sec'], 'ancestors': []},
             'Task-38': {'timestamps': ['2.623 sec', '3.641 sec', '4.186 sec'], 'ancestors': ['Task-36']},
             'Task-36': {'timestamps': ['2.266 sec', '2.273 sec', '2.278 sec', '2.284 sec', '2.288 sec', '5.052 sec'],
                         'ancestors': []}}

    :type ts_dict: dict

    :param log_line: a line of log output from the Conway Logger, parsed from JSON into a dictionary.
        Example:

        .. code::

            {'message': "Created 'integration' branch in 'scenario_8002.scenarios' with URL https://api.github.com/repos/testrobot-ccl/scenario_8002.scenarios/git/refs/heads/integration",
              'labels': {'scheduling_context': {'timestamp': '2.278 sec',
                'thread': 'MainThread',
                'task': 'Task-36',
                'source': 'repo_manipulation_test_case:100'},
               'timestamp': '4.186 sec',
               'thread': 'MainThread',
               'task': 'Task-38',
               'source': 'repo_manipulation_test_case:195'}}

    :type log_line: dict
    '''
    # We "pad" the keys so that they all have the same nesting depth. For example, given a key like (12, 40) and another
    # like ((3, 5), (5, 7)), we pad the first one to ((12, 40), (0, 0)) so that both keys have the same shape.
    # How much padding a key needs depends on the gap between the number of ancestors of the line's task and the
    # maximum number of ancestors across all tasks, so we first compute that maximum:
    max_nb_ancestors = max(len(task_info["ancestors"]) for task_info in ts_dict.values())

    
    # Labels may exist hierarchically, in the sense that a labels dict may have a "parent labels dict" in the form of the scheduling context.
    #
    # So the sorting policy is:
    #   1. For a line with multiple labels dicts, sort lexicographically starting with the labels dict highest in the hierarchy
    #   2. A labels dict A precedes a labels dict B if the first timestamp for A's task precedes the first timestamp for B's task.
    #      Here the meaning of "first timestamp" for a task is as determined by the `ts_dict` parameter, i.e., the "first timestamp" for
    #      a task may be different (and earlier) than the timestamp for that task in the log line we are creating a key for.
    #
    labels = log_line['labels']

    def _labels_key(labels):
        task = labels['task']
        timestamp = labels['timestamp']
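        if task != "Not using an event loop":
            min_timestamp = ts_dict[task]["timestamps"][0]
        else:
            # All lines outside an event loop share this pseudo-task, so its first timestamp would lump them
            # together; use the line's own timestamp instead, since such code runs sequentially
            min_timestamp = timestamp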

        return (_timestamp_key(min_timestamp), _timestamp_key(timestamp))

    def _hierarchical_key(labels):
        if SCHEDULING_CONTEXT not in labels:
            return _labels_key(labels)
        else:
            parent_labels = labels[SCHEDULING_CONTEXT]
            parent_key = _hierarchical_key(parent_labels)
            child_key = _labels_key(labels)
            return (parent_key, child_key)

    padding_needed = max_nb_ancestors - len(ts_dict[labels['task']]['ancestors'])

    key = _hierarchical_key(labels)
    for _ in range(padding_needed):
        key = (key, (0, 0))

    return key
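
<p>An alternative key, offered as a sketch rather than a drop-in replacement: flatten the label chain into a tuple of (first-seen, timestamp) pairs ordered from the outermost scheduler to the logging task. Python compares tuples element by element and ranks a proper prefix before any longer tuple, so the (0, 0) padding becomes unnecessary as long as timestamps are positive. The names flat_log_line_key and first_seen are hypothetical; the sample values are taken from the ts_dict output and log lines above.</p>

```python
SCHEDULING_CONTEXT = "scheduling_context"
NO_LOOP = "Not using an event loop"

def _secs(ts):
    return float(ts.split()[0])  # "2.278 sec" -> 2.278

def flat_log_line_key(first_seen, log_line):
    """Tuple of (first-seen, timestamp) pairs, outermost scheduler first."""
    chain = []
    labels = log_line["labels"]
    while labels is not None:
        task, ts = labels["task"], _secs(labels["timestamp"])
        # Lines outside the event loop run sequentially, so each uses its own timestamp
        first = ts if task == NO_LOOP else first_seen[task]
        chain.append((first, ts))
        labels = labels.get(SCHEDULING_CONTEXT)
    return tuple(reversed(chain))

# First-seen times (seconds) per task, from the ts_dict output above
first_seen = {"Task-36": 2.266, "Task-38": 2.623, "Task-41": 2.822}
lines = [
    {"labels": {"task": "Task-38", "timestamp": "3.641 sec",
                "scheduling_context": {"task": "Task-36", "timestamp": "2.278 sec"}}},
    {"labels": {"task": NO_LOOP, "timestamp": "1.787 sec"}},
    {"labels": {"task": "Task-41", "timestamp": "2.822 sec",
                "scheduling_context": {"task": "Task-36", "timestamp": "2.266 sec"}}},
]
for ln in sorted(lines, key=lambda ln: flat_log_line_key(first_seen, ln)):
    print(flat_log_line_key(first_seen, ln))
# ((1.787, 1.787),)
# ((2.266, 2.266), (2.822, 2.822))
# ((2.266, 2.278), (2.623, 3.641))
```

<p>The resulting order (Task-41 before Task-38, because Task-36 scheduled it first) matches the order produced by _log_line_key in the printout further below.</p>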

In [50]:
for idx in range(10):
    print(_log_line_key(ts_dict, data_l[idx]))

((0.612, 0.612), (0, 0))
((1.787, 1.787), (0, 0))
((2.266, 2.278), (2.623, 2.623))
((2.266, 2.284), (2.752, 2.752))
((2.266, 2.273), (2.772, 2.772))
((2.266, 2.288), (2.797, 2.797))
((2.266, 2.266), (2.822, 2.822))
((2.266, 2.278), (2.623, 3.641))
((2.266, 2.288), (2.797, 3.752))
((2.266, 2.284), (2.752, 4.078))


In [23]:
sorted([((12, 40), (0,0)), ((3, 50), (0,0)), ((3, 5), (0,0)), ((12, 40), (2, 1))])

[((3, 5), (0, 0)), ((3, 50), (0, 0)), ((12, 40), (0, 0)), ((12, 40), (2, 1))]

In [51]:
sorted_data_l = sorted(data_l, key=lambda line: _log_line_key(ts_dict, line))
len(sorted_data_l)

40

In [68]:
x = []
foo = "[2.266 sec Task-36]"
x.append(foo)
foo in x

True

In [70]:
def _format_labels(labels):
    task = labels['task']
    ts = labels['timestamp']

    # Indent one tab per ancestor, so sub-tasks appear nested under their parents
    padding = "\t" * len(ts_dict[task]['ancestors'])
    return f"{padding}[{ts} {task}]"

def _format_ancestors(labels):
    '''
    Returns the formatted prefixes of all ancestors of `labels`, from the outermost one down to the immediate parent.
    '''
    if SCHEDULING_CONTEXT not in labels:
        return []
    parent_labels = labels[SCHEDULING_CONTEXT]
    formatted_ancestors = _format_ancestors(parent_labels)  # a freshly built list, safe to modify
    formatted_ancestors.append(_format_labels(parent_labels))
    return formatted_ancestors

# Print each ancestor header only once, the first time it is needed
already_seen = set()
for datum in sorted_data_l:
    labels = datum['labels']

    for txt in _format_ancestors(labels):
        if txt not in already_seen:
            print(txt)
            already_seen.add(txt)

    prefix = _format_labels(labels)
    print(f"{prefix}\t{datum['message']}")

[0.612 sec Not using an event loop]	--------- Starting Test Scenario 8001 [round=0] ---------
[1.787 sec Not using an event loop]	--------- Starting Test Scenario 8002 [round=0] ---------
[2.266 sec Task-36]
	[2.822 sec Task-41]	Removed pre-existing repo 'scenario_8002.svc' so we can re-create it - response was null
	[4.543 sec Task-41]	Created repo 'scenario_8002.svc' with URL https://github.com/testrobot-ccl/scenario_8002.svc
	[5.029 sec Task-41]	Created 'integration' branch in 'scenario_8002.svc' with URL https://api.github.com/repos/testrobot-ccl/scenario_8002.svc/git/refs/heads/integration
[2.273 sec Task-36]
	[2.772 sec Task-39]	Removed pre-existing repo 'scenario_8002.test' so we can re-create it - response was null
	[4.303 sec Task-39]	Created repo 'scenario_8002.test' with URL https://github.com/testrobot-ccl/scenario_8002.test
	[4.806 sec Task-39]	Created 'integration' branch in 'scenario_8002.test' with URL https://api.github.com/repos/testrobot-ccl/scenario_8002.test/git/re