# Verify data between new node and old streams

To do verification:
- run the nsp_all_verify_graph.yaml for 1-2min
- make sure the parameters are same across old pipeline and new one
- stop graph without running the saveRDB derivative
- run this notebook in rt env


In [2]:
import json
import numpy as np
from redis import  Redis
from brand.redis import xread_sync


Matplotlib created a temporary cache directory at /tmp/matplotlib-34c9_q5s because the default path (/home/pdeevi/.config/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [10]:

redis_host = '192.168.30.6'
redis_port = 27263
redis_socket = 10

r = Redis(redis_host, redis_port, retry_on_timeout=True)
r.ping()

supergraph_entry = r.xrevrange(b'supergraph_stream', '+', '-', 1)[0]

In [141]:

print(r.xlen("binned_spikes_old"),r.xlen("binned_spikes_all"))
print(r.xlen("reref_neural_old"),r.xlen("reref_neural_all"))
print(r.xlen("thresh_cross_old"),r.xlen("thresh_cross_all"))


33167 33033
331680 330337
331676 330337


In [181]:
count = 3000
rn_all = r.xread({"reref_neural_all": b'0-0'}, count=count)[0][1]
rn_old = r.xread({"reref_neural_old": b'0-0'}, count=count)[0][1]
tx_all = r.xread({"thresh_cross_all": b'0-0'}, count=count)[0][1]
tx_old = r.xread({"thresh_cross_old": b'0-0'}, count=count)[0][1]
sb_all = r.xread({"sbp_all": b'0-0'}, count=count)[0][1]
sb_old = r.xread({"sbp_old": b'0-0'}, count=count)[0][1]
bs_all = r.xread({"binned_spikes_all": b'0-0'}, count=count)[0][1]
bs_old = r.xread({"binned_spikes_old": b'0-0'}, count=count)[0][1]

In [182]:
nsp_idx_all = np.array([json.loads(tx_all_i[1][b'sync'].decode())['nsp_idx_1'] for tx_all_i in tx_all])
nsp_idx_old = np.array([json.loads(tx_old_i[1][b'sync'].decode())['nsp_idx_1'] for tx_old_i in tx_old])
print(f"Expected num of matches {count - (nsp_idx_all[0] - nsp_idx_old[0])}")

Expected num of matches 1661


In [183]:
nsp_idx_all = np.array([json.loads(tx_all_i[1][b'sync'].decode())['nsp_idx_1'] for tx_all_i in sb_all])
nsp_idx_old = np.array([json.loads(tx_old_i[1][b'sync'].decode())['nsp_idx_1'] for tx_old_i in sb_old])
print(f"Expected num of matches {count - (nsp_idx_all[0] - nsp_idx_old[0])}")

Expected num of matches 1661


In [185]:

matches = []
for i,rn_all_i in enumerate(rn_all):
    for j,rn_old_i in enumerate(rn_old):
        if rn_all_i[1][b'samples'] == rn_old_i[1][b'samples']:
            matches.append((j,i))
            
len(matches)

1657

In [173]:
matches = []
for i,tx_all_i in enumerate(tx_all):
    for j,tx_old_i in enumerate(tx_old):
        if tx_all_i[1][b'crossings'] == tx_old_i[1][b'crossings']:
            matches.append((j,i))

len(matches)

1661

In [186]:
# spike band power old is computer with an IIR to match with nsp_implementation
# old implementation uses mean(axis=1) and new one uses sum(axis=1)/buffer_size
matches = []
for i,sb_all_i in enumerate(sb_all):
   for j,sb_old_i in enumerate(sb_old):
       if np.all(np.isclose(np.frombuffer(sb_all_i[1][b'samples'], np.float32),
                            np.frombuffer(sb_old_i[1][b'samples'], np.float32))):
            matches.append((j,i))
            
len(matches)

1635

In [None]:
# binning might not match as the alignment be off by a few bins
matches = []
for i,bs_all_i in enumerate(bs_all):
   for j,bs_old_i in enumerate(bs_old):
       if np.all(np.isclose(np.frombuffer(bs_all_i[1][b'samples'], np.float32),
                            np.frombuffer(bs_old_i[1][b'samples'], np.float32))):
            matches.append((j,i))
            
len(matches)

In [194]:
# verify threshold crossing and spike band power
# Prints number of matching packets
cnt=0
offset = nsp_idx_all[0] - nsp_idx_old[0]
for i in range(count-offset):
    idx,tx_old_data =tx_old[offset+i]
    idx,tx_all_data = tx_all[i]
    idx,sb_old_data =sb_old[offset+i]
    idx,sb_all_data = sb_all[i]

    p_old=np.frombuffer(sb_old_data[b'samples'],dtype=np.float32)
    p_all=np.frombuffer(sb_all_data[b'samples'],dtype=np.float32)
    c_old=np.frombuffer(tx_old_data[b'crossings'],dtype=np.int16)
    c_all=np.frombuffer(tx_all_data[b'crossings'],dtype=np.int16)

    if   np.all(np.isclose(p_old,p_all)) and np.all(np.isclose(c_old,c_all)):cnt+=1
if cnt:
    print(f"Matched/Expected = {cnt}/{count-offset}")

Matched/Expected = 1635/1661


In [196]:
# computes binned thresholds using old an new thresholds and compares them 
buffer_num=0
cross_bin_buffer_old= np.zeros((256,count-offset),np.int16)
cross_bin_buffer_all = np.zeros((256,count-offset),np.int16)

for i in range(count-offset):
    cross_bin_buffer_old[:, i] = np.frombuffer(tx_old[offset+i][1][b'crossings'],dtype=np.int16)
    cross_bin_buffer_all[:, i] = np.frombuffer(tx_all[i][1][b'crossings'],dtype=np.int16)


print(f"Does binning from old and new threshold streams match? \n{np.all(cross_bin_buffer_old.sum(axis=1) == cross_bin_buffer_all.sum(axis=1))}")


Does binning from old and new threshold streams match? 
True


# Run node and get timing

In [50]:
from nsp_all import NSP_all
import matplotlib.pyplot as plt
import numpy as np

In [3]:
import sys
sys.argv = ['tx', '-n', 'nsp_alll', '-i', '192.168.30.6', '-p', '27263']

params = {
    "input_stream": "nsp_neural",
    "output_reref_stream": False,
    "reref_maxlen": 2000,
    "coefs_stream_name": "rereference_parameters",
    "reref_stream_idx": 0,
    "thresh_cross_stream_idx": 1,
    "band_power_stream_idx": 2,
    "binned_spikes_stream_idx": 3,
    "output_streams": ["reref_neural_a", "thresh_cross_a", "sbp_a", "binned_spikes_a"],
    "sync_key": ["tracking_id", "sync", "sync", "sync"],
    "ts_key": ["BRANDS_time", "ts", "ts", "ts"],
    "use_tracking_id": True,
    "td_type": "uint64",
    "sync_dict_key": "nsp_idx_1",
    "filt_stream": None,
    "adaptive_rms_stream": None,
    "pack_per_call": 1,
    "thresh_mult": -4.5,
    "thresh_calc_len": 2000,
    "butter_lowercut": 250,
    "butter_uppercut": None,
    "butter_order": 4,
    "enable_CAR": False,
    "output_filtered": False,
    "acausal_filter_lag": 120,
    "acausal_filter": "IIR",
    "ch_mask_stream": "z_mask_stream",
    "bandpower_logscale": False,
    "chan_per_stream": 256,  # Preserved as string since it's a reference
    "samp_per_stream": 30,
    "chan_total": 256,  # Preserved as string since it's a reference
    "start_channel": 0,
    "samp_freq": 30000,
    "output_dtype": "float32",
    "total_channels": "*total_features",  # Preserved as string since it's a reference
    "bin_size": 10,
    "bin_enable": True
}

node = NSP_all(params)

[nsp_alll] INFO: Coefficients entry found in Redis stream: rereference_parameters
[nsp_alll] INFO: Unshuffling matrix loaded from stream.
[nsp_alll] INFO: Loading 4 order, 250 hz highpass acausal IIR-IIR filter
[nsp_alll] INFO: Loaded thresholds from the thresholds stream


[nsp_alll] Redis connection established on host: 192.168.30.6, port: 27263


In [None]:
node.run()

In [5]:
node.profiler.print_stats()


Timing Statistics (in milliseconds):
--------------------------------------------------------------------------------
Operation                            Mean        Min        Max      Count
--------------------------------------------------------------------------------
INIT                                8.757      8.757      8.757          1
Redis read                          0.475      0.075      1.747      67557
Re-referencing                      0.101      0.099      0.237      67557
Filtering                           0.262      0.253      0.724      67553
Threshold crossing                  0.021      0.019      0.278      67553
Spike band power                    0.004      0.004      0.050      67553
Total Exec Time                     0.445      0.429      0.997      67553
Redis write                         0.132      0.113      0.683      67553
Total Time                          1.000      0.582      2.279      67553
Binning                             0.002      0.0

# Rough

In [None]:
input_streams= ["thresh_cross_old", "sbp_old"]
stream_dict = {name.encode(): '$' for name in input_streams}
streams = xread_sync(r,stream_dict,block=0,
                        sync_field=b'timestamps',
                        sync_dtype='uint64',
                        count=10)
in_field={b"thresh_cross_old":b"crossings", b"sbp_old":b"samples"}
in_dtype={b"thresh_cross_old":np.dtype("int16"), b"sbp_old":np.dtype("float32")}
window =np.zeros((256,10),dtype=np.float32)


sync_entries = []
sync_entries_stream_dump={}
for stream in streams:
    stream_name, stream_entries = stream
    field = in_field[stream_name]
    in_type = in_dtype[stream_name]
    sync_entries_stream = []
    synx=[]
    for i, (entry_id, entry_dict) in enumerate(stream_entries):
        # load the input
        window[:, i] = np.frombuffer(entry_dict[field],
                                            dtype=in_type).astype(
                                                'float32')
        # log sync for this entry
        sync_entries_stream.append(
            json.loads(entry_dict[b'sync'].decode()))
        synx.append(json.loads(entry_dict[b'sync'].decode()))
    sync_entries_stream_dump[stream_name.decode()]=synx
    sync_entries.append(sync_entries_stream)
    # update the xread ID
    stream_dict[stream_name] = entry_id

# create sync dict from sync entries from input streams
sync_dict = {}
for stream in sync_entries:
    sync_entry_dict = stream[0]  # first entry from each stream
    for key in sync_entry_dict:
        sync_dict[key] = sync_entry_dict[key]
sync_dict_json = json.dumps(sync_dict)

In [46]:
# verify threshold crossing and binned threshould_crossing

cnt=0
for i in range(1000):
    idx,tx4_data =tx4[i]
    idx,txa_data = txa[i]
    idx,sb4_data =sb4[i]
    idx,sba_data = sba[i]
    idx,bs4_data =bs4[i]
    idx,bsa_data = bsa[i]
    p4=np.frombuffer(sb4_data[b'samples'],dtype=np.float32)
    pa=np.frombuffer(sba_data[b'samples'],dtype=np.float32)
    c4=np.frombuffer(tx4_data[b'crossings'],dtype=np.int16)
    ca=np.frombuffer(txa_data[b'crossings'],dtype=np.int16)
    b4=np.frombuffer(bs4_data[b'samples'],dtype=np.float32)[:256]
    ba=np.frombuffer(bsa_data[b'samples'],dtype=np.float32)[:256]
    if np.all(np.isclose(b4,ba)) and not np.all(np.isclose(p4,pa)) and np.all(np.isclose(c4,ca)):cnt+=1
if cnt:
    print(cnt)

995


In [59]:
tx_old_data

{b'sync': b'{"nsp_idx_1": 1000}',
 b'timestamps': b'\xe8\x03\x00\x00\x00\x00\x00\x00',
 b'crossings': b'\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00