In [None]:
import pathlib
import pandas as pd
import numpy as np
import tensorflow as tf
from numpy.random import seed
from keras import layers
from keras import models
from tensorflow.keras import optimizers
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

In [None]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


# read_csv加了个header=None，会多读取一行。后续训练要修正一下input shape
### 运行一次后自动加载
### 1. 两个分类的音频csv自动加载
### 2. 合并channel形成4d矩阵后，生成对应label
### 3. 构建model，训练model
### 4. 存储val_loss最小的model
### 5. 初步以0.1为跨度检查哪个区间的阈值，f1 score最高
### 6. 在f1 score最高的区间内，再细分1000个刻度进一步检查最合适阈值。
### 7. 输出结果
### 可选（调用matlab将两个class的音频文件转换为对应矩阵）

In [None]:
def readCsv(fileName, baseDir='/content/drive/MyDrive/0819'):
    """Load all CSV matrices of one class folder, grouped per recording.

    Every ``*.csv`` file directly inside ``baseDir/fileName`` is read with
    ``header=None``. The recording name is the file name without the ``.csv``
    extension and without its last 8 characters (the matrix-type suffix,
    e.g. ``STFT_Phs``). Files sharing a recording name are collected in one
    list.

    Files are visited in sorted order, so the matrices of every recording are
    stored in the same (alphabetical by file name) order no matter how the
    filesystem lists the directory. Downstream cells stack these lists into
    channels, so a per-recording order difference would mix up channels.

    Args:
        fileName: Name of the class sub-folder, e.g. ``'normal'`` or ``'error'``.
        baseDir: Directory that contains the class sub-folders. Defaults to
            the Google Drive location this notebook was written against.

    Returns:
        dict mapping recording name (str) to a list of 2-D numpy arrays,
        one per CSV file of that recording.

    Side effects:
        Prints the folder path, and the name of every recording that does
        not have exactly 4 matrices.
    """
    suffixLen = 8  # length of the matrix-type suffix, e.g. "STFT_Log"
    expectedMatrices = 4

    folder = pathlib.Path(baseDir) / fileName
    print(folder)
    soundMap = dict()  # recording name -> list of np.ndarray

    # iterdir() yields entries in arbitrary order; sort for a stable channel
    # order within each recording and a stable recording order overall.
    for fp in sorted(folder.iterdir()):
        if not fp.match('*.csv'):
            continue
        soundMatrix = pd.read_csv(fp, header=None).values
        # fp.stem drops only the final ".csv", so dots inside the recording
        # name are preserved unchanged.
        soundFileName = fp.stem[:-suffixLen]
        soundMap.setdefault(soundFileName, []).append(soundMatrix)

    # Report incomplete (or over-complete) recordings instead of failing.
    for key, matrices in soundMap.items():
        if len(matrices) != expectedMatrices:
            print(key)
    return soundMap
# readCsv('normal')

### Get normal and error sound matrices

In [None]:
# Load the 'normal' class and turn each recording's list of matrices into
# one array.
normalSound = readCsv('normal')
normalSoundList = []
for matrices in normalSound.values():
    sample = np.array(matrices)
    # Print the shape of any recording that is not (4, 38, 30).
    if sample.shape != (4, 38, 30):
        print(sample.shape)
    normalSoundList.append(sample)
normalSoundList = np.array(normalSoundList)
print(normalSoundList.shape)
# Reorder axes (0, 1, 2, 3) -> (0, 2, 3, 1): the per-recording matrix axis
# moves to the last position.
transposeNormalSoundList = np.transpose(normalSoundList, (0, 2, 3, 1))
print(transposeNormalSoundList.shape)

/content/drive/MyDrive/0819/normal
(630, 4, 38, 30)
(630, 38, 30, 4)


In [None]:
# Load the 'error' class; one stacked array per recording.
errorSound = readCsv('error')
errSoundList = np.array([np.array(matrices) for matrices in errorSound.values()])
# Reorder axes (0, 1, 2, 3) -> (0, 2, 3, 1): the per-recording matrix axis
# moves to the last position.
transposeErrorSoundList = np.transpose(errSoundList, (0, 2, 3, 1))

print(transposeErrorSoundList.shape)

/content/drive/MyDrive/0819/error
(570, 38, 30, 4)


In [None]:
# seed(123)


### get sound labels

In [None]:
# Label convention: 1 for every normal recording, 0 for every error recording.
normalLabels = np.full(len(normalSound), 1, dtype=int)
errorLabels = np.full(len(errorSound), 0, dtype=int)
print(normalLabels.shape)

# Normal samples first, then error samples -- the same order is used for the
# data below so labels and samples stay aligned.
totalLabels = np.concatenate([normalLabels, errorLabels])
print('len of labels', totalLabels.shape[0])
print(totalLabels)

totalData = np.concatenate([transposeNormalSoundList, transposeErrorSoundList])
print(totalData.shape)

(630,)
len of labels 1200
[1 1 1 ... 0 0 0]
(1200, 38, 30, 4)


### build model

In [None]:
# Hold out 20% of the samples as the test set; the fixed random_state makes
# the split repeatable.
x_train, x_test, y_train, y_test = train_test_split(
    totalData,
    totalLabels,
    random_state=0,
    test_size=0.2,
)
print(y_test)
# One-hot encode the integer labels for the 2-unit softmax output.
y_train, y_test = to_categorical(y_train), to_categorical(y_test)


# # separate train and validation set
# x, x_test, y, y_test = train_test_split(totalData, totalLabels, test_size=0.2, random_state=0)
# print(y_test)
# # one-hot
# y_train = to_categorical(y_train)
# y_test= to_categorical(y_test)
# y_test

# x_train,x_val,y_train,y_val = train_test_split(x,y,test_size=0.2,random_state=0)

In [None]:
# # from keras.constraints import MinMaxNorm
# model = models.Sequential()
# model.add(layers.Conv2D(32,
#                         (3, 3),
#                         activation='relu',
#                         padding='same',
#                         input_shape=(38, 30, 4)))
# model.add(layers.Conv2D(32,
#                         (3, 3),
#                         activation='relu',
#                         padding='same',))

# model.add(layers.Conv2D(32,
#                         (3, 3),
#                         activation='relu',
#                         padding='same',))
# model.add(layers.Conv2D(32,
#                         (3, 3),
#                         activation='relu',
#                         padding='same',))

# model.add(layers.Conv2D(32,
#                         (3, 3),
#                         activation='relu',
#                         padding='same',))
# model.add(layers.Conv2D(32,
#                         (3, 3),
#                         activation='relu',
#                         padding='same',))

# model.add(layers.Dropout(0.1))
# model.add(layers.Flatten())
# model.add(layers.Dense(32,activation='relu'))
# model.add(layers.Dense(2, activation='softmax'))


# opt = optimizers.Adam(learning_rate=0.002, decay=1e-6) #0.002， 0.0005
# model.compile(loss='categorical_crossentropy',
#               optimizer=opt,
#               metrics=['accuracy'])

# model.summary()

In [None]:
from tensorflow import keras
from keras.layers import *
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Model
import datetime
import matplotlib.pyplot as plt
import numpy as np



# def residual_block(layer_input, filters):
#     """Residual block described in paper"""
#     d = Conv2D(filters, 3, activation='relu', padding='same')(layer_input)
#     d = BatchNormalization(momentum=0.8)(d)
#     d = Conv2D(filters, 3, activation='relu', padding='same')(d)
#     d = BatchNormalization(momentum=0.8)(d)
#     d = Add()([d, layer_input])
#     return d


def _conv3x3():
    """Return a new 32-filter, 3x3, ReLU, 'same'-padded Conv2D layer."""
    return Conv2D(32, 3, activation='relu', padding='same')


inputs = Input(shape=(38, 30, 4))

# Stage 1: three stacked convolutions, then 2x2 max pooling.
conv1 = _conv3x3()(inputs)
conv2 = conv1
for stage1_step in range(2):
    conv2 = _conv3x3()(conv2)
pool1 = MaxPooling2D(pool_size=(2, 2))(conv2)

# Stage 2: four stacked convolutions on the pooled feature map.
conv3 = pool1
for stage2_step in range(4):
    conv3 = _conv3x3()(conv3)

# Classifier head: dropout -> flatten -> 32-unit dense -> 2-way softmax.
D1 = Dropout(0.1)(conv3)
D1 = Flatten()(D1)
D1 = Dense(32, activation='relu')(D1)
Out = Dense(2, activation='softmax')(D1)

model = Model(inputs, Out)
model.summary()

# Other learning rates noted by the author: 0.002, 0.0005.
opt = optimizers.Adam(learning_rate=0.0001, decay=1e-6)
model.compile(
    optimizer=opt,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)


Model: "model_38"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_39 (InputLayer)       [(None, 38, 30, 4)]       0         
                                                                 
 conv2d_323 (Conv2D)         (None, 38, 30, 32)        1184      
                                                                 
 conv2d_324 (Conv2D)         (None, 38, 30, 32)        9248      
                                                                 
 conv2d_325 (Conv2D)         (None, 38, 30, 32)        9248      
                                                                 
 max_pooling2d_10 (MaxPoolin  (None, 19, 15, 32)       0         
 g2D)                                                            
                                                                 
 conv2d_326 (Conv2D)         (None, 19, 15, 32)        9248      
                                                          

In [None]:
x_train.shape

(960, 38, 30, 4)

In [None]:
from keras import callbacks

# Write a checkpoint to 'best_model-2.h5' only when val_loss improves.
callbacks_list = [
    callbacks.ModelCheckpoint(
        filepath='best_model-2.h5',
        monitor='val_loss',
        verbose=1,
        save_best_only=True,
    )
]

# validation_split=0.2 holds out 20% of the training arrays for validation.
history = model.fit(
    x_train,
    y_train,
    batch_size=3,
    epochs=15,
    validation_split=0.2,
    callbacks=callbacks_list,
)


Epoch 1/15
Epoch 1: val_loss improved from inf to 0.69797, saving model to best_model-2.h5
Epoch 2/15
Epoch 2: val_loss did not improve from 0.69797
Epoch 3/15
Epoch 3: val_loss did not improve from 0.69797
Epoch 4/15
Epoch 4: val_loss did not improve from 0.69797
Epoch 5/15
Epoch 5: val_loss did not improve from 0.69797
Epoch 6/15
Epoch 6: val_loss did not improve from 0.69797
Epoch 7/15
Epoch 7: val_loss did not improve from 0.69797
Epoch 8/15
Epoch 8: val_loss did not improve from 0.69797
Epoch 9/15
Epoch 9: val_loss did not improve from 0.69797
Epoch 10/15
Epoch 10: val_loss did not improve from 0.69797
Epoch 11/15
Epoch 11: val_loss did not improve from 0.69797
Epoch 12/15
Epoch 12: val_loss did not improve from 0.69797
Epoch 13/15
Epoch 13: val_loss did not improve from 0.69797
Epoch 14/15
Epoch 14: val_loss did not improve from 0.69797
Epoch 15/15
Epoch 15: val_loss did not improve from 0.69797


In [None]:
test_loss,test_acc = model.evaluate(x_test,y_test)



In [None]:
# Restore the best checkpoint. The ModelCheckpoint callback in the training
# cell writes to 'best_model-2.h5'; loading 'best_model.h5' failed with an
# OSError, so the file name must match the one the callback saves.
model.load_weights('best_model-2.h5')

OSError: ignored

In [None]:
test_loss,test_acc = model.evaluate(x_test,y_test)



In [None]:
test_acc

0.5083333253860474

In [None]:
test_loss

NameError: ignored

In [None]:
res = model.predict(x_test)

In [None]:
res

NameError: ignored

In [None]:
y_test

array([[0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.

In [None]:
# Mark every predicted score above 0.2 with 1.0. Each column is thresholded
# independently, so a row can have both entries set.
pred = (res > 0.2).astype(np.float64)
pred

array([[1., 1.],
       [1., 1.],
       [1., 0.],
       [0., 1.],
       [1., 1.],
       [1., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [1., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 1.],
       [1., 0.],
       [1., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 1.],
       [1., 0.],
       [1., 1.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [1., 1.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [1., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [1., 1.],
       [0., 1.

In [None]:
from sklearn.metrics import classification_report

In [None]:
# target = ['class 0', 'class 1']
classification_report(y_test[:,1],pred[:,1])


'              precision    recall  f1-score   support\n\n         0.0       0.53      0.40      0.45       126\n         1.0       0.48      0.61      0.54       114\n\n    accuracy                           0.50       240\n   macro avg       0.51      0.51      0.50       240\nweighted avg       0.51      0.50      0.49       240\n'

In [None]:
# Binarize the predicted scores at 0.5.
pred = (res > 0.5).astype(np.int64)

# Use column 0 of each row as the binary label, for ground truth and
# prediction alike.
y_label = np.array([row[0] for row in y_test], dtype=int)
pred_label = np.array([row[0] for row in pred], dtype=int)
y_label

array([0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1,
       1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0,
       1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0,
       0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1,
       1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1,
       0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0,
       0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0,
       0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0,
       1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1,
       0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 1, 0,
       1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1])

In [None]:
from sklearn.metrics import f1_score

In [None]:
pred_label

array([0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1,
       1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
       1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0,
       1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1,
       0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0,
       1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1,
       1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1,
       0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0,
       1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1])

In [None]:
f1_score(y_label,pred_label)

0.6168831168831169

In [None]:
print(pred)

[[0 1]
 [0 1]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [1 0]
 [1 0]
 [0 1]
 [1 0]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [1 0]
 [0 1]
 [0 1]
 [0 1]
 [1 0]
 [0 1]

### Manual test on error data


In [None]:
# # seed(123)
# # test data
# testErrorSound = readCsv('tinyError')
# print(len(testErrorSound))
# testErrorSoundList = []
# for v in testErrorSound.values():
#     testErrorSoundList.append(np.array(v))
# testErrorSoundList = np.array(testErrorSoundList)
# transposeTestErrorSoundList = testErrorSoundList.transpose((0, 2, 3, 1))

# # transpose of error sound
# print(transposeTestErrorSoundList.shape)

In [None]:
# testErrorLabels = np.zeros(len(testErrorSound), dtype=int)
# print(testErrorLabels.shape)
# testErrorLabels = to_categorical(testErrorLabels)

In [None]:
# # test data set
# resTest = model.predict(transposeTestErrorSoundList)
# predTest = np.int64(resTest>0.5)
# test_label = []
# for v in testErrorLabels:
#   test_label.append(v[0])
# test_label=np.array(test_label,dtype=int)

# test_pred_label = []
# for v in predTest:
#   test_pred_label.append(v[0])
# test_pred_label=np.array(test_pred_label,dtype=int)

In [None]:
# test_score = f1_score(test_label,test_pred_label)
# test_score

In [None]:
vRange = np.linspace(0.3,0.5,1000)

In [None]:
# Sweep the candidate thresholds in vRange and keep the one with the highest
# F1 score. accMap["acc"] holds a (threshold, f1_score) tuple; despite the
# key name, the stored value is an F1 score, not an accuracy.
# NOTE(review): the threshold is selected on the same x_test / y_test that
# the score is reported on, so the best F1 found here is optimistic --
# consider tuning on a separate validation split.
accMap = {"acc": (0.0, 0.0)}

# Ground-truth labels (column 0 of the one-hot rows) do not depend on the
# threshold, so extract them once instead of on every iteration.
y_label = np.asarray(y_test)[:, 0].astype(int)

for threshold in vRange:
    pred = np.int64(res > threshold)

    # Predicted label = column 0 of the thresholded scores.
    pred_label = pred[:, 0].astype(int)

    score = f1_score(y_label, pred_label)
    # Strict '>' keeps the first (lowest) threshold among ties.
    if score > accMap["acc"][1]:
        accMap["acc"] = (threshold, score)
accMap["acc"]

(0.34184184184184185, 0.9655172413793104)

In [None]:
print(len(y_label))

240
