In [68]:
"""
Computing RT scheduling statistics using Saleae Logic Analyzer

Channels:
[0]  cockpit usb in
[1]  cockpit usb out
[2]  cockpit uwb rx
[3]  cockpit uwb tx
[4]  drivetrain uwb rx
[5]  drivetrain uwb tx
[6]  drivetrain motor out
[7]  drivetrain brake
[8]  drivetrain blink L
[9]  drivetrain blink R
[10] steering uwb rx
[11] steering uwb tx
[12] steering servo out
[13] steering force in
[14] steering blink L
[15] steering blink R
"""
import numpy as np

with open("digital.csv", "r") as f:
    dat = f.readlines()[1:]
    
dat = [x.strip().split(",") for x in dat]
dat = [(float(x[0]), [int(a) for a in x[1:]]) for x in dat]

In [72]:
def edge_detect(arr):
    arr1 = []
    last = arr[0][1]
    for a, b in arr[1:]:
        x1 = [e-last[i] for i, e in enumerate(b)]
        arr1 += [(a, x1)]
        last = b
        
    return arr1

def stats(arr):
    p0 = round(np.percentile(arr, 0)*1000, 3)
    p1 = round(np.percentile(arr, 25)*1000, 3)
    p2 = round(np.percentile(arr, 50)*1000, 3)
    p3 = round(np.percentile(arr, 75)*1000, 3)
    p4 = round(np.percentile(arr, 100)*1000, 3)
    
    return f"{p0} {p1} {p2} {p3} {p4}"

def check_timesync(dat_edges, n1, n2):
    w = None
    arr = []

    for ts, v in dat_edges:
        v1, v2 = v[n1], v[n2]

        if (v1 != 0 or v2 != 0):
            if w is None:
                w = ts
            else:
                arr.append(ts - w)
                w = None

    return arr

def find_paths(dat_edges, ns):
    n0 = ns[0]
    ns = ns[1:]
    
    i = 0
    arr = []
    
    for i, tt in enumerate(dat_edges):
        start, v = tt
        end = None
        if v[n0] == 1: 
            for n in ns:
                while dat_edges[i][1][n] != 1:
                    i += 1
                    if i >= len(dat_edges): break
                if i >= len(dat_edges): break
                    
            if i >= len(dat_edges): break
            end = dat_edges[i][0]
            arr.append(end-start)
        
    return arr

In [73]:
dat_edges = edge_detect(dat)

In [74]:
# blinker sync
stats(check_timesync(dat_edges, 8, 14) +
      check_timesync(dat_edges, 9, 15))

'0.008 0.319 0.469 0.659 1.487'

In [75]:
# cockpit -> drivetrain motor
stats(find_paths(dat_edges, [0, 3, 4, 6]))

'2.234 3.675 8.338 8.399 13.584'

In [76]:
# cockpit -> steering servo
stats(find_paths(dat_edges, [0, 3, 10, 12]))

'1.183 11.422 11.5 11.576 12.884'

In [77]:
# steering force -> cockpit
stats(find_paths(dat_edges, [13, 11, 2, 1]))

'7.054 8.372 8.472 8.551 18.833'