In [6]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

import os

# sklearn
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report
from sklearn.utils import shuffle

from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv1D, MaxPooling1D
from keras.layers import LSTM, TimeDistributed, ConvLSTM2D
from keras.preprocessing.text import Tokenizer
from keras.preprocessing import sequence
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping

from scipy.signal import savgol_filter

In [7]:
# Load the recording set used for this run. The CSV is read without a header
# row, so the columns are numbered 0..N-1.
#
# Other datasets that were tried (swap the path below to switch):
#   'data/RAW_A9_01/a9_clean.csv'
#   'data/RAW_A9_02/a9_02_clean.csv'
#   'data/RAW_A9_02/a9_02_above.csv'
#   'data/train_data_UU.csv'
df = pd.read_csv(
    'data/RAW_A9_03/a9_03_clean.csv',
    header=None,
)

In [8]:
# Sanity check: print the (rows, columns) shape of the loaded frame; per the
# message, the column count includes the label column.
print("total shape + label:", df.shape)

total shape + label: (167, 502)


In [9]:
# Split every label separately so that each one is divided between the train
# and test sets at the same rate.

TRAIN_SET_RATE = 0.8  # Rate to separate train set and test set
MAX_TRAIN_A = 50      # Upper bound on the number of 'nothing' rows kept for training
SEED = 42             # Fixed seed so that Restart & Run All reproduces the same split


def split_label(rows, rate=TRAIN_SET_RATE, seed=SEED):
    """Shuffle the rows of one label and cut them into a train and a test part.

    Args:
        rows: DataFrame holding the rows of a single label.
        rate: Fraction of the rows that goes to the train part.
        seed: Random state passed to ``shuffle`` for a reproducible order.

    Returns:
        Tuple ``(shuffled, train, test)`` where ``train`` is the first
        ``int(len(rows) * rate)`` shuffled rows and ``test`` is the rest.
    """
    shuffled = shuffle(rows, random_state=seed)
    cut = int(len(shuffled) * rate)
    return shuffled, shuffled.iloc[:cut], shuffled.iloc[cut:]


# The last column holds the label string.
labelA = df[df.iloc[:, -1] == 'nothing']
labelB = df[df.iloc[:, -1] == 'passing']
labelC = df[df.iloc[:, -1] == 'touching']
# Every row must carry exactly one of the three known labels.
assert len(labelA) + len(labelB) + len(labelC) == len(df)
print("[ Labels ]")
print("labelA shape:", labelA.shape)
print("labelB shape:", labelB.shape)
print("labelC shape:", labelC.shape)
print()

sfA, trainA, testA = split_label(labelA)
# Shrink the 'nothing' train part; the rows cut here are not moved to testA.
trainA = trainA[:MAX_TRAIN_A]

sfB, trainB, testB = split_label(labelB)

sfC, trainC, testC = split_label(labelC)

print("[ Labels in Train & Test sets ]")
print("trainA shape:", trainA.shape, ", testA shape:", testA.shape)
print("trainB shape:", trainB.shape, ", testB shape:", testB.shape)
print("trainC shape:", trainC.shape, ", testC shape:", testC.shape)
print()


# Only 'nothing' (A) and 'touching' (C) are used; the three-class variant
# would be pd.concat([trainA, trainB, trainC]) / pd.concat([testA, testB, testC]).
train_set = pd.concat([trainA, trainC])
train_set = shuffle(train_set, random_state=SEED)
test_set = pd.concat([testA, testC])
test_set = shuffle(test_set, random_state=SEED)
print("[ Train & Test sets ]")
print("train set:", len(train_set), ", test set:", len(test_set))
print()

# Features are every column but the last; the last column is the label.
x_train, y_train = train_set.iloc[:, 0:-1], train_set.iloc[:, -1]
x_test, y_test = test_set.iloc[:, 0:-1], test_set.iloc[:, -1]

[ Labels ]
labelA shape: (76, 502)
labelB shape: (17, 502)
labelC shape: (74, 502)

[ Labels in Train & Test sets ]
trainA shape: (50, 502) , testA shape: (16, 502)
trainB shape: (13, 502) , testB shape: (4, 502)
trainC shape: (59, 502) , testC shape: (15, 502)

[ Train & Test sets ]
train set: 109 , test set: 31



In [None]:
# Remember the original row ids of the test samples before y_test is replaced
# by a plain integer array.
y_test_index = y_test.index

# LabelEncoder
# Fit on the labels that actually occur in the train/test sets. Fitting on a
# fixed three-name list while 'passing' is excluded from both sets would
# encode 'touching' as 2, which is out of range for the two-unit softmax
# output and does not match the two-name classification report.
le = LabelEncoder()
le.fit(pd.concat([y_train, y_test]))
LABELS = list(le.classes_)  # class names in encoder order (index == encoded id)
y_train = le.transform(y_train)
y_test = le.transform(y_test)

# Standardize
# sc = StandardScaler()
# sc.fit(x_train)
# x_train = sc.transform(x_train)
# x_test = sc.transform(x_test)

# savgol (placeholder: no smoothing is applied in this cell)




In [None]:
epochs = 150
batch_size = 10
verbose = 0  # Keras logging mode: 0 = silent, 1 = progress bar, 2 = one line per epoch
time_step = 1
feature_dim = x_train.shape[1]
# One output unit per class known to the label encoder, so the softmax layer
# always covers every encoded label id instead of assuming exactly two.
n_classes = len(le.classes_)

# reshape input to be 3D [samples, timesteps, features]
# np.asarray handles both a DataFrame and an already-converted ndarray.
x_train_reshape = np.asarray(x_train).reshape((x_train.shape[0], time_step, x_train.shape[1]))
x_test_reshape = np.asarray(x_test).reshape((x_test.shape[0], time_step, x_test.shape[1]))
print("[ Reshape ]")
print("train X:", x_train_reshape.shape, ", train Y:", y_train.shape)
print("test X:", x_test_reshape.shape, ", test Y:", y_test.shape)

model = Sequential()
model.add(LSTM(64, input_shape=(time_step, feature_dim)))
model.add(Dropout(0.2))
# The LSTM above does not return sequences, so its output is already 2-D and
# this Flatten leaves the shape unchanged.
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dense(40))  # no activation given, i.e. a linear layer
# model.add(Dropout(0.5))
model.add(Dense(n_classes, activation='softmax'))  # out_layer
model.summary()
# Labels are integer class ids (not one-hot), hence the sparse loss and metric.
model.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['sparse_categorical_accuracy'])
# model.compile(loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),optimizer='adam',metrics=['accuracy'])


In [None]:
def evaluate_model(trainX, trainy, testX, testy,
                   n_steps=4, n_length=32, epochs=25, batch_size=64, verbose=0):
    """Train a fresh CNN-LSTM on the training data and return its test accuracy.

    Each sample's time axis is cut into ``n_steps`` windows of ``n_length``
    timesteps; a time-distributed 1-D CNN processes every window and an LSTM
    consumes the sequence of window features.

    Args:
        trainX: Array of shape (samples, timesteps, features).
        trainy: 2-D label array; ``trainy.shape[1]`` is used as the number of
            output units (presumably one-hot, matching the
            categorical_crossentropy loss).
        testX: Array with the same (timesteps, features) as ``trainX``.
        testy: Labels for ``testX`` in the same format as ``trainy``.
        n_steps: Number of windows each sample is split into.
        n_length: Timesteps per window.
        epochs: Training epochs.
        batch_size: Batch size for both training and evaluation.
        verbose: Keras verbosity used while fitting.

    Returns:
        The accuracy value reported by ``model.evaluate`` on the test data.

    Raises:
        ValueError: If ``n_steps * n_length`` differs from the number of
            timesteps, or if ``testX`` does not match ``trainX`` in
            (timesteps, features).
    """
    n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]

    # Fail early with a clear message instead of an opaque reshape error.
    if n_steps * n_length != n_timesteps:
        raise ValueError(
            'n_steps * n_length (%d * %d = %d) must equal the number of timesteps in trainX (%d)'
            % (n_steps, n_length, n_steps * n_length, n_timesteps))
    if tuple(testX.shape[1:]) != tuple(trainX.shape[1:]):
        raise ValueError(
            'testX per-sample shape %s does not match trainX per-sample shape %s'
            % (tuple(testX.shape[1:]), tuple(trainX.shape[1:])))

    # [samples, timesteps, features] -> [samples, windows, window length, features]
    trainX = trainX.reshape((trainX.shape[0], n_steps, n_length, n_features))
    testX = testX.reshape((testX.shape[0], n_steps, n_length, n_features))

    model = Sequential()
    model.add(TimeDistributed(Conv1D(filters=64, kernel_size=3, activation='relu'),
                              input_shape=(None, n_length, n_features)))
    model.add(TimeDistributed(Conv1D(filters=64, kernel_size=3, activation='relu')))
    model.add(TimeDistributed(Dropout(0.5)))
    model.add(TimeDistributed(MaxPooling1D(pool_size=2)))
    model.add(TimeDistributed(Flatten()))
    model.add(LSTM(100))
    model.add(Dropout(0.5))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(n_outputs, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)

    # evaluate returns the loss followed by the compiled metric; only the metric is needed.
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
    return accuracy
        

def run_experiment(trainX, trainy, testX, testy, repeats=10):
    """Train and evaluate the model `repeats` times and print the accuracy spread.

    Each run calls ``evaluate_model`` on the same data, prints that run's
    accuracy as a percentage, and after the last run prints the mean and
    standard deviation over all runs. Nothing is returned.
    """
    percentages = []
    for run_no in range(1, repeats + 1):
        # evaluate_model's result is scaled to a percentage for printing.
        pct = evaluate_model(trainX, trainy, testX, testy) * 100.0
        print('>#%d: %.3f' % (run_no, pct))
        percentages.append(pct)

    print('Accuracy: %.3f%% (+/-%.3f)' % (np.mean(percentages), np.std(percentages)))

In [None]:
# Fraction of the training samples that Keras holds out for validation
# (taken from the end of the arrays passed to fit, before shuffling).
validation_split = 0.33

history = model.fit(x_train_reshape,
                    y_train,
                    validation_split=validation_split,
                    batch_size=batch_size,
                    epochs=epochs,
                    verbose=verbose)
# evaluate returns the loss followed by the compiled metric; keep the metric only.
_, accuracy = model.evaluate(x_test_reshape, y_test)

# Learning curves. 'val_loss' comes from the validation split of the training
# data, not from the held-out test set, so it is labelled accordingly.
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='train')
ax.plot(history.history['val_loss'], label='validation')
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend()
plt.show()

print(accuracy)

In [None]:
# Display names for the report; the three-class variant would be
# ['nothing', 'passing', 'touching'].
LABELS = ['nothing', 'touching']
le = LabelEncoder().fit(LABELS)

# Per-class scores for every test sample, then the highest-scoring class index.
y_pred = model.predict(x_test_reshape, verbose=1)
y_pred_bool = y_pred.argmax(axis=1)
# To get label strings back instead: le.inverse_transform(y_pred_bool)

print(
    classification_report(
        y_test,
        y_pred_bool,
        target_names=LABELS,
        zero_division=1,
    )
)

In [None]:
# Show the first five rows of model output, then the predicted class ids
# followed by the encoded ground-truth labels for a side-by-side check.
print(y_pred[:5])
print("pred:", y_pred_bool, sep="\n", end="\n\n")
print("true:", y_test, sep="\n")