In [1]:
import numpy as np
import keras
from os import system as sys

Using TensorFlow backend.


In [3]:
def fft_trapezoid(length, rise_time, flat_top , tau):
    length = length+2*rise_time+flat_top;
    p = np.zeros(length)
    s = np.zeros(length)
    input2 = np.zeros(length)
    p[0] = s[0] = input2[0] = 0.;
    for i in range(1,length):
        input2[i] = s[i] = 0.;
    input2[1]=1.;
    tau = 1/(np.exp(1./tau)-1);
    for i in range(1,length):
        if i>=2*rise_time+flat_top:
            d = input2[i]-input2[i-rise_time]-input2[i-rise_time-flat_top]+input2[i-2*rise_time-flat_top]
        else:
            if i>=rise_time+flat_top:
                d = input2[i]-input2[i-rise_time]-input2[i-rise_time-flat_top]
            else:
                if i>=rise_time:
                    d = input2[i]-input2[i-rise_time]
                else:
                    d = input2[i];
        p[i] = p[i-1]+d;
        s[i] = s[i-1]+p[i]+tau*d;
    for i in range(length):
        s[i] = s[i]/(rise_time*tau);
    
    res = np.fft.rfft(s)
    return res[:-(flat_top+rise_time)]

In [4]:
class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, batch_size=32, dim=(3500,1), n_channels=1, n_classes=1, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(len(self.X)/self.batch_size)

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate data
        X, y = self.__data_generation()

        return X, y
    def shuffle_in_unison(self, a, b):
        rng_state = np.random.get_state()
        np.random.shuffle(a)
        np.random.set_state(rng_state)
        np.random.shuffle(b)
        
    def on_epoch_end(self):
        'Updates indexes after each epoch'
        sys('../data_generator/bin/pulse_generation /dev/null my_data.dat')
        my_data = np.loadtxt('my_data.dat')
        my_labels = np.append(np.zeros(int(len(my_data)/2)),np.ones(int(len(my_data)/2)))
        trap=fft_trapezoid(3500, 20,0,1250)
        self.X = np.empty((len(my_data), 800))
        for i in range(len(my_data)):
            self.X[i] = np.fft.irfft(trap*np.fft.rfft(my_data[i]))[700:1500]
        self.X = self.X.reshape(len(my_data), 800, 1)
        #y = convert_to_one_hot(my_labels.astype(int64), 2).T
        self.y = my_labels
        sys('rm my_data.dat')
        if (self.shuffle):
            self.shuffle_in_unison(self.X, self.y)
        self.index=0
        

    def __data_generation(self):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization
        X_tmp = self.X[self.index*self.batch_size:(self.index+1)*self.batch_size]
        y_tmp = self.y[self.index*self.batch_size:(self.index+1)*self.batch_size]
        self.index += 1
        return X_tmp, y_tmp

In [12]:
class DataReader(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, batch_size=32, dim=(3500,1), n_channels=1, n_classes=1, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.epoch = 0
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(len(self.X)/self.batch_size)

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate data
        X, y = self.__data_generation()
        return X, y
    
    def shuffle_in_unison(self, a, b):
        rng_state = np.random.get_state()
        np.random.shuffle(a)
        np.random.set_state(rng_state)
        np.random.shuffle(b)
        
    def on_epoch_end(self):
        if (self.epoch % 5 == 0):
            num = int(self.epoch/5)
            'Updates indexes after each epoch'
            my_data = np.loadtxt('../data/'+str(num+1)+'.dat')
            my_labels = np.append(np.zeros(int(len(my_data)/2)),np.ones(int(len(my_data)/2)))
            trap=fft_trapezoid(3500, 20,0,1250)
            self.X = np.empty((len(my_data), 800))
            for i in range(len(my_data)):
                my_data[i] = np.fft.irfft(trap*np.fft.rfft(my_data[i]))
                self.X[i] = self.X[i] - np.mean(my_data[i][:100])
                self.X[i] = self.X[i]/max(self.X[i])
            self.X = self.X.reshape(len(my_data), 800, 1)
            #y = convert_to_one_hot(my_labels.astype(int64), 2).T
            self.y = my_labels            
        self.shuffle_in_unison(self.X, self.y)
        self.index=0
        self.epoch += 1
        

    def __data_generation(self):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization
        X_tmp = self.X[self.index*self.batch_size:(self.index+1)*self.batch_size]
        y_tmp = self.y[self.index*self.batch_size:(self.index+1)*self.batch_size]
        self.index += 1
        return X_tmp, y_tmp