In [None]:
#%run '/home/gvolta/Desktop/Tutorial/STRAX/PlugIn-v2.ipynb'

### Plug-In definition

In [1]:
import strax
import pandas as pd
pd.options.display.max_colwidth = 100
from tqdm import tqdm
import numpy as np
import numba
from scipy.optimize import curve_fit
from scipy import stats
import straxen
# Context object used by every register / select / load call further down in
# this notebook.
st = straxen.contexts.strax_workshop_dali()

In [2]:
#@export
# NOTE: the 'LED_window' option (default (125, 250), "Window (samples) where we
# expect signal in LED calibration") used to be declared with
# `@strax.takes_config` directly above this helper. That decorator attaches the
# option to whatever it decorates, so it has to sit on the plugin class that
# reads `self.config['LED_window']`; on a plain function it configures nothing.
def get_amplitude(raw_records, windows):
    """
    Maximum amplitude of every record inside and outside the LED window.

    Parameters
    ----------
    raw_records : numpy structured array
        One entry per record, with a 2-D 'data' field (records x samples).
    windows : sequence of two ints
        (start, stop) sample indices of the LED window. The "off" window
        is taken as (2 * start, 2 * stop).

    Returns
    -------
    on : structured array with a single '<i4' field 'amplitudeLED'
        Per-record maximum of data[start:stop].
    off : structured array with a single '<i4' field 'amplitudeNOISE'
        Per-record maximum of data[2*start:2*stop].

    Raises
    ------
    ValueError
        If there is at least one record and either window selects no samples
        (the maximum of an empty slice is undefined).
    """
    n_records = len(raw_records)
    on = np.zeros(n_records, dtype=[('amplitudeLED', '<i4')])
    off = np.zeros(n_records, dtype=[('amplitudeNOISE', '<i4')])
    if n_records == 0:
        # Nothing to reduce; return empty arrays of the right dtype.
        return on, off

    # Use the window that was passed in, not a module-level `window`.
    start, stop = windows[0], windows[1]
    data = raw_records['data']
    on['amplitudeLED'] = data[:, start:stop].max(axis=1)
    off['amplitudeNOISE'] = data[:, 2 * start:2 * stop].max(axis=1)
    return on, off

def get_area(raw_records, windows):
    """
    Average integrated area of every record over six nested windows.

    All six integration ranges start at windows[0]; their right edges are
    windows[1], windows[1] + 2, ..., windows[1] + 10. The area stored for a
    record is the mean of the six sums of its samples.

    Parameters
    ----------
    raw_records : numpy structured array
        Must provide 'channel', 'area' and a 2-D 'data' field.
    windows : sequence of two ints
        (left, right) sample indices of the first integration range.

    Returns
    -------
    numpy structured array, dtype [('channel', 'int16'), ('area', 'float32')]
        Same length and order as `raw_records`. Records whose channel is
        outside 0..248 keep the 'area' value they had in `raw_records`.
    """
    # Use the window that was passed in, not a module-level `window`.
    left = windows[0]
    end_pos = [windows[1] + 2 * i for i in range(6)]

    result = np.zeros(len(raw_records),
                      dtype=[('channel', 'int16'), ('area', 'float32')])
    result['channel'] = raw_records['channel']
    result['area'] = raw_records['area']

    # The area of a record does not depend on its channel, so all records are
    # integrated in one pass instead of channel by channel.
    data = raw_records['data']
    summed = np.zeros(len(raw_records), dtype=np.float64)
    for right in end_pos:
        summed += data[:, left:right].sum(axis=1)

    # Only channels 0..248 are overwritten with the computed average.
    in_range = (result['channel'] >= 0) & (result['channel'] < 249)
    result['area'][in_range] = summed[in_range] / len(end_pos)

    return result

    
@strax.takes_config(
    strax.Option(
        'LED_window',
        default=(125, 250),
        help="Window (samples) where we expect signal in LED calibration"))
class LEDCalibration(strax.Plugin):
    '''
    Per-record LED calibration quantities computed from raw_records.

    For every record in channels below 248 the plugin stores the maximum
    amplitude inside the configured 'LED_window', the maximum amplitude in
    the "off" window, the averaged integrated area, and the record's
    channel / time / dt / length. See `dtype` for the output fields.
    '''
    __version__ = '0.0.4'

    depends_on = ('raw_records',)
    data_kind = 'led_cal_0'
    compressor = 'zstd'
    parallel = 'process'
    rechunk_on_save = False
    # NOTE(review): get_area returns float32 averages; storing them in the
    # int32 'area' field drops the fractional part -- confirm this is intended.
    dtype = [('area', np.int32, 'Area'),
             ('amplitudeLED', np.int32, 'Amplitude in LED window'),
             ('amplitudeNOISE', np.int32, 'Amplitude in off LED window'),
             ('channel', np.int16, 'Channel'),
             ('time', np.int64, 'Start time of the interval (ns since unix epoch)'),
             ('dt', np.int16, 'Time resolution in ns'),
             ('length', np.int32, 'Length of the interval in samples')
            ]

    def compute(self, raw_records):
        '''
        Build one output row per raw record with channel < 248.

        The LED window is read from the plugin configuration
        (`self.config['LED_window']`) and passed on to `get_amplitude`
        and `get_area`.
        '''
        # Remove records from funny channels (if present)
        r = raw_records[raw_records['channel'] < 248] # hardcoded for now
        temp = np.zeros(len(r), dtype=self.dtype)

        # Fields copied unchanged from the input records.
        temp['channel'] = r['channel']
        temp['time'] = r['time']
        temp['dt'] = r['dt']
        temp['length'] = r['length']

        led_window = self.config['LED_window']

        # get_amplitude returns single-field structured arrays; pull the
        # field out explicitly instead of relying on an implicit cast.
        on, off = get_amplitude(r, led_window)
        temp['amplitudeLED'] = on['amplitudeLED']
        temp['amplitudeNOISE'] = off['amplitudeNOISE']

        # get_area keeps the input order, so rows line up with `r`.
        area = get_area(r, led_window)
        temp['area'] = area['area']

        return temp

In [3]:
# Register the LEDCalibration plugin with the context, then list the fields
# (name, data type, comment) of the 'led_calibration' data type.
st.register(LEDCalibration)
st.data_info('led_calibration')

Unnamed: 0,Field name,Data type,Comment
0,area,int32,Area
1,amplitudeLED,int32,Amplitude in LED window
2,amplitudeNOISE,int32,Amplitude in off LED window
3,channel,int16,Channel
4,time,int64,Start time of the interval (ns since unix epoch)
5,dt,int16,Time resolution in ns
6,length,int32,Length of the interval in samples


### Analysis: select runs and determine the LED window

In [4]:
# Select runs using the run_mode='LED*' filter (presumably a wildcard match on
# the run mode -- confirm against the strax run-selection docs).
runs = st.select_runs(run_mode='LED*')

Checking data availability: 100%|██████████| 5/5 [00:11<00:00,  2.24s/it]


In [5]:
# Load 'raw_records' for a single run, restricted via seconds_range=(0, 20).
# NOTE(review): despite its name, `df` is whatever `st.get_array` returns --
# presumably a numpy structured array rather than a pandas DataFrame; it is
# indexed by field name ('channel', 'data') further down.
run_id = '180219_1011'
df = st.get_array(run_id, 'raw_records', seconds_range=(0,20))

In [6]:
def gaus(x, a, mu, sig, const):
    """
    Gaussian of amplitude `a`, centre `mu` and width `sig`, sitting on a
    constant offset `const`, evaluated at `x` (scalar or array).
    """
    offset = x - mu
    exponent = -0.5 * offset**2 / sig**2
    return a * np.exp(exponent) + const

def get_LED_window(raw_records):
    """
    Estimate the sample window in which the LED signal is found.

    For each channel 0..248:
      1. take the maximum amplitude and the sample index of that maximum for
         every waveform of the channel;
      2. histogram the amplitudes (100 bins over 0-200) and fit `gaus`
         (Gaussian plus constant) to the bins centred above 30;
      3. keep the waveforms whose amplitude lies within one fitted sigma of
         the fitted mean and collect the sample index of their maximum.

    The window is then [mean - std/2, mean + std/2] of all collected indices
    (sample standard deviation), truncated to integers.

    Parameters
    ----------
    raw_records : numpy structured array
        Must provide 'channel' and a 2-D 'data' field (records x samples).

    Returns
    -------
    info : dict of lists
        Fit results per channel under the keys 'mean', 'sig', 'Norm',
        'Const' and 'pmt' (the channel number). 'sig' is the raw fit
        parameter and may be negative.
    window : list of two ints
        [start, stop] sample indices of the estimated LED window.

    Raises
    ------
    ValueError
        If fewer than two waveform indices were selected in total, so that
        no mean / standard deviation can be computed.
    """
    info = {'mean': [], 'sig': [], 'Norm': [], 'Const': [], 'pmt': []}
    n_channel_s = np.arange(0, 249, 1)
    # Selected peak positions, one array per channel; joined once at the end
    # instead of growing a DataFrame inside the loop.
    led_indices = []

    for n_channel in tqdm(n_channel_s):
        wf_tmp = raw_records[raw_records['channel'] == n_channel]
        if len(wf_tmp):
            amp_wf = wf_tmp['data'].max(axis=1)
            idx_amp_wf = wf_tmp['data'].argmax(axis=1)
        else:
            # Channel without waveforms: histogram and fit still run on empty
            # input so that `info` keeps one entry per channel.
            amp_wf = np.array([])
            idx_amp_wf = np.array([], dtype=np.int64)

        hist, xbins = np.histogram(amp_wf, bins=100, range=(0, 200))
        xbins_center = 0.5 * (xbins[:-1] + xbins[1:])

        fit_mask = (xbins_center > 30) & (xbins_center < 200)
        popt, _ = curve_fit(gaus, xbins_center[fit_mask], hist[fit_mask],
                            p0=[10, 60, 20, 0], maxfev=int(1e6))
        norm, mean, sig, const = popt[0], popt[1], popt[2], popt[3]

        info['mean'].append(mean)
        info['sig'].append(sig)
        info['Norm'].append(norm)
        info['Const'].append(const)
        info['pmt'].append(n_channel)

        # `gaus` is symmetric in sig, so the fit may return a negative width;
        # use its magnitude or the selection below would always be empty.
        half_width = abs(sig)
        selected = (amp_wf < mean + half_width) & (amp_wf > mean - half_width)
        if selected.any():
            led_indices.append(idx_amp_wf[selected])

    if led_indices:
        all_idx = pd.Series(np.concatenate(led_indices), dtype='float64')
    else:
        all_idx = pd.Series([], dtype='float64')

    mean = all_idx.mean()
    std = all_idx.std()
    if not (np.isfinite(mean) and np.isfinite(std)):
        raise ValueError(
            "Cannot determine the LED window: too few waveforms passed the "
            "amplitude selection")
    window = [int(mean - 0.5 * std), int(mean + 0.5 * std)]
    return info, window

In [7]:
# Run the per-channel fits on the loaded raw records; `info` holds the fit
# parameters per channel and `window` the resulting [start, stop] samples.
info, window = get_LED_window(df)

100%|██████████| 249/249 [00:35<00:00, 10.25it/s]


In [8]:
# Show the configuration that applies to 'led_calibration' in the original
# context (second argument presumably filters option names by 'LED*' -- confirm).
st.show_config('led_calibration', 'LED*')

In [None]:
# Derive a second context whose LED_window option is set to the window
# measured above, and show its configuration for comparison.
st_2 = st.new_context(config=dict(LED_window = window))
st_2.show_config('led_calibration', 'LED*')

In [None]:
# Load the calibration data through `st_2`, the context that carries the
# measured LED_window; loading through `st` would ignore that setting.
data = st_2.get_array(run_id, 'led_calibration')