In [None]:
import holoviews as hv
import pandas as pd
import numpy as np

from holoviews.streams import Pipe, Buffer
from scipy.ndimage.filters import uniform_filter1d
from tqdm.notebook import trange
from collections import OrderedDict  

import chipwhisperer.analyzer as cwa
import chipwhisperer as cw
from chipwhisperer.common.traces import Trace
import edge_counter
import chipfail_lib
import serial

import random

# init plotting        
hv.extension('bokeh')

def random_color():
    return "#"+''.join([random.choice('0123456789ABCDEF') for j in range(6)])


nb_win_size = 200
nb_threshold = 0.015
nb_edge_type = "falling_edge"
nb_pretrigger_ctr = 150
nb_edge_num = 1

mb2_proj = cw.open_project("projects/working_mb2")
mb1_proj = cw.open_project("projects/invalid_mb1")

ec_offline = edge_counter.edge_count(nb_win_size, nb_threshold, nb_edge_num, nb_edge_type, nb_pretrigger_ctr)

# Isolate Triggers

In [None]:
mb2_traces = {}
mb1_traces = {}
test = {}
j = 0
tmp = {}
all_triggers_falling_mb2 = []
all_triggers_falling_mb1 = []
all_triggers_rising_mb2 = []
all_triggers_rising_mb1 = []
for i in range(0, 500):

    ec_offline.edge_type = "falling_edge"
    _, triggers_falling_mb2 = ec_offline.run(mb2_proj.waves[i], "sum")
    _, triggers_falling_mb1 = ec_offline.run(mb1_proj.waves[i], "sum")
    ec_offline.edge_type = "rising_edge"
    _, triggers_rising_mb2 = ec_offline.run(mb2_proj.waves[i], "sum")
    _, triggers_rising_mb1 = ec_offline.run(mb1_proj.waves[i], "sum")
    
    all_triggers_falling_mb2.extend([trigger for trigger in triggers_falling_mb2 if trigger > 9000 and trigger < 12000])
    all_triggers_falling_mb1.extend([trigger for trigger in triggers_falling_mb1 if trigger > 9000 and trigger < 12000])
    
    all_triggers_rising_mb2.extend([trigger for trigger in triggers_rising_mb2 if trigger > 9000 and trigger < 12000])
    all_triggers_rising_mb1.extend([trigger for trigger in triggers_rising_mb1 if trigger > 9000 and trigger < 12000])

    
# all_triggers = zip(all_triggers_rising_mb2, all_triggers_rising_mb1)

# Show traces

In [None]:
mb2_traces = {}
mb1_traces = {}
test = {}
j = 0
tmp = {}
all_triggers = []
for i in range(300, 305):
    rand_col = random_color()
    
    mb2_traces[i] = hv.Curve(mb2_proj.waves[i]).opts(tools=["hover"], width=800, height=600, title="orig", axiswise=False, color=rand_col)
    mb1_traces[i] = hv.Curve(mb1_proj.waves[i]).opts(tools=["hover"], width=800, height=600, title="orig", axiswise=False, color=rand_col)
    
    avg, triggers_falling = ec_offline.run(mb2_proj.waves[i], "sum")
    
    trigger_lines = {i: hv.VLine(trigger).opts(line_width=1, color=rand_col) for i, trigger in enumerate(triggers)}
    tmp[i] = (mb2_traces[i]*hv.NdOverlay(trigger_lines).opts(legend_limit=1)).opts(legend_limit=1)

# mb2_overlay = hv.NdOverlay(mb2_traces).opts(legend_limit=1)
mb1_overlay = hv.NdOverlay(mb1_traces).opts(legend_limit=1)
# line_overlay = hv.NdOverlay(trigger_lines).opts(legend_limit=1)
mb1_overlay
# tmp_overlay = hv.NdOverlay(tmp).opts(legend_limit=1)
# tmp_overlay


In [None]:
def calc_stats(all_triggers):
    distances = []
    x1s = []
    x2s = []
    for x1, x2 in all_triggers:
        x1s.append(x1)
        x2s.append(x2)
        distances.append(x2-x1)

    print(f"avg dist: {np.average(distances)}")
    print(f"median dist: {np.median(distances)}")
    print(f"median x1: {np.median(x1)}")
    print(f"median x2: {np.median(x2)}")
    
print("\nfalling_mb2 --> rising_mb1")
calc_stats(zip(all_triggers_falling_mb2, all_triggers_rising_mb1))
print("\nfalling_mb2 --> falling_mb1")
calc_stats(zip(all_triggers_falling_mb2, all_triggers_falling_mb1))
print("\nrising_mb1 --> rising_mb2")
calc_stats(zip(all_triggers_rising_mb1, all_triggers_rising_mb2))
print("\nfalling_mb2 --> rising_mb2")
calc_stats(zip(all_triggers_falling_mb2, all_triggers_rising_mb2))

In [None]:
all_triggers_bak = list(all_triggers)

In [None]:
print(all_triggers)