In [1]:
from collections import namedtuple

import numpy as np
import pandas as pd

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots


In [2]:
# Qualitative colour palette; plots pick a task's colour by position (COLORS[i]).
COLORS = px.colors.qualitative.Alphabet
# Angular span that one hyperperiod H is mapped onto: each tick covers
# PERIOD / H (presumably degrees, Plotly's default angular unit -- confirm).
PERIOD = 360

In [3]:
class Task:
    """A single task, wrapping its parameter record.

    ``Parameters`` is a namedtuple with the fields ``O C D T A``. The code in
    this notebook reads ``O`` as the first release offset, ``D`` as the
    distance from a release to its deadline and ``T`` as the period; ``C``
    and ``A`` are carried along but not read by the visible code.
    """

    Parameters = namedtuple('Parameters', ['O', 'C', 'D', 'T', 'A'])

    def __init__(self, parameters) -> None:
        """Store ``parameters`` (normally a ``Task.Parameters`` instance)."""
        self.parameters = parameters

    def __str__(self) -> str:
        """Return the string form of the wrapped parameter record."""
        return f'{self.parameters!s}'


In [4]:
def read_tasks(filename):
    """Load a task set from a whitespace-separated text file.

    The first line holds the number of tasks ``n``; each of the following
    ``n`` lines holds the integer fields of one ``Task.Parameters`` record.

    Returns:
        list of ``Task`` objects, in file order.
    """
    with open(filename) as fp:
        n_tasks = int(fp.readline())
        # str.split() with no argument already discards surrounding whitespace.
        rows = [fp.readline().split() for _ in range(n_tasks)]

    return [Task(Task.Parameters(*map(int, row))) for row in rows]

In [5]:
def read_traces(filename):
    """Read a schedule trace from the first line of ``filename``.

    The line is split on whitespace and every token is converted to an
    integer. Any further lines in the file are not read.

    Returns:
        1-D ``numpy`` integer array with one entry per token.
    """
    with open(filename) as fp:
        first_line = fp.readline()

    return np.array(first_line.split(), dtype=int)

In [6]:
def compute_H(tasks):
    """Return the hyperperiod: the least common multiple of all task periods.

    An empty task list yields ``0``, as does a period set whose LCM is zero.
    """
    periods = [task.parameters.T for task in tasks]
    if not periods:
        return 0

    hyperperiod = np.lcm.reduce(periods)
    return hyperperiod if hyperperiod else 0

In [7]:
def get_ticks(task, H, for_deadlines=False, n_repeats=1):
    """Return the ticks at which a task is released (or reaches its deadline).

    Releases start at the task offset ``O`` and repeat every period ``T`` up
    to, but not including, ``H * n_repeats``. With ``for_deadlines`` both ends
    of that window are shifted by ``D``, so exactly one deadline is produced
    per release.

    Args:
        task: object exposing ``parameters.O``, ``parameters.D`` and
            ``parameters.T``.
        H: hyperperiod, in ticks.
        for_deadlines: return deadline ticks instead of release ticks.
        n_repeats: number of consecutive hyperperiods to cover.

    Returns:
        1-D integer ``numpy`` array of ticks (possibly empty).
    """
    start = task.parameters.O
    # Scale only the hyperperiod by n_repeats; the deadline shift D is applied
    # once. Scaling (H + D) instead would emit deadlines for releases that lie
    # beyond the requested horizon.
    stop = H * n_repeats
    if for_deadlines:
        start += task.parameters.D
        stop += task.parameters.D

    # An explicit dtype keeps an empty result integer-typed.
    return np.array(range(start, stop, task.parameters.T), dtype=int)

In [8]:
def plot_events(i, ticks, theta_per_tick, r=60, symbol=None, size=20):
    """Build a polar marker trace for a set of event ticks.

    Args:
        i: index into ``COLORS`` selecting the marker colour.
        ticks: ``numpy`` array of event ticks.
        theta_per_tick: angle covered by one tick.
        r: radial coordinate shared by every marker.
        symbol: Plotly marker symbol name (``None`` keeps Plotly's default).
        size: marker size.

    Returns:
        ``go.Scatterpolar`` trace in ``'markers'`` mode, hidden from the legend.
    """
    radii = [r] * ticks.size
    theta = ticks * theta_per_tick
    # Honour the caller's ``size`` rather than a hard-coded 20.
    marker = dict(color=COLORS[i], symbol=symbol, size=size)

    return go.Scatterpolar(r=radii, theta=theta, mode='markers',
                           showlegend=False, marker=marker)

def plot_task_events(i, task, H, for_deadlines=False):
    """Build the polar marker trace for one task's releases or deadlines.

    Releases are drawn as ``'star-diamond'`` markers at radius 60, deadlines
    as ``'circle'`` markers at radius 50. One hyperperiod ``H`` spans
    ``PERIOD`` on the angular axis.

    Args:
        i: task position, also used as the colour index.
        task: task whose events are plotted.
        H: hyperperiod, in ticks.
        for_deadlines: plot deadlines instead of releases.

    Returns:
        The trace produced by ``plot_events``.
    """
    theta_per_tick = PERIOD / H
    symbol, r = ('circle', 50) if for_deadlines else ('star-diamond', 60)

    ticks = get_ticks(task, H, for_deadlines=for_deadlines)
    return plot_events(i, ticks, theta_per_tick, r, symbol)

In [9]:
def plot_trace(_id, traces, H):
    """Build the polar bar trace showing the ticks at which ``_id`` appears.

    Only the first ``H`` entries of ``traces`` are considered. Ids are first
    clipped to ``[-1, 100]``, so every negative id matches ``_id == -1``.
    Each matching tick becomes a bar of radius 50, one tick wide and centred
    on the middle of its slot.

    Args:
        _id: id to look for; the colour is ``COLORS[_id - 1]``.
        traces: integer ``numpy`` array of ids, one per tick.
        H: hyperperiod, in ticks.

    Returns:
        ``go.Barpolar`` trace named ``'Task <_id>'``.
    """
    clipped = np.clip(traces, -1, 100)
    slots = np.nonzero(clipped[:H] == _id)[0]

    theta_per_tick = PERIOD / H
    # The +0.5 moves each bar from the start of its tick to the slot centre.
    centres = ((slots + 0.5) % H) * theta_per_tick
    radii = [50] * slots.size
    marker = dict(color=COLORS[_id-1], line_width=0.2, line_color='white')

    return go.Barpolar(r=radii, theta=centres, width=theta_per_tick,
                       name=f'Task {_id}', marker=marker, opacity=0.8)

In [10]:
# Task set: the first line of the file is the task count, followed by one
# row of integer parameters (O C D T A) per task.
tasks = read_tasks('data/example1.txt')
# Schedule trace: integer ids read from the first line of the file. Later
# cells index it by tick and treat positive values as 1-based task ids.
traces = read_traces('data/trace1.txt')

In [11]:
# Hyperperiod: least common multiple of the task periods (0 if there are no tasks).
H = compute_H(tasks)
# Number of tasks; the Gantt cell uses it to locate each task's row.
n = len(tasks)
# One entry per tick of the hyperperiod (0 .. H-1), used as the polar tick labels.
ticks = np.arange(H)

In [12]:
# Polar view of the schedule: one full turn of the angular axis represents
# one hyperperiod H.
fig_polar = go.Figure()

for i, task in enumerate(tasks):
    scatter_releases = plot_task_events(i, task, H)
    scatter_deadlines = plot_task_events(i, task, H, for_deadlines=True)

    # Ids in the trace are 1-based, task positions are 0-based.
    bar = plot_trace((i + 1), traces, H)

    fig_polar.add_trace(scatter_releases)
    fig_polar.add_trace(scatter_deadlines)
    fig_polar.add_trace(bar)

# Add cost trace (every negative id in the trace is drawn as id -1)
bar = plot_trace(-1, traces, H)
bar.showlegend = False
fig_polar.add_trace(bar)

# Label the angular axis in ticks. The positions use the same PERIOD / H scale
# as plot_task_events and plot_trace, so labels stay aligned with the data if
# PERIOD is ever changed.
fig_polar.update_layout(template=None, autosize=False, width=800, height=800,
                        polar=dict(angularaxis=dict(ticks='outside',
                                                    tickmode='array',
                                                    tickvals=ticks * (PERIOD / H),
                                                    ticktext=ticks),
                                   radialaxis=dict(showticklabels=False, ticks='')))

fig_polar.show()

In [13]:
# Run-length encode the trace: one row per maximal run of identical ids.
df_traces = pd.DataFrame(traces, columns=['id'])

# A new run starts wherever the id differs from the previous tick. The run
# label lives in its own column so it is not confused with the duration 'dt'.
df_traces['run'] = df_traces['id'].ne(df_traces['id'].shift()).cumsum()
df_traces = (
    df_traces.groupby('run')['id']
    .agg(['first', 'count'])
    .rename(columns={'first': 'id', 'count': 'dt'})  # dt = run length in ticks
    .reset_index(drop=True)
)

# Runs are contiguous, so cumulative durations give each run's end tick.
df_traces['end'] = df_traces['dt'].cumsum()
df_traces['start'] = df_traces['end'] - df_traces['dt']

# Row on which the run is drawn: a negative id -k is placed on task k's row.
df_traces['task'] = df_traces['id'].abs()
# Drop runs with id 0 (presumably idle ticks -- confirm). The explicit copy
# makes the assignment below act on an independent frame, avoiding pandas'
# SettingWithCopyWarning.
df_traces = df_traces[df_traces['id'] != 0].copy()
# All negative ids share colour group '0'; positive ids keep their own.
df_traces.loc[df_traces['id'] < 0, 'id'] = 0
df_traces['id'] = df_traces['id'].astype(str)

In [51]:
# One bar per run in df_traces, one row per task, coloured by id group.
fig_gantt = px.timeline(
    df_traces,
    x_start='start',
    x_end='end',
    y='task',
    color='id',
    color_discrete_sequence=COLORS,
)
fig_gantt.update_traces(marker_line_color='black', marker_line_width=1)

# Vertical offset of a task's datum line relative to its row position.
y_offset = -0.25
def plot_events_gantt(i, ticks, for_deadlines=False):
    """Draw one task's events onto ``fig_gantt``.

    Deadlines are drawn as small filled circles on the task's datum line;
    releases are drawn as arrows from just below the datum line up to
    ``0.75`` above it. Both use ``COLORS[i]``.

    Args:
        i: 0-based task position; the row is ``n - (i + 1)`` on the y axis.
        ticks: iterable of x positions, one per event.
        for_deadlines: draw deadline circles instead of release arrows.
    """
    y = n - (i + 1) + y_offset

    if for_deadlines:
        for t in ticks:
            fig_gantt.add_shape(
                type='circle',
                x0=t, y0=(y - 0.05),
                x1=(t + 0.25), y1=(y + 0.05),
                xref='x', yref='y',
                line_color=COLORS[i], fillcolor=COLORS[i],
            )
        return

    for t in ticks:
        # (ax, ay) is the arrow tail and (x, y) its head, all in data coordinates.
        fig_gantt.add_annotation(
            x=t, y=(y + 0.75),
            ax=t, ay=(y - 0.1),
            axref='x', ayref='y',
            showarrow=True, arrowhead=2, arrowwidth=2, arrowcolor=COLORS[i],
        )

def plot_task_events_gantt(i, task, H, for_deadlines=False, size=None):
    """Draw a task's releases or deadlines on the Gantt chart up to ``size``.

    Args:
        i: 0-based task position.
        task: task whose events are drawn.
        H: hyperperiod, in ticks.
        for_deadlines: draw deadlines instead of releases.
        size: last tick to draw (inclusive); defaults to ``H``.
    """
    horizon = H if size is None else size
    # Ask for enough whole hyperperiods to reach past the horizon, then trim.
    n_repeats = (horizon // H) + 1

    ticks = get_ticks(task, H, for_deadlines=for_deadlines, n_repeats=n_repeats)
    plot_events_gantt(i, ticks[ticks <= horizon], for_deadlines)

# px.timeline produced one bar trace per id group. Each trace is rewritten in
# plain tick units (base = start tick, x = duration) so the x axis can be
# switched to a linear scale below.
for data in fig_gantt.data:
    filtered = df_traces['id'] == data.name
    segments = df_traces.loc[filtered]

    data.base = segments['start'].tolist()
    data.x = segments['dt'].tolist()
    data.y = segments['task'].tolist()
    data.width = 0.5

    # Group '0' collects the negative ("cost") ids; keep it out of the legend.
    if data.name == '0':
        data.showlegend = False

    # Group names are 1-based task ids, so '0' maps to i == -1 (COLORS[-1]).
    i = int(data.name) - 1
    data.marker.color = COLORS[i]

    if i != -1:
        # Add datum lines
        # NOTE(review): this uses i + y_offset while plot_events_gantt uses
        # n - (i + 1) + y_offset; the two sets of rows coincide when every
        # task appears in the trace -- confirm that is always the case.
        fig_gantt.add_hline(y=(i + y_offset))

        # Add events
        plot_task_events_gantt(i, tasks[i], H, size=traces.size)
        plot_task_events_gantt(i, tasks[i], H, for_deadlines=True, size=traces.size)

fig_gantt.update_xaxes(type='linear')
fig_gantt.update_yaxes(type='category', categoryorder='category descending')
fig_gantt.show()