In [None]:
def parse_timestamp(splitted: list[str]) -> int:
    """Return the timestamp of a whitespace-split log line in microseconds.

    Args:
        splitted: Fields of one log line. The fourth field (index 3) holds
            the timestamp in seconds followed by one trailing separator
            character, which is dropped before parsing.

    Returns:
        The timestamp converted from seconds to microseconds.

    Raises:
        IndexError: If the line has fewer than four fields.
        ValueError: If the timestamp field is not a valid number.
    """
    seconds = float(splitted[3][:-1])
    # Round instead of truncating: the binary floating-point product can land
    # just below the intended integer, and int() would then be 1 us short.
    return round(seconds * 1e6)  # s -> us


def calc_switch_latencies(
        log_path: str, task_name: str = 'componen') -> tuple[list[int], int]:
    """Collect context switch latencies of one task from a log file.

    A latency sample is the time between a ``sched:sched_switch:`` line whose
    first field is ``task_name`` and the line immediately after it, counted
    only when that next line also starts with ``task_name``. Lines belonging
    to any other task (e.g. the swapper tasks) are ignored.

    Args:
        log_path: Path of the text log to read.
        task_name: Value the first field of a line must have to be
            considered. Defaults to ``'componen'``.

    Returns:
        A tuple ``(switch_latencies_us, duration_us)``: the latency samples
        in microseconds, and the time between the first and the last
        non-blank line of the log in microseconds. An empty log yields
        ``([], 0)``.
    """
    with open(log_path, 'r') as file:
        # Split each line exactly once and drop blank lines, which have no
        # fields and would otherwise raise IndexError below.
        rows = [fields for fields in map(str.split, file) if fields]

    if not rows:
        return [], 0

    switch_latencies_us: list[int] = []
    for current, following in zip(rows, rows[1:]):
        if current[0] != task_name or following[0] != task_name:
            continue

        # The length guard skips truncated lines that have no event field.
        if len(current) > 4 and current[4] == 'sched:sched_switch:':
            switch_latencies_us.append(
                parse_timestamp(following) - parse_timestamp(current))

    duration_us = parse_timestamp(rows[-1]) - parse_timestamp(rows[0])

    return switch_latencies_us, duration_us

In [None]:
import matplotlib.pyplot as plt


def plot_hist(latencies_us: list[int], title: str) -> None:
    """Display a histogram of latency values.

    Args:
        latencies_us: Values to bin; the x axis is labelled in microseconds.
        title: Text shown above the plot.
    """
    num_bins = 100

    plt.hist(latencies_us, bins=num_bins)
    plt.title(title)
    plt.ylabel('Count')
    plt.xlabel('Latency [us]')
    plt.show()

In [None]:
def show_stats(log_path: str) -> None:
    """Print context switch latency statistics of a log and plot a histogram.

    Prints the log path, the summed latency as a percentage of the log
    duration (labelled "CPU usage"), and the minimum, average and maximum
    latency. When the log yields no latency samples, a notice is printed
    instead and no histogram is shown.

    Args:
        log_path: Path of the log file passed to ``calc_switch_latencies``.
    """
    switch_latencies_us, duration_us = calc_switch_latencies(log_path)

    print('===== Context Switch Latency =====')
    print(f'Log file : {log_path}')

    if not switch_latencies_us:
        # min()/max() raise ValueError on an empty list and the average
        # would divide by zero, so report the situation and stop here.
        print('No context switch samples found.')
        return

    sum_latency = sum(switch_latencies_us)
    if duration_us:
        print(f'CPU usage: {sum_latency / duration_us * 100} [%]')
    else:
        # A zero-length log has no meaningful ratio to report.
        print('CPU usage: n/a (log duration is zero)')
    print(f'Minimum  : {min(switch_latencies_us)} [us]')
    print(f'Average  : {sum_latency / len(switch_latencies_us)} [us]')
    print(f'Maximum  : {max(switch_latencies_us)} [us]')
    plot_hist(switch_latencies_us, 'Histogram of Context Switch Latency')

In [None]:
# Directory holding the recorded logs; both files below live in it.
PERF_LOG_DIR = '/home/atsushi/ros2_task_set/perf_logs'

# Logs analysed below, one per file-name tag (edf / cfs).
EDF_LOG_FILE = f'{PERF_LOG_DIR}/20240709_edf_cpu2.txt'
CFS_LOG_FILE = f'{PERF_LOG_DIR}/20240709_cfs_cpu2.txt'

In [None]:
# Report the statistics for each log in turn: EDF first, then CFS.
for log_file in (EDF_LOG_FILE, CFS_LOG_FILE):
    show_stats(log_file)