# Journal processing demonstration

This notebook demonstrates how to load a HyperQueue journal exported as JSON
and run a simple analysis on it.

You can export a journal file to JSON via:

```bash
$ hq journal export <JOURNAL_FILE> > journal.json
```

or get JSON events from a live server via:

```bash
$ hq journal replay > journal.json
```

In [None]:
import json
import dateutil.parser
from datetime import datetime
import pandas as pd
import plotly.express as px

In [None]:
# Load the journal from file. Every non-empty line is parsed as one JSON event;
# blank lines (e.g. stray trailing newlines) are skipped instead of crashing
# the parser. The encoding is given explicitly so the result does not depend
# on the platform's default locale.
with open("journal.json", encoding="utf-8") as f:
    journal_events = [json.loads(line) for line in f if line.strip()]

# Show the first 10 events as "<time> <event type>"
for event in journal_events[:10]:
    print(event["time"], event["event"]["type"])

# Analysis 1: What tasks were assigned to each worker

In [None]:
# Mapping from workers to tasks: dict[int, list[str]]
worker_tasks = {}

for event in journal_events:
    ev = event["event"]
    if ev["type"] == "worker-connected":
        # A newly connected worker starts with an empty task list
        worker_tasks[ev["id"]] = []
    elif ev["type"] == "task-started":
        # This event is emitted when a task is started on a worker

        # Construct a string <job-id>@<task-id>.
        # Single quotes are used inside the f-string so the cell also runs on
        # Python < 3.12, where reusing the outer quote is a syntax error.
        job_task_id = f"{ev['job']}@{ev['task']}"

        # Single-node tasks carry "worker", multi-node tasks carry "workers"
        assigned_workers = [ev["worker"]] if "worker" in ev else ev["workers"]
        for w in assigned_workers:
            # setdefault: do not fail on a worker whose "worker-connected"
            # event is not present in the loaded journal
            worker_tasks.setdefault(w, []).append(job_task_id)

print("Tasks assigned to workers:")
for worker_id, tasks in worker_tasks.items():
    print(f"{worker_id}: {','.join(tasks)}")

# Analysis 2: CPU utilization over time across all workers

This tracks the total number of CPUs in use at each point in time

In [None]:
def parse_time(tm):
    """
    Convert an event timestamp string into a `datetime`.

    The timestamp is first read with fractional seconds; if that fails
    with `ValueError`, it is read again in the whole-second format.
    A `ValueError` from the second attempt propagates to the caller.
    """
    with_fraction = "%Y-%m-%dT%H:%M:%S.%fZ"
    whole_seconds = "%Y-%m-%dT%H:%M:%SZ"
    try:
        parsed = datetime.strptime(tm, with_fraction)
    except ValueError:
        parsed = datetime.strptime(tm, whole_seconds)
    return parsed


In [None]:
def task_configs(submit_desc):
    """
    Build a mapping from task id to resource configuration for one submit.

    Every id range in `task_desc["ids"]["ranges"]` is expanded from its
    "start", "count" and "step" fields, and each resulting id is mapped to
    the shared `task_desc["resources"]` object.

    Raises `Exception` when the description contains "n_tasks"
    (task graphs), which this analysis does not handle.
    """
    task_desc = submit_desc["task_desc"]
    if "n_tasks" in task_desc:
        raise Exception("Task graphs not supported")

    configs = {}
    for span in task_desc["ids"]["ranges"]:
        first = span["start"]
        # Ids are first, first + step, ... while staying below first + count
        upper = first + span["count"]
        for task_id in range(first, upper, span["step"]):
            configs[task_id] = task_desc["resources"]
    return configs

def get_resource_amount(resources, name, all_amount):
    """
    Get the amount of the resource `name` requested by a task configuration.

    `resources` is a single resource request; its "resources" list is searched
    for the first entry whose "resource" equals `name`.

    The parameter "all_amount" is the amount of the resource available on the
    worker. It is returned for the policy "All", which takes all resources of
    the worker, i.e. it depends on the specific worker how many resources
    we get.

    Returns 0 when the resource is not requested (or its policy is not
    recognized).
    """
    amount_policies = ("ForceCompact", "Compact", "ForceTight", "Tight", "Scatter")
    for r in resources["resources"]:
        if r["resource"] != name:
            continue
        policy = r["policy"]
        if policy == "All":
            return all_amount
        if isinstance(policy, dict):
            # A policy with an amount is a mapping {"<PolicyName>": <amount>}
            # (the amount is its value), so the policy name has to be matched
            # against the key rather than against the mapping itself.
            for policy_name, raw_amount in policy.items():
                if policy_name in amount_policies:
                    # Resources are represented in fixed point where 1.0 = 10_000
                    return raw_amount / 10_000
    return 0

def get_worker_res_count(resources, name):
    """
    Return how much of the resource `name` a worker provides.

    `resources` is the worker's list of resource descriptions. The first
    entry whose "name" matches is used and its "kind" decides the count:
    number of listed values ("List"), total size of all groups ("Groups"),
    size of an inclusive range ("Range"), or a fixed-point sum ("Sum").

    Returns 0 when the worker has no such resource and raises `Exception`
    for a kind that is none of the above.
    """
    entry = next((r for r in resources if r["name"] == name), None)
    if entry is None:
        return 0

    kind = entry["kind"]
    if "List" in kind:
        return len(kind["List"]["values"])
    if "Groups" in kind:
        total = 0
        for group in kind["Groups"]["groups"]:
            total += len(group)
        return total
    if "Range" in kind:
        bounds = kind["Range"]
        # Both ends of the range are included
        return bounds["end"] - bounds["start"] + 1
    if "Sum" in kind:
        # Resources are represented in fixed point where 1.0 = 10_000
        return kind["Sum"]["size"] / 10_000
    raise Exception("Unknown resurce kind")

In [None]:
# Read the starting time from the first event
BASE_TIME = parse_time(journal_events[0]["time"])

# Job definitions, we need them to get resource requests of tasks
job_defs = {}

# Worker definitions, we need them to resolve the allocation policy "All"
worker_resources = {}

# Amount of cpus held by each currently running task, keyed by (job id, task id)
running_tasks = {}

# Output variable. It is initialized by (0,0), i.e. at time 0, there are 0 running cpus
running_cpus = [(0, 0)]


for event in journal_events:
    ev = event["event"]
    if ev["type"] == "job-created":
        # When a job is created, remember resource requests of tasks
        # Note that there may be multiple submits into one job
        job_defs.setdefault(ev["job"], {}).update(task_configs(ev["submit_desc"]))
    elif ev["type"] == "worker-connected":
        # When a worker is connected, let's remember its resources
        worker_resources[ev["id"]] = ev["configuration"]["resources"]["resources"]
    elif ev["type"] == "task-started":
        # When a task is started, compute allocated cpus and store them in `running_cpus`.
        time = (parse_time(event["time"]) - BASE_TIME).total_seconds()

        # Get task resource request
        # There may be more resource request variants, so we have to choose the resource request that
        # was actually started on the worker
        task_def = job_defs[ev["job"]][ev["task"]]
        task_resources = task_def["variants"][ev.get("variant", 0)]

        if "worker" in ev:
            # Get CPUs of the worker where the task was started
            worker_res = worker_resources[ev["worker"]]
            all_amount = get_worker_res_count(worker_res, "cpus")

            # Get amount of resources that the task asked for
            amount = get_resource_amount(task_resources, "cpus", all_amount)

            # Store how many CPUs we have asked for
            running_tasks[(ev["job"], ev["task"])] = amount

            # Remember the current used CPUs in the given time
            running_cpus.append((time, running_cpus[-1][1] + amount))
        else:
            raise Exception("This analysis support only single node tasks")
    elif ev["type"] in ("task-finished", "task-failed", "task-canceled"):
        # When a task is finished/failed/canceled, we need to modify our counter
        time = (parse_time(event["time"]) - BASE_TIME).total_seconds()
        # pop() releases the record, so a repeated terminal event for the same
        # task cannot be subtracted twice
        amount = running_tasks.pop((ev["job"], ev["task"]), 0)
        if amount > 0:
            # Subtract what this task held. `all_amount` would be wrong here:
            # it is the CPU count of whichever worker last started a task.
            running_cpus.append((time, running_cpus[-1][1] - amount))

In [None]:
# Let's visualize the result.
# Each row means "from `time` onward, `cpus` CPUs are in use", so the value has
# to be held horizontally until the next event and only then jump vertically.
# That is line shape "hv"; "vh" would draw the new value over the preceding
# interval.
df = pd.DataFrame(running_cpus, columns=["time", "cpus"])
px.line(df, x="time", y="cpus", line_shape="hv")