Import libraries

In [None]:
import os
import numpy as np
import pickle
from tqdm import tqdm
from scipy import signal 
import pandas as pd
from matplotlib import pyplot as plt
import os, glob
from scipy.signal import butter, lfilter
from scipy.signal import freqs
from numpy.fft import fft, ifft,rfft,fftfreq

Define custom functions

In [None]:
def fftx(x, sr=51200, low=10, high=200, offset=150):
  """Return the offset-corrected magnitude spectrum of `x` inside a frequency window.

  Parameters
  ----------
  x : 1-D real sequence accepted by `rfft` (the notebook passes a DataFrame column).
  sr : sampling rate in Hz used to build the frequency axis.
  low, high : exclusive bounds; only bins whose frequency label lies strictly
      between them are kept.
  offset : constant subtracted from every kept magnitude as a crude noise floor.
      Magnitudes smaller than `offset` therefore come out negative.

  Returns
  -------
  1-D array holding the kept magnitudes minus `offset`.

  The defaults reproduce the values that used to be hard-coded in the body.
  """
  X = abs(rfft(x))  # magnitude of the one-sided spectrum
  # NOTE(review): the axis is built from the length of the rfft output, not
  # from the signal length. rfft bins are spaced sr/len(x) apart, while
  # fftfreq(X.size, 1/sr) labels them sr/X.size apart (about twice as much),
  # so the window really covers roughly low/2 .. high/2 Hz. This is kept
  # unchanged because the number of retained bins fixes the model's input
  # length; numpy.fft.rfftfreq(len(x), 1/sr) would be the matching axis --
  # confirm and retrain before switching.
  freq = fftfreq(X.size, 1/sr)
  keep = (freq > low) & (freq < high)
  return X[keep] - offset
  
def bandpass(data1,data2,data3):
  """Apply `fftx` to each of the three inputs and return the results in order."""
  spectra = [fftx(channel) for channel in (data1, data2, data3)]
  return tuple(spectra)

def process_data(data_path):
  """Read one header-less three-column CSV and append its spectra to the accumulators.

  The columns are named 'x', 'y' and 'z'; each is passed through `bandpass`
  and the results are appended to the module-level lists `x_all`, `y_all`
  and `z_all`, which must exist before this is called. Returns None.
  """
  frame = pd.read_csv(data_path, header=None, names=['x','y','z'])
  spec_x, spec_y, spec_z = bandpass(*(frame[axis] for axis in ('x', 'y', 'z')))
  x_all.append(spec_x)
  y_all.append(spec_y)
  z_all.append(spec_z)

Import your data

In [None]:
# Collect the training CSV files from both dataset folders.
# os.path.join builds the pattern with the platform's own separator, so the
# lookup no longer depends on the Windows-only "\\" that was hard-coded here.
data_path = glob.glob(os.path.join("train_data600", "train*.csv"))
data_path2 = glob.glob(os.path.join("train_data1200", "train*.csv"))
data_path = data_path + data_path2

Process Data and save to npy file

In [None]:
# Per-axis accumulators that process_data() appends to (one spectrum per file).
x_all, y_all, z_all = [], [], []
for csv_file in tqdm(data_path):
  process_data(csv_file)

# Cache the spectra on disk so later cells can start from the .npy files.
for out_name, spectra in (("X_all.npy", x_all), ("Y_all.npy", y_all), ("Z_all.npy", z_all)):
  np.save(out_name, spectra)
# NOTE(review): `y_label` is not assigned by any cell shown before this one,
# so on a fresh kernel this line raises NameError. The labels have to be
# built (in the same order as `data_path`) before they can be saved.
np.save("Label.npy", y_label)

Make multi-label targets for multi-task learning

In [None]:
import numpy as np
# Reload the cached per-axis spectra and the class labels.
x_all, y_all, z_all, y_label = [
    np.load(npy_name)
    for npy_name in ("X_all.npy", "Y_all.npy", "Z_all.npy", "Label.npy")
]
# Put the three axes on the last dimension: (files, bins, 3).
xyz = np.stack((x_all, y_all, z_all), axis=-1)

# os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# Per-head class names; position 0 ('no') means the fault type is absent.
UP = ['no', 'UB_1P', 'UB_2P', 'UB_3P']
MA = ['no', 'MA_P16', 'MA_P20', 'MA_P46']
MP = ['no', 'MP_N13', 'MP_N16','MP_N18', 'MP_N24']

# Flat class list, in index order:
#   'Normal', the single UB / MA / MP faults, every MA fault and the first
#   three MP faults combined with each UB fault, then 'Mixcase'.
readme = (
    ['Normal']
    + UP[1:] + MA[1:] + MP[1:]
    + [base + '+' + ub for base in (MA[1:] + MP[1:4]) for ub in UP[1:]]
    + ['Mixcase']
)

# Lookups between class name and class index.
fail2index = dict(zip(readme, range(len(readme))))
index2fail = dict(enumerate(readme))

def onehotUP(label):
  """Return a length-4 vector of zeros with a 1 at position `label`."""
  vec = np.zeros(4)
  vec[label] = 1
  return vec
def onehotMA(label):
  """Return a length-4 one-hot vector with the 1 at position `label % 4 + 1`.

  `makeonehot` calls this with class indices 4-6, which land on positions 1-3.
  """
  vec = np.zeros(4)
  slot = 1 + label % 4
  vec[slot] = 1
  return vec
def onehotMP(label):
  """Return a length-5 one-hot vector with the 1 at position `label % 7 + 1`.

  `makeonehot` calls this with class indices 7-10, which land on positions 1-4.
  """
  vec = np.zeros(5)
  slot = 1 + label % 7
  vec[slot] = 1
  return vec
def makeonehot(fail):
  """Encode a class name as three one-hot vectors, returned as (UB, MA, MP).

  `fail` is split on '+'; each part that is a key of `fail2index` is routed by
  its index: 1-3 -> `onehotUP`, 4-6 -> `onehotMA`, 7-10 -> `onehotMP`. A head
  that no part sets gets the 'no' vector (1 at position 0, float32).

  NOTE(review): parts whose index is outside 1-10 contribute nothing, so both
  'Normal' and 'Mixcase' come back as the all-'no' encoding. That makes
  'Mixcase' samples indistinguishable from 'Normal' in the targets -- confirm
  this is intended.
  """
  up_vec = ma_vec = mp_vec = None
  for part in fail.split('+'):
    if part not in fail2index:
      continue
    idx = fail2index[part]
    if 1 <= idx <= 3:
      up_vec = onehotUP(idx)
    elif 4 <= idx <= 6:
      ma_vec = onehotMA(idx)
    elif 7 <= idx <= 10:
      mp_vec = onehotMP(idx)
  # Heads that were never set default to the 'no' class.
  if up_vec is None:
    up_vec = np.array([1,0,0,0], dtype="float32")
  if ma_vec is None:
    ma_vec = np.array([1,0,0,0], dtype="float32")
  if mp_vec is None:
    mp_vec = np.array([1,0,0,0,0], dtype="float32")
  return up_vec, ma_vec, mp_vec
def decode_up(array):
  """Return the `UP` entry at the position of the largest value in `array`."""
  best = np.argmax(array)
  return UP[best]
def decode_ma(array):
  """Return the `MA` entry at the position of the largest value in `array`."""
  best = np.argmax(array)
  return MA[best]
def decode_mp(array):
  """Return the `MP` entry at the position of the largest value in `array`."""
  best = np.argmax(array)
  return MP[best]
# Quick sanity check: encode one class name and decode each head back.
fail = 'Normal'
a, b, c = makeonehot(fail)
print(a, b, c)
print(*(decode(vec) for decode, vec in ((decode_up, a), (decode_ma, b), (decode_mp, c))))


import numpy as np
from sklearn.model_selection import train_test_split
# Hold out 15% of the samples for validation. The fixed seed makes the split
# (and every metric computed on it) reproducible across kernel restarts.
x_train, x_test, y_train, y_test = train_test_split(xyz, y_label, test_size=0.15, random_state=42)


def _encode_labels(labels):
  """Return parallel (UB, MA, MP) lists of one-hot vectors for class indices.

  Each index is mapped to its class name through `index2fail` and encoded
  with `makeonehot`.
  """
  up_rows, ma_rows, mp_rows = [], [], []
  for class_index in labels:
    up_vec, ma_vec, mp_vec = makeonehot(index2fail[class_index])
    up_rows.append(up_vec)
    ma_rows.append(ma_vec)
    mp_rows.append(mp_vec)
  return up_rows, ma_rows, mp_rows


# One helper replaces the two copy-pasted loops, so the training and
# validation targets are guaranteed to be built the same way.
yUP_train, yMA_train, yMP_train = _encode_labels(y_train)
yUP_test, yMA_test, yMP_test = _encode_labels(y_test)

In [None]:
#CNN
import tensorflow as tf
from tensorflow.keras import layers , models
from tensorflow.keras.layers import Flatten, Dense, Conv1D, MaxPool1D, Dropout, BatchNormalization

# NOTE(review): presumably (spectrum bins kept by fftx, 3 axes); it has to
# equal x_train.shape[1:] or model.fit will reject the data.
input_shape = (23,3)
input_layer = layers.Input(input_shape)

# Four Conv1D + BatchNorm stages with a growing number of filters.
conv1D1 = Conv1D(filters=3, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001))(input_layer)
batch1 = BatchNormalization()(conv1D1)

conv1D2 = Conv1D(filters=9, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001))(batch1)
batch2 = BatchNormalization()(conv1D2)
# A MaxPool1D call on `batch4` used to sit here, before `batch4` existed: it
# raised NameError on a fresh kernel and its result was never consumed (the
# next stage reads `batch2`), so it has been dropped.

conv1D3 = Conv1D(filters=27, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001))(batch2)
batch3 = BatchNormalization()(conv1D3)

conv1D4 = Conv1D(filters=81, kernel_size=(3,), padding='same', activation=tf.keras.layers.LeakyReLU(alpha=0.001))(batch3)
batch4 = BatchNormalization()(conv1D4)

pool = MaxPool1D(pool_size=(3,), strides=2, padding='same')(batch4)
drop = Dropout(0.5)(pool)
# Flatten the dropout output: flattening `pool` left the Dropout layer
# disconnected from the graph, so it never regularised anything.
flat = Flatten()(drop)

# Shared dense trunk feeding the three classification heads.
dense1 = Dense(units = 128, activation=tf.keras.layers.LeakyReLU(alpha=0.001))(flat)
dense2 = Dense(units = 128, activation=tf.keras.layers.LeakyReLU(alpha=0.001))(dense1)
dense3 = Dense(units = 64, activation=tf.keras.layers.LeakyReLU(alpha=0.001))(dense2)

# One softmax head per fault family; sizes follow the class-name lists.
outputUB = Dense(units = len(UP), activation='softmax')(dense3)
outputMA = Dense(units = len(MA), activation='softmax')(dense3)
outputMP = Dense(units = len(MP), activation='softmax')(dense3)
model = models.Model(inputs=input_layer, outputs=[outputUB , outputMA , outputMP])
loss = tf.keras.losses.categorical_crossentropy
# `learning_rate` is the supported argument name; `lr` is a deprecated alias
# that newer Keras releases reject.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
model.compile(optimizer=optimizer, loss = loss, metrics=['accuracy'])
model.summary()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import os
# NOTE(review): `modelPath` is not used by anything in this cell; no
# ModelCheckpoint is registered below.
modelPath = os.path.join(os.getcwd(), 'bestModel.h5')

earlystopping = EarlyStopping(
    # 'val_dense_47_loss' relied on an auto-numbered layer name that depends on
    # how many Dense layers the kernel had already created; on any other run
    # that metric does not exist and early stopping never triggers.
    # 'val_loss' (the combined validation loss of all heads) is always logged
    # when validation data is supplied.
    monitor='val_loss', # set monitor metrics
    min_delta=0.001, # set minimum metrics delta
    patience=20, # number of epochs to stop training
    restore_best_weights=True, # set if use best weights or last weights
    )
callbacksList = [earlystopping] # build callbacks list

# Targets are passed in the same order as the model outputs: UB, MA, MP.
train_targets = [np.array(yUP_train), np.array(yMA_train), np.array(yMP_train)]
val_targets = [np.array(yUP_test), np.array(yMA_test), np.array(yMP_test)]

model_history = model.fit(
    x_train,
    train_targets,
    epochs=300,
    batch_size=64,
    validation_data=(x_test, val_targets),
    callbacks=callbacksList,
)


In [None]:
# Folder holding the test CSV files.
# NOTE(review): absolute, machine-specific path -- adjust for your environment.
path_dat = "H:\\Hackathon2\\test"
file_list_test = os.listdir(path_dat)
# Kept as the cell's last expression so the notebook actually displays the
# file count (it was silently discarded when another statement followed it).
len(file_list_test)

In [None]:
import os, glob
from tqdm import tqdm
import pandas as pd
# Every backslash is escaped: the previous literal contained "\H", an invalid
# escape sequence that newer Python versions warn about. The pattern value is
# unchanged.
data_path = glob.glob("H:\\Hackathon2\\test\\test*.csv")
import numpy as np
from numpy.fft import fft, ifft, fftfreq
def process_data(data_path):
  """Read one header-less three-column test CSV and append its spectra.

  The columns are named 'x', 'y' and 'z'; each is passed through `bandpass`
  and the results are appended to the module-level lists `x_all_test`,
  `y_all_test` and `z_all_test`. Returns None.

  Note: this definition replaces the earlier `process_data` that fills the
  training lists, so re-run that earlier cell before re-processing training
  files.
  """
  x = pd.read_csv(data_path,header=None,names=['x','y','z'])
  # `bandpasskub` is not defined anywhere in this notebook; `bandpass` is the
  # helper the training data went through, so test features now match.
  A,B,C = bandpass((x['x']),(x['y']),(x['z']))
  x_all_test.append(A)
  y_all_test.append(B)
  z_all_test.append(C)
  
# Per-axis accumulators that process_data() appends to (one spectrum per file).
x_all_test, y_all_test, z_all_test = [], [], []
for csv_file in tqdm(data_path):
  process_data(csv_file)

# Cache the test spectra on disk, one file per axis.
for out_name, spectra in (
    ("X_alltest.npy", x_all_test),
    ("Y_alltest.npy", y_all_test),
    ("Z_alltest.npy", z_all_test),
):
  np.save(out_name, spectra)


In [None]:
# Load the cached per-axis test spectra and stack them as (files, bins, 3).
# The arrays are deliberately not bound to `x_test` / `y_test`: those names
# hold the validation split used by model.fit, and overwriting them made the
# training cells fail (or silently use the wrong data) when re-run.
xyz_test = np.stack(
    [np.load(npy_name) for npy_name in ("X_alltest.npy", "Y_alltest.npy", "Z_alltest.npy")],
    axis=-1,
)
xyz_test.shape

In [None]:
pred = model.predict(xyz_test)
from tqdm import tqdm
predict = []
# Iterate over the rows the model actually returned instead of a hard-coded
# 1800, so the cell works for any number of test files.
for up_probs, ma_probs, mp_probs in tqdm(zip(pred[0], pred[1], pred[2]), total=len(pred[0])):
  # Keep only the heads that predicted a fault ('no' means absent).
  decoded = (decode_up(up_probs), decode_ma(ma_probs), decode_mp(mp_probs))
  faults = [name for name in decoded if name != 'no']
  if len(faults) == 3:
    label = 'Mixcase'
  elif faults:
    # Sorting puts the MA_/MP_ part before the UB_ part, matching `readme`.
    label = '+'.join(sorted(faults))
  else:
    label = 'Normal'
  # Combinations that are not listed in `readme` (e.g. an MA fault together
  # with an MP fault) fall back to 'Normal', as before.
  predict.append(fail2index.get(label, fail2index['Normal']))
print(predict)

In [None]:
#read Sample and save your submission
# Reuse the filename column of the sample file and attach the predicted codes.
sample_submission = pd.read_csv('Sample_submission.csv')
filenames = sample_submission['filename']
df = pd.DataFrame(dict(filename=filenames, code=predict))
# NOTE(review): the row index is written as an extra unnamed first column;
# confirm the submission format expects it (otherwise pass index=False).
df.to_csv('Submitfinal.csv')