### Training CNN for time series

In [1]:
import os

import matplotlib.pyplot as plt  # pyplot (not the top-level package) provides figure/subplots/show
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint, CSVLogger
from tensorflow.keras.layers import Conv1D, Conv2D, MaxPooling1D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from tqdm import tqdm

import constants as cs
import globali as gg
import settings

### Initialization

In [2]:
# Fetch the actions and their counters through settings.read_yaml_fields()
gg.actions, gg.counters = settings.read_yaml_fields()
# Map every action to an integer class id given by its position in gg.actions
label_map = dict((name, idx) for idx, name in enumerate(gg.actions))

In [3]:
def read_keypoints(file_path):
    """Load the numeric contents of a text file into a NumPy array.

    Parameters
    ----------
    file_path : str or path-like
        Location of the text file; parsing is delegated to ``np.loadtxt``
        with its default settings.

    Returns
    -------
    numpy.ndarray
        The values read from ``file_path``.
    """
    keypoints = np.loadtxt(file_path)
    return keypoints

def read_sequence_from_folder(folder_path, max_frames=30):
    """Read the keypoint files of one sequence folder into a single array.

    Every ``.txt`` file directly inside ``folder_path`` is treated as one
    frame. Frames are ordered naturally (``2.txt`` before ``10.txt``) instead
    of lexicographically, so numerically named frames keep their order.

    Parameters
    ----------
    folder_path : str
        Directory containing the per-frame ``.txt`` files.
    max_frames : int or None, optional
        Upper bound on the number of frames to read. Defaults to 30, the
        value that was previously hard-coded; ``None`` reads every frame.

    Returns
    -------
    numpy.ndarray
        The per-frame arrays returned by ``read_keypoints``, stacked along
        the first axis in natural file-name order.
    """
    import re  # local import: only needed for the natural-sort key

    def natural_key(file_name):
        # re.split with a capturing group alternates text / digit runs, so odd
        # positions are always digit runs and can be compared as integers.
        parts = re.split(r'(\d+)', file_name)
        return [int(part) if position % 2 else part
                for position, part in enumerate(parts)]

    frame_names = sorted(
        (name for name in os.listdir(folder_path) if name.endswith('.txt')),
        key=natural_key,
    )
    if max_frames is not None:
        frame_names = frame_names[:max_frames]

    frames = [read_keypoints(os.path.join(folder_path, name)) for name in frame_names]
    return np.array(frames)


In [4]:
sequences, labels = [], []
arrays_list = []

for action in tqdm(gg.actions, desc="Processing Actions"):
    for seq_idx in range(gg.counters[action]):
        seq_dir = os.path.join(cs.KEYPOINTS_FOLDER, action, str(seq_idx))
        # One .npy file per frame, named after the frame index
        frames = [
            np.load(os.path.join(seq_dir, "{}.npy".format(frame_idx)))
            for frame_idx in range(cs.NUM_FRAME)
        ]
        # Frames are stacked on the last axis; size noted by the author: (1662, 30)
        sequences.append(np.stack(frames, axis=-1))
        labels.append(label_map[action])
        

Processing Actions: 100%|██████████| 58/58 [01:20<00:00,  1.39s/it]


In [6]:
# Sanity check of what was loaded
print(len(sequences))     # number of loaded sequences
print(len(sequences[0]))  # first-axis length of the first sequence
print(len(labels))        # number of labels; should equal the number of sequences

4921
1662
4921


In [13]:
# Stack the per-video arrays along a new leading axis: one array for the whole dataset
X = np.stack(sequences, axis=0)
# np.savez does not create missing directories, so make sure ./temp exists first
os.makedirs('./temp', exist_ok=True)
np.savez('./temp/X.npz', data=X)

In [14]:
# One-hot encode the integer class ids, then store the matrix with an integer dtype
one_hot = to_categorical(labels)
y = one_hot.astype(int)
np.savez('./temp/y.npz', data=y)

In [9]:
def create_folder_for_training(number):
    """Create, if needed, the log and model folders of one training run.

    Parameters
    ----------
    number : int or str
        Run identifier; it becomes the suffix of ``log_<number>`` and
        ``model_<number>``.

    Returns
    -------
    tuple of (str, str)
        ``(log_f, model_f)``: the log folder under ``cs.LOGS_FOLDER`` (with a
        trailing path separator) and the model folder under
        ``cs.MODELS_FOLDER``. Both exist when the function returns.
    """
    # Joining with '' appends a trailing separator to the log folder path.
    log_f = os.path.join(cs.LOGS_FOLDER, 'log_' + str(number), '')
    model_f = os.path.join(cs.MODELS_FOLDER, 'model_' + str(number))
    os.makedirs(log_f, exist_ok=True)
    os.makedirs(model_f, exist_ok=True)
    # Return the folder that was just created. It used to be replaced by a
    # hard-coded, machine-specific drive path, which left the per-run folder
    # unused and pointed every run at the same location.
    return log_f, model_f

In [10]:
# Identifier of this training run; it selects the log_<n> / model_<n> folders
run_number = 6
log_f, model_f = create_folder_for_training(run_number)

### Build and train the CNN

In [12]:
# NOTE: the train/validation/test split is made in the split cell further
# down, on the one-hot labels `y`. A split of (X, labels) used to sit here;
# it was overwritten before `model.fit` ran and only duplicated the dataset
# in memory, so it has been dropped.

# Shapes are derived from the data instead of being hard-coded.
# NOTE(review): X is stacked without a channel axis; feeding it to this model
# presumably relies on Keras adding the trailing axis of size 1. Confirm for
# the installed version, or add it explicitly with X[..., np.newaxis].
input_shape = X.shape[1:] + (1,)  # per-sample shape plus one channel
num_classes = y.shape[1]          # width of the one-hot label matrix


def conv_block(filters, **conv_kwargs):
    """Return the layers of one convolutional stage.

    Parameters
    ----------
    filters : int
        Number of 3x3 convolution filters of the stage.
    **conv_kwargs
        Extra keyword arguments forwarded to ``Conv2D`` (used to pass
        ``input_shape`` to the first stage).

    Returns
    -------
    list
        ``[Conv2D, BatchNormalization, MaxPooling2D, Dropout]`` layer
        instances, in that order.
    """
    return [
        Conv2D(filters, (3, 3), activation='relu', **conv_kwargs),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
    ]


# Three convolutional stages with growing filter counts, then a dense head
model = Sequential([
    *conv_block(32, input_shape=input_shape),
    *conv_block(64),
    *conv_block(128),

    Flatten(),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),

    Dense(num_classes, activation='softmax')
])

# The targets passed to fit/evaluate are one-hot rows (`y` comes from
# to_categorical), so the loss must be the non-sparse categorical
# cross-entropy; the sparse variant expects integer class ids.
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [13]:
# Logging callbacks
# Metrics go to a CSV file inside the log folder; append=True adds to an existing file
training_log = os.path.join(log_f, 'train_log.csv')
csv_logger = CSVLogger(training_log, append=True)
# TensorBoard writes to the same folder, with histogram_freq set to 1
tensorboard_callback = TensorBoard(log_dir=log_f, histogram_freq=1)

In [14]:
# Training callbacks.
# Both watch the validation loss: `model.fit` receives validation data, and
# choosing the "best" weights by training loss would favour over-fitted epochs.
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# Only the best checkpoint so far (lowest validation loss) is kept on disk
best_model_path = os.path.join(model_f, 'best_model.h5')
checkpoint = ModelCheckpoint(best_model_path, monitor='val_loss', save_best_only=True)

In [15]:
# First hold out 20% as the test set, then halve the remainder into training
# and validation (overall 40% / 40% / 20%). Both splits are stratified on the
# one-hot labels and use a fixed seed.
X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.20, random_state=42, stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.50, random_state=42, stratify=y_temp)

# Persist every split under its own name. The validation files used to be
# written from the test arrays, so the saved validation set was a test-set copy.
os.makedirs('./temp', exist_ok=True)
np.save('./temp/X_train', X_train)
np.save('./temp/y_train', y_train)
np.save('./temp/X_test', X_test)
np.save('./temp/y_test', y_test)
np.save('./temp/X_val', X_val)
np.save('./temp/y_val', y_val)

In [21]:
# Train for up to 200 epochs in batches of 32, validating on the validation
# split and running the early-stopping, checkpoint and logging callbacks
history = model.fit(
    X_train,
    y_train,
    epochs=200,
    batch_size=32,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping, checkpoint, csv_logger, tensorboard_callback],
)

Epoch 1/100
 13/123 [==>...........................] - ETA: 2:21 - loss: 1.4494 - accuracy: 0.5577

KeyboardInterrupt: 

In [None]:
# Save the model in its current state (after training) as final_model.h5 in the run's model folder
final_model_path = os.path.join(model_f, 'final_model.h5')
model.save(filepath=final_model_path)

In [None]:
# Print the layer-by-layer summary of the model
model.summary()

In [None]:
# Load the weights of the best checkpoint written by this run's
# ModelCheckpoint callback. A hard-coded checkpoint of another run
# (model_3) was loaded here before, while the evaluation artifacts are
# stored in the current run's folder.
model.load_weights(best_model_path)

### Evaluation using Confusion Matrix and Accuracy

In [None]:
# Softmax outputs of the model for the test split (converted to class ids in the next cell)
y_pred = model.predict(X_test)

In [None]:
# Reduce the one-hot targets and the predicted scores to plain lists of class ids
y_true = np.asarray(y_test).argmax(axis=1).tolist()
y_pred = np.asarray(y_pred).argmax(axis=1).tolist()

In [None]:
# Confusion matrix over every known class id (rows: true class, columns:
# predicted class). Passing `labels` keeps the matrix square and aligned with
# the class ids even if some class never occurs in the test split.
class_ids = list(range(len(label_map)))
cm = confusion_matrix(y_true, y_pred, labels=class_ids)

# Draw the matrix as an annotated heat map on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(12, 12))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False, ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')

# Save the figure as JPG in the run's model folder, then display and release it
fig.savefig(os.path.join(model_f, 'confusion_matrix.jpg'), format='jpg')
plt.show()
plt.close(fig)

In [None]:
# Evaluate on the test split without progress output; `score` holds the loss
# followed by the compiled metric (accuracy)
score = model.evaluate(x=X_test, y=y_test, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print(f'Test loss: {test_loss} / Test accuracy: {test_accuracy}')