## Tests for tof_functions

In [1]:
import os
import numpy                 as np
import pandas                as pd
import hypothesis.strategies as st

from hypothesis  import given

import antea.reco.reco_functions as rf
import antea.elec.tof_functions  as tf
import antea.database.load_db as db
from antea.io.mc_io import load_mcTOFsns_response

### spe_dist function

In [3]:
tau_sipm       = (100, 15000)
time           = np.arange(10)
exp_dist, norm = tf.spe_dist(tau_sipm, time)

assert len(exp_dist) == len(time)
assert (exp_dist >= 0.).all()
assert np.sum(exp_dist) == 1

In [4]:
time = np.array([1, 1491])
alfa         = 1.0/2
beta         = 1.0/1
if np.isclose(alfa, beta, rtol=1e-2):
    spe_response = np.zeros(len(time))
t_p          = np.log(beta/alfa)/(beta-alfa)
K            = (beta)*np.exp(alfa*t_p)/(beta-alfa)
spe_response = K*(np.exp(-alfa*time)-np.exp(-beta*time))
print(np.exp(-alfa*time))
print(np.exp(-beta*time))
print(np.exp(-alfa*time)-np.exp(-beta*time))
print(K*(np.exp(-alfa*time)-np.exp(-beta*time)))
print(spe_response/np.sum(spe_response))
spe_response = spe_response/np.sum(spe_response) #Normalization
print(spe_response)

np.isclose(alfa, beta, rtol=1e-2)# or 
np.count_nonzero(np.exp(-(alfa)*time))# and ((np.exp(-(1/b)*l)).all()==0)):
#np.exp(-(alfa)*time)


[0.60653066 0.        ]
[0.36787944 0.        ]
[0.23865122 0.        ]
[0.95460487 0.        ]
[1. 0.]
[1. 0.]


1

In [5]:
def spe_dist(tau_sipm, time):
    alfa         = 1.0/tau_sipm[1]
    beta         = 1.0/tau_sipm[0]
    if np.isclose(alfa, beta, rtol=1e-3):
        return np.zeros(len(time)), 0
    t_p          = np.log(beta/alfa)/(beta-alfa)
    K            = (beta)*np.exp(alfa*t_p)/(beta-alfa)
    spe_response = K*(np.exp(-alfa*time)-np.exp(-beta*time))
    if np.sum(spe_response) == 0:
        return np.zeros(len(time)), 0
    norm_dist    = np.sum(spe_response)
    spe_response = spe_response/norm_dist #Normalization

    return spe_response, norm_dist

In [31]:
a = 1.9799999604007856
b = 2.0
l = np.array([1, 1])

exp_dist, norm = spe_dist((a, b), np.unique(l))
count_a = np.count_nonzero(np.exp(-(1/a)*time))
count_b = np.count_nonzero(np.exp(-(1/b)*time))
print(1/a, 1/b)
print(count_a, count_b)
if np.isclose(1/a, 1/b, rtol=1e-3) or (not count_a and not count_b):
    print('one')
    print(exp_dist)
    print(np.count_nonzero(exp_dist))#assert np.count_nonzero(exp_dist) == 0
    print(np.sum(exp_dist))#assert np.isclose(np.sum(exp_dist), 0)
else:
    print('two')
    print(len(exp_dist))
    assert len(exp_dist) == len(np.unique(l))
    print(exp_dist)
    assert (exp_dist >= 0.).all()
    print(np.sum(exp_dist))
    assert np.isclose(np.sum(exp_dist), 1)

1.989966479060683
270.4673654061509
[0.82643188]
0.5050505151513149 0.5
1 1
two
1
[1.]
1.0


In [5]:
#a = st.floats(min_value=1, max_value=1)
#b = st.floats(min_value=2, max_value=2)
#l = st.lists(st.floats(min_value=1, max_value=1), st.floats(min_value=1491, max_value=1491))

a = st.floats(min_value=1, max_value=100000)
b = st.floats(min_value=2, max_value=100000)
l = st.lists(st.integers(min_value=1, max_value=10000), min_size=2, max_size=1000)

@given(a, b, l)
def test_spe_dist(a, b, l):
    l = np.array(l)
    exp_dist = tf.spe_dist((a, b), np.unique(l))
    count_a = np.count_nonzero(np.exp(-(1/a)*time))
    count_b = np.count_nonzero(np.exp(-(1/b)*time))
    if np.isclose(1/a, 1/b, rtol=1e-2) or (not count_a and not count_b):
        assert np.count_nonzero(exp_dist) == 0
        assert np.isclose(np.sum(exp_dist), 0)
    else:
        assert len(exp_dist) == len(np.unique(l))
        assert (exp_dist >= 0.).all()
        assert np.isclose(np.sum(exp_dist), 1)

In [6]:
test_spe_dist()

In [7]:
np.count_nonzero(np.array([0.001, 0.2, 2, 0.0005, 0]))
np.nonzero([0.001, 0.2, 2, 0.0005, 0])

(array([0, 1, 2, 3]),)

### convolve_tof function

In [6]:
def convolve_tof(spe_response, signal):
    if not np.count_nonzero(spe_response):
        print('Distrib values are zero')
        return np.zeros(len(spe_response)+len(signal)-1 )
    conv_first = np.hstack([spe_response, np.zeros(len(signal)-1)])
    conv_res   = np.zeros(len(signal)+len(spe_response)-1)
    pe_pos     = np.argwhere(signal > 0)
    pe_recov   = signal[pe_pos]
    for i in range(len(pe_recov)): #Loop over the charges
        conv_first_ch = conv_first*pe_recov[i]
        desp          = np.roll(conv_first_ch, pe_pos[i])
        conv_res     += desp
    return conv_res

In [7]:
#t = st.tuples(st.floats(min_value=1, max_value=1000), st.floats(min_value=1, max_value=1000))
#l = st.lists(st.integers(min_value=1, max_value=10000), min_size=2, max_size=1000)
#s = st.lists(st.integers(min_value=1, max_value=10000), min_size=2, max_size=1000)

t = st.tuples(st.floats(min_value=1, max_value=1), st.floats(min_value=2, max_value=2))
l = st.lists(st.integers(min_value=1491, max_value=1491), min_size=2, max_size=2)
s = st.lists(st.integers(min_value=1, max_value=1), min_size=2, max_size=2)

@given(t, l, s)
def test_convolve_tof(t, l, s):
    spe_response = tf.spe_dist(t, np.unique(np.array(l)))
    print(spe_response)
    conv_res     = convolve_tof(spe_response, np.array(s))
    assert len(conv_res) == len(spe_response) + len(s) - 1
    if np.count_nonzero(spe_response):
        print(' nooon zeeero')
        assert np.isclose(np.sum(s), np.sum(conv_res))

In [8]:
test_convolve_tof()

(array([0.]), 0.0)
Distrib values are zero


### tdc convolution function:

In [11]:
def tdc_convolution_old(tof_response, spe_response, time_window, n_sipms, first_sipm, te_tdc):
    """
    Apply the spe_response distribution to every sipm and returns a charge matrix of time and n_sipms dimensions.
    """
    pe_table  = np.zeros((time_window, n_sipms))
    for i, wf in tof_response.iterrows():
        if wf.time_bin < time_window:
            s_id = - wf.sensor_id - first_sipm
            pe_table[wf.time_bin, s_id] = wf.charge

    conv_table = np.zeros((len(pe_table) + len(spe_response)-1, n_sipms))
    for i in range(n_sipms):
        if np.count_nonzero(pe_table[:,i]):
            conv_table[:,i] = convolve_tof(spe_response, pe_table[0:time_window,i])
    return conv_table

In [16]:
def tdc_convolution(tof_response, spe_response, time_window, n_sipms, first_sipm, te_tdc):
    """
    Apply the spe_response distribution to every sipm and returns a charge matrix of time and n_sipms dimensions.
    """
    pe_table   = np.zeros((time_window, n_sipms))
    sel_tof    = tof_response[tof_response.time_bin < time_window]
    s_ids      = - sel_tof.sensor_id.values - first_sipm
    conv_table = np.zeros((len(pe_table) + len(spe_response)-1, n_sipms))
    if sel_tof.empty:
        print('Tof dataframe is empty')
        return conv_table
    pe_table[sel_tof.time_bin.values, s_ids] = sel_tof.charge.values
    for i in range(n_sipms):
        if np.count_nonzero(pe_table[:,i]):
            conv_table[:,i] = convolve_tof(spe_response, pe_table[0:time_window,i])
    return conv_table

In [17]:
SIPM           = {'n_sipms':3500, 'first_sipm':1000, 'tau_sipm':[100,15000]}
n_sipms        = SIPM['n_sipms']
first_sipm     = SIPM['first_sipm']
tau_sipm       = SIPM['tau_sipm']
TE_range       = [0.25]
TE_TDC         = TE_range[0]
time_window    = 10000
time_bin       = 5
time           = np.arange(0, 80000, time_bin)
spe_resp, norm_dist = tf.spe_dist(tau_sipm, time)

In [20]:
PATH_IN        = '/Users/carmenromoluque/nexus_petit_analysis/PETit-ring/DAQ_antea/petit_ring_tof_all_tables.001.pet.h5'
tof_response   = load_mcTOFsns_response(PATH_IN)
tof_waveforms  = tof_response[tof_response.event_id == 20000]
tdc_conv_table = tdc_convolution(tof_waveforms, spe_resp, time_window, n_sipms, first_sipm, TE_TDC)
print(tdc_conv_table.shape)

assert tdc_conv_table.shape == (time_window + len(spe_resp)-1, n_sipms)

keys           = np.array(['event_id', 'sensor_id', 'time_bin', 'charge'])
wf_df          = pd.DataFrame({}, columns=keys)
tdc_conv_table = tdc_convolution(wf_df, spe_resp, time_window, n_sipms, first_sipm, TE_TDC)
assert np.all(tdc_conv_table==0)


   event_id  sensor_id  time_bin  charge
0     20000      -3892       574       1
1     20000      -3886       173       1
2     20000      -3886       195       1
3     20000      -3886       199       1
4     20000      -3886       222       1
(25999, 3500)
Tof dataframe is empty


### translate_charge_matrix_to_wf_df function

In [109]:
def translate_charge_matrix_to_wf_df0(event_id, conv_table, first_sipm):
    """
    Transform the charge matrix into a tof dataframe.
    """
    keys         = np.array(['event_id', 'sensor_id', 'time_bin', 'charge'])
    if np.all(conv_table==0):
        return pd.DataFrame({}, columns=keys)
    t_bin, s_id  = np.where(conv_table>0)
    s_id         = - s_id - first_sipm
    conv_tb_flat = conv_table.flatten()
    charge       = conv_tb_flat[conv_tb_flat>0]
    evt          = np.full(len(t_bin), event_id)
    a_wf         = np.array([evt, s_id, t_bin, charge])
    wf_df        = pd.DataFrame(a_wf.T, columns=keys)
    return wf_df

In [None]:
def translate_charge_matrix_to_wf_df1(event_id, conv_table, first_sipm):
    """
    Transform the charge matrix into a tof dataframe.
    """
    keys         = np.array(['event_id', 'sensor_id', 'time_bin', 'charge'])
    if not np.count_nonzero(conv_table):
        return pd.DataFrame({}, columns=keys)
    t_bin, s_id  = np.where(conv_table>0)
    s_id         = - s_id - first_sipm
    conv_tb_flat = conv_table.flatten()
    charge       = conv_tb_flat[conv_tb_flat>0]
    evt          = np.full(len(t_bin), event_id)
    a_wf         = np.array([evt, s_id, t_bin, charge])
    wf_df        = pd.DataFrame(a_wf.T, columns=keys)
    return wf_df

In [110]:
my_table = np.array([[0,0,0], [0,0,0]])
translate_charge_matrix_to_wf_df(0, my_table, 1000).empty

True

In [111]:
t_bin, s_id  = np.where(my_table>0)
print(t_bin, s_id)
translate_charge_matrix_to_wf_df(0, my_table, 1000)

[] []


Unnamed: 0,event_id,sensor_id,time_bin,charge


In [112]:
my_table2 = np.array([[0,0], [0,0,0]])
my_table[0]*my_table[1]
np.matrix([1,1,1,1]).T * np.matrix([2,3])
np.matrix([1,1,1,1])

matrix([[1, 1, 1, 1]])

In [133]:
l0 = st.lists(st.floats(min_value=0, max_value=1000), min_size=2, max_size=100)
l  = st.lists(l0, min_size=2, max_size=2)
e  = st.floats(min_value=0, max_value=1000)
f  = st.floats(min_value=0, max_value=1000)

@given(l, e, f)
def test_translate_charge_matrix_to_wf_df(l, e, f):
    matrx = np.array(np.matrix(l[0]).T * np.matrix(l[1]))
    wf_df = translate_charge_matrix_to_wf_df(e, matrx, f)
    assert len(wf_df) == np.count_nonzero(matrx)
    assert len(wf_df.keys()) == 4
    if np.count_nonzero(matrx) == 0:
        assert wf_df.empty

In [134]:
test_translate_charge_matrix_to_wf_df()