In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import time
from pathlib import Path
import timeit
import datetime

from tensorflow.keras.models import model_from_json
from scipy.io import wavfile
from scipy import signal
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer

from tensorflow.keras.layers import Conv2D, BatchNormalization, MaxPooling2D, Dense, Input, Dropout, Flatten
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from tensorflow.keras import regularizers

train_path = './data'
sample_rate = 200

In [2]:
class get_data:
    def __init__(self, path):
        self.path = path
        self.get_data()
        self.get_speg()
        self.split_data()
        self.get_label()

    def get_data(self):
        data_dir = Path(self.path)
        # next(os.walk(path))[2]
        files = [(str(file), file.parts[-2]) for file in data_dir.glob("**/*.txt") if file]
        self.df = pd.DataFrame(files, columns=['path', 'state'])

    def get_speg(self):
        ret_arr = list()
        for i in range(len(self.df['path'])):
            file = pd.read_csv(self.df['path'][i], index_col=0)
            
            f_lst = list()
            for ch in range(4):
                _, _, speg = self.log_specgram(file['ch' + str(ch + 1)], sample_rate)
                speg = speg.reshape(len(speg), len(speg[0]), 1)
                f_lst.append(speg)
            speg_4ch = np.concatenate((f_lst[0], f_lst[1], f_lst[2], f_lst[3]), axis=2)
            ret_arr.append(speg_4ch)
        self.df['speg'] = ret_arr
    
    def log_specgram(
        self,
        audio,
        sample_rate,
#         window_size=20,
#         step_size=10,
        eps=1e-10
    ):
#         nperseg = int(round(window_size * sample_rate / 1e3))
#         noverlap = int(round(step_size * sample_rate / 1e3))
        nperseg=15 #25 40
        noverlap=7 #20 20
        freqs, times, spec = signal.spectrogram(audio,
                                        fs=sample_rate,
                                        window='hann',
                                        nperseg=nperseg,
                                        noverlap=noverlap,
                                        detrend=False)
        return freqs, times, np.log(spec.T.astype(np.float32) + eps)
        
    def split_data(self):
#         labelbinarizer = LabelBinarizer()
        self.X = self.df.speg
        self.y = self.df.state
        self.ts_f, self.vs_f, self.ts_l, self.vs_l =\
        train_test_split(
            self.X,
            self.y,
            test_size=0.3,
            stratify=self.y
        )
        self.vs_f, self.test_f, self.vs_l, self.test_l =\
        train_test_split(
            self.vs_f,
            self.vs_l,
            test_size=0.5,
            stratify=self.vs_l
        )
    
    def get_label(self):
        self.ts_l = pd.get_dummies(self.ts_l)
        self.vs_l = pd.get_dummies(self.vs_l)
        self.test_l = pd.get_dummies(self.test_l)

In [3]:
# need:
#   init shape
class create_model:
    def __init__(self, shape):
        self.shape = shape
        
        self.build_model()
        self.compile_model()
        
    def build_model(self):
        input_layer = Input(shape=self.shape)
        
        model = BatchNormalization()(input_layer)
        
        model = Conv2D(filters=8, kernel_size=(2, 2), padding='same', activation='relu')(model)
        model = Conv2D(8, (2, 2), padding='same', activation='relu')(model)
        model = MaxPooling2D((2, 2))(model)
        model = Dropout(0.2)(model)
        
        model = Conv2D(16, (3, 3), padding='same', activation='relu')(model)
        model = Conv2D(16, (3, 3), padding='same', activation='relu')(model)
        model = MaxPooling2D((2, 2))(model)
        model = Dropout(0.2)(model)
        
        model = Conv2D(32, (3, 3), padding='same', activation='relu')(model)
        model = MaxPooling2D((2, 2))(model)
        model = Dropout(0.2)(model)

        model = Flatten()(model)
        model = BatchNormalization()(Dense(128, kernel_regularizer=regularizers.l2(0.01), activation='relu')(model))
        model = BatchNormalization()(Dense(128, kernel_regularizer=regularizers.l2(0.01), activation='relu')(model))
        
        model = Dense(2, activation='softmax')(model)

        self.model = Model(inputs=input_layer, outputs=model)
        
    def compile_model(self):
        my_opt = tf.optimizers.Adam(
            learning_rate=0.001
        )
        self.model.compile(loss='binary_crossentropy', optimizer=my_opt, metrics=['accuracy'])
    
    def train(
        self,
        ts_f,
        ts_l,
        vs_f,
        vs_l,
        epochs=10,
        batch_size=25,
        has_tb=True,
        tb_path='./tensorboard/default'
    ):
        self.ts_f = np.array([data for data in ts_f])
#         print(self.ts_f.shape)
        self.ts_l = ts_l
        self.vs_f = np.array([data for data in vs_f])
        self.vs_l = vs_l
        self.epochs = epochs
        self.batch_size = batch_size
        self.has_tb = has_tb
        self.tb_path = tb_path
        
        fit_arg = dict(
            x=self.ts_f,
            y=self.ts_l,
            epochs=self.epochs,
            batch_size=self.batch_size,
            validation_data=(self.vs_f, self.vs_l)
        )
        if has_tb:
            tensorboard = TensorBoard(log_dir=self.tb_path)
            fit_arg['callbacks'] = [tensorboard]
        self.model.fit(**fit_arg)
        
    def evaluate(
        self,
        ts_f,
        ts_l
    ):
        self.ts_f = np.array([data for data in ts_f])
        self.ts_l = ts_l
        
        self.score = self.model.evaluate(self.ts_f, self.ts_l, verbose=1)
#         print(self.score)
#         print(self.model.metrics_names)
        print("%s: %.2f" % (self.model.metrics_names[0], self.score[0]))
        print("%s: %.2f" % (self.model.metrics_names[1], self.score[1]))

In [6]:
data = get_data(train_path)
# data.df.head()

In [7]:
model = create_model(shape=data.ts_f[0].shape)

In [8]:
model.train(data.ts_f, data.ts_l, data.vs_f, data.vs_l, epochs=20, has_tb=True, tb_path='./tensorboard/baseline_v0')

Train on 700 samples, validate on 150 samples
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [9]:
model.evaluate(data.test_f, data.test_l)

loss: 0.75
accuracy: 0.99


In [None]:
# TEST AREA

In [65]:
train = get_data(train_path)
t_file = pd.read_csv(train.df['path'][0], index_col=0)
t_file.head()

t_ret = list()
for ch in range(4):
    _, _, speg = log_specgram(t_file['ch' + str(ch + 1)], sample_rate,) #window_size=50, step_size=35)
    speg = speg.reshape(len(speg), len(speg[0]), 1)
    t_ret.append(speg)
    
t = np.concatenate((t_ret[0], t_ret[1], t_ret[2], t_ret[3]), axis=2)
print(t.shape)

(199, 3, 4)


In [144]:
def t_log_specgram(audio, sample_rate, window_size=20,
                 step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    print(nperseg, noverlap)
    freqs, times, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
                                    nperseg=14,
                                    noverlap=7,
                                    detrend=False)
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

t_file = pd.read_csv(train.df['path'][0], index_col=0)
_, _, t_speg = t_log_specgram(t_file['ch1'], sample_rate, window_size=30, step_size=25)
print(t_speg.shape)
# print(t_speg)
# print(1e3)
# _, _, speg = log_specgram(train.df['path'][0])

6 5
(56, 8)


In [118]:
def log_specgram(audio, sample_rate, window_size=20,
                 step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, times, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
#                                     nperseg=nperseg,
#                                     noverlap=noverlap,
                                    nperseg=14,
                                    noverlap=7,
                                    detrend=False)
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

freqs1, times1, spectrogram1 = log_specgram(t_file['ch1'], sample_rate, window_size=50, step_size=35)
freqs2, times2, spectrogram2 = log_specgram(t_file['ch2'], sample_rate, window_size=50, step_size=35)
freqs3, times3, spectrogram3 = log_specgram(t_file['ch3'], sample_rate, window_size=50, step_size=35)
freqs4, times4, spectrogram4 = log_specgram(t_file['ch2'], sample_rate, window_size=50, step_size=35)

print(spectrogram1.shape)
print(spectrogram1[0][0])
print(spectrogram2[0][0])
print(spectrogram3[0][0])
print(spectrogram4[0][0])

ta1 = spectrogram1.reshape(len(spectrogram1), len(spectrogram1[0]), 1)
print(ta1.shape)
ta2 = spectrogram2.reshape(len(spectrogram2), len(spectrogram2[0]), 1)
ta3 = spectrogram3.reshape(len(spectrogram3), len(spectrogram3[0]), 1)
ta4 = spectrogram4.reshape(len(spectrogram4), len(spectrogram4[0]), 1)
# ta = np.append(ta1, ta2, axis=2)
# ta = np.append(ta, ta3, axis=2)
# ta = np.append(ta, ta4, axis=2)
ta = np.concatenate((ta1, ta2, ta3, ta4), axis=2)
print(ta)

# speg = np.concatenate((spectrogram1, spectrogram2, spectrogram3, spectrogram4), axis=0)

# print(speg)
# print(speg.shape)

# speg_4ch = []
# for i in range(len(spectrogram1)):
#     i_lst = list()
#     for j in range(len(spectrogram1[0])):
#         j_lst = list()
#     speg_4ch.append(list())

#     for j in range(len(spectrogram1[0])):
#         print(spectrogram1[i][j])

# print(len(speg_4ch))

# print("freqs:\n", freqs, "\n")
# print(freqs.shape)
# print("times:\n", times, "\n")
# print(times.shape)
# print("spectrogram:\n", spectrogram4, "\n")
# print(spectrogram4.shape)

(131, 6)
-5.2645254
-5.430552
-5.951558
-5.430552
(131, 6, 1)
[[[ -5.2645254   -5.430552    -5.951558    -5.430552  ]
  [ -3.627599    -1.7014384   -3.5742292   -1.7014384 ]
  [ -2.8383954   -1.1714436   -2.3975449   -1.1714436 ]
  [ -2.670378    -2.4234064   -2.0491207   -2.4234064 ]
  [ -6.7914553   -5.824142    -6.251271    -5.824142  ]
  [ -5.7685757   -6.5883374   -6.0156307   -6.5883374 ]]

 [[ -1.7589514   -5.4641457   -2.8844445   -5.4641457 ]
  [ -0.8550425   -1.5390472   -2.9510193   -1.5390472 ]
  [ -0.910285    -1.4026263   -2.3193738   -1.4026263 ]
  [ -1.9432485   -2.9584148   -2.4692895   -2.9584148 ]
  [ -3.945669    -7.3207483   -4.3558655   -7.3207483 ]
  [ -6.797452   -10.346286    -8.22204    -10.346286  ]]

 [[ -1.5686886   -0.7950065   -2.900094    -0.7950065 ]
  [ -1.9682143   -1.0798578   -2.2139988   -1.0798578 ]
  [ -1.8498638   -3.7858036   -2.0329413   -3.7858036 ]
  [ -3.023065    -3.234389    -3.8842907   -3.234389  ]
  [ -4.5496087   -6.3293905   -4.85687

In [33]:
a = [[1, 2, 3], [1, 2, 3]]
b = [[4, 5, 6], [4, 5, 6]]
c = [[7, 8, 9], [7, 8, 9]]
a = np.array(a)
b = np.array(b)
c = np.array(c)
d = np.append(a, b, axis=1)
print(d)
# print(a.shape)
# print(np.concatenate((a, b, c), axis=0))
ta = []
for i in range(300):
    ta.append(i);
# print(ta)
ta = np.array(ta)
ta = ta.reshape(10, 10, 3)
# print(ta)
# print(len(ta))
# print(ta[0])
taa = [[[1, 2, 3], [4, 5, 6]]]
taa = np.array(taa)
print(taa.shape)


[[1 2 3 4 5 6]
 [1 2 3 4 5 6]]
(1, 2, 3)
