# Dependencies

In [None]:
import os
import cv2
import math
import numpy as np
import seaborn as sns
import tensorflow as tf
import matplotlib.pyplot as plt

from keras.layers import *
import tensorflow.keras.backend as K
from keras.optimizers import Adam, SGD
from datetime import datetime, timedelta
from sklearn.metrics import confusion_matrix
from keras.utils import to_categorical, plot_model
from keras.models import Sequential, Model, load_model
from keras.callbacks import ModelCheckpoint, EarlyStopping

# Data Preparation

In [None]:
# Mount Google Drive at /content/drive so the paths below can be used
from google.colab import drive
drive.mount('/content/drive')

# Google Drive paths used by the rest of the notebook:
#   dataset_path   - root folder that is scanned for the video files
#   save_file_path - root folder under which the model output folder is created
dataset_path = "/content/drive/MyDrive/Skripsi/Dataset"
save_file_path = "/content/drive/MyDrive/Skripsi/File_Save"

In [None]:
# count dataset total
def count_files(directory):
    """Return how many regular files sit directly inside `directory`.

    Sub-directories are not counted and are not descended into.
    """
    total = 0
    for entry in os.listdir(directory):
        if os.path.isfile(os.path.join(directory, entry)):
            total += 1
    return total

# Per-location counters for each class, filled in by the loop below
kategori_pencurian = {'Gedung': 0, 'Ritel': 0, 'Motor': 0}
kategori_normal = {'Gedung': 0, 'Ritel': 0, 'Motor': 0}

# Visit every <class>/<location> folder and record how many files it holds
for category, per_location in (('Pencurian', kategori_pencurian), ('Normal', kategori_normal)):
    for subcategory in ['Gedung', 'Ritel', 'Motor']:
        directory = os.path.join(dataset_path, category, subcategory)
        file_count = count_files(directory)
        per_location[subcategory] += file_count

# Class totals are the sums of the per-location counters
total_pencurian = sum(kategori_pencurian.values())
total_normal = sum(kategori_normal.values())

# Print Result
print(f"Total video kelas Pencurian\t: {total_pencurian}")
print(f"Total video kelas Normal\t: {total_normal}")
for header, per_location in (("\nJumlah video Pencurian per-kategori:", kategori_pencurian),
                             ("\nJumlah video Normal per-kategori:", kategori_normal)):
    print(header)
    for subcategory, count in per_location.items():
        print(f"  {subcategory}: {count}")

In [None]:
# Frame size in pixels: frames are resized to (IMAGE_WIDTH, IMAGE_HEIGHT) and the
# model input shape is (SEQUENCE_COUNT, IMAGE_HEIGHT, IMAGE_WIDTH, 3)
IMAGE_HEIGHT = 120
IMAGE_WIDTH = 160
SEQUENCE_COUNT = 20 # number of frames sampled from each video
BATCH_SIZE = 2 # samples per training step, passed to model.fit

# NOTE: the block below is a bare string expression used as a note, not a comment
'''
  Batch_size is the number of data samples processed by the model in one
  training iteration. A larger batch_size uses more memory, but training
  process can be faster and more stable.
'''

# Preprocessing

In [None]:
# function for splitting the dataset into train, validation, and test
def split_dataset(data_path, train_num=30, val_num=3, test_num=2):
  """Split the files under `data_path` into train / validation / test lists.

  The quotas apply to every directory visited by os.walk separately: within
  one directory the first `train_num` files go to train, the next `val_num`
  to validation and the next `test_num` to test. Any remaining files in that
  directory are not used.

  Directories and file names are processed in sorted order, so the same
  folder content always produces the same split (os.walk itself gives no
  ordering guarantee).

  Args:
    data_path: root directory that is walked recursively.
    train_num: files per directory assigned to the training list.
    val_num: files per directory assigned to the validation list.
    test_num: files per directory assigned to the test list.

  Returns:
    Tuple (train_files, val_files, test_files) of lists of file paths.
  """
  train_files = []
  val_files = []
  test_files = []

  val_end = train_num + val_num
  test_end = val_end + test_num

  for root, dirs, files in os.walk(data_path): # loop over every directory
    dirs.sort() # sorting in place makes os.walk descend in a stable order

    for idx, file in enumerate(sorted(files)): # stable order inside a directory
      video_path = os.path.join(root, file) # full path of this file

      if idx < train_num:
        train_files.append(video_path)
      elif idx < val_end:
        val_files.append(video_path)
      elif idx < test_end:
        test_files.append(video_path)

  # Sanity check, done once after the walk: no path may appear twice
  all_paths = train_files + val_files + test_files
  if len(all_paths) != len(set(all_paths)):
    print("Warning: Duplicate paths found!")

  return train_files, val_files, test_files

In [None]:
# function for labeling image
def labeling_img(video_path):
  """Derive the class label from the video file name.

  The first whitespace-separated word of the file name decides the class:
  exactly 'normal' gives 0, anything else gives 1 (pencurian).
  """
  first_word = os.path.basename(video_path).split()[0]
  return 0 if first_word == 'normal' else 1

In [None]:
# Function to sample evenly spaced frames from a video
def get_video_frame(video_path, resize=(320, 240), sequence_count=20):
  """Read up to `sequence_count` evenly spaced frames from a video file.

  Each frame is reordered from OpenCV's BGR channel order to RGB and resized
  with cv2.resize. Pixel values are NOT normalized here.

  Args:
    video_path: path of the video file to open.
    resize: target size passed to cv2.resize, i.e. (width, height).
    sequence_count: number of frames to sample.

  Returns:
    List of frames. It holds fewer than `sequence_count` items (possibly
    none) when a read fails, so callers should check the length.
  """
  frames = []

  video_reader = cv2.VideoCapture(video_path)
  try:
    frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
    # Sampling stride. Clamp to 1 so that videos with fewer frames than
    # sequence_count (quotient 0) still advance one frame at a time.
    skip_frames = max(frames_count // sequence_count, 1)

    for frame_counter in range(sequence_count):
      # jump to the next sampling position
      video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames)

      success, frame = video_reader.read()

      # stop at the first frame that cannot be read
      if not success:
        break

      # OpenCV delivers BGR; reverse the channel axis to get RGB
      frame = frame[:, :, [2, 1, 0]]

      frames.append(cv2.resize(frame, resize))
  finally:
    # always free the capture handle, even if decoding or resizing raised
    video_reader.release()

  return frames

In [None]:
# function for normalize pixel with range 0-1
def normalization_pixel(get_image):
  """Return `get_image` as float32 with every value divided by 255.

  The input array is not modified; a new array is returned.
  """
  return get_image.astype(np.float32) / 255.0

In [None]:
# split dataset path for train, validation, and test
train_path, val_path, test_path = split_dataset(dataset_path, train_num=30, val_num=3, test_num=2)

# ======================= Process The Images and Labels ========================
def _load_split(video_paths):
  """Load the frames and labels of one split.

  Videos that yield fewer than SEQUENCE_COUNT frames are skipped so that
  every sample has the same sequence length.

  Returns:
    Tuple (frames, labels) of numpy arrays with one entry per kept video.
  """
  frames = []
  labels = []
  for path in video_paths:
    clip = get_video_frame(video_path=path, resize=(IMAGE_WIDTH, IMAGE_HEIGHT), sequence_count=SEQUENCE_COUNT)
    if len(clip) != SEQUENCE_COUNT: # incomplete sequence, skip this video
      continue
    frames.append(clip)
    labels.append(labeling_img(path))
  return np.array(frames), np.array(labels)

# get images and labels for each split
train_frames, train_labels = _load_split(train_path)
val_frames, val_labels = _load_split(val_path)
test_frames, test_labels = _load_split(test_path)

# show image shape
print("Training shape images:", train_frames.shape)
print("Valiation shape images:", val_frames.shape)
print("Testing shape images:", test_frames.shape)

# show labels shape
print("\nTraining shape labels:", train_labels.shape)
print("Valiation shape labels:", val_labels.shape)
print("Testing shape labels:", test_labels.shape)

# convert label to one-hot-encoding
# num_classes is fixed to 2 so every split gets two columns even when one of
# the classes happens to be absent from it
train_labels = to_categorical(train_labels, num_classes=2)
val_labels = to_categorical(val_labels, num_classes=2)
test_labels = to_categorical(test_labels, num_classes=2)

# show one-hot-encoding labels shape
print("\nTraining shape one-hot-encoding labels:", train_labels.shape)
print("Valiation shape one-hot-encoding labels:", val_labels.shape)
print("Testing shape one-hot-encoding labels:", test_labels.shape)

In [None]:
# Normalize pixel with range 0-1
# The training set gets a new name (train_frames_norm) so the raw 0-255 array
# stays available for the comparison printed below. Validation and test are
# rebound in place, so running this cell twice would divide them by 255 twice.
train_frames_norm = normalization_pixel(train_frames)
val_frames = normalization_pixel(val_frames)
test_frames = normalization_pixel(test_frames)

# Print pixel range comparison (raw training frames vs normalized training frames)
print(f"Range before normalization:\nmin: {np.min(train_frames)}\nmax: {np.max(train_frames)}")
print(f"\nRange after normalization:\nmin: {np.min(train_frames_norm)}\nmax: {np.max(train_frames_norm)}")

# Modeling

In [None]:
# Fix Architecture
model_name = 'SelfBuild_CNN-LSTM' # name for save model

model = Sequential()

# Three per-frame convolution stages that differ only in their filter count.
# TimeDistributed applies each wrapped layer to every frame of the sequence.
for stage, filters in enumerate((32, 64, 128)):
  conv = Conv2D(filters, (3, 3), padding='same', activation='relu')
  if stage == 0:
    # only the first layer declares the input shape: (frames, height, width, channels)
    model.add(TimeDistributed(conv, input_shape=(SEQUENCE_COUNT, IMAGE_HEIGHT, IMAGE_WIDTH, 3)))
  else:
    model.add(TimeDistributed(conv))
  model.add(TimeDistributed(BatchNormalization()))
  model.add(TimeDistributed(MaxPooling2D((2, 2))))
  model.add(TimeDistributed(Dropout(0.25)))

# Flatten each frame's feature map, then model the frame sequence with two LSTMs;
# the second LSTM returns only its final output
model.add(TimeDistributed(Flatten()))
model.add(LSTM(128, return_sequences=True))
model.add(LSTM(64))

# Two sigmoid outputs, one per one-hot label column
model.add(Dense(2, activation='sigmoid'))

model.summary()

### process architecture

In [None]:
# create folder for save the model
from datetime import timezone # only needed in this cell, for the fixed UTC+7 offset

# Current Indonesia date and time (GMT +7). An explicit UTC+7 zone is used so the
# result is correct whatever the time zone of the machine running the notebook.
current_datetime = datetime.now(timezone(timedelta(hours=7)))

# set date format for main folder
date_format = '%y-%m-%d'
date = current_datetime.strftime(date_format)

# set time format for sub-folder ('_' instead of ':' so the name is valid on Windows)
time_format = '%H_%M'
time = current_datetime.strftime(time_format)

# create folder: <save_file_path>/Model/<date>/<model_name>(<time>)
model_save_folder = os.path.join(save_file_path, 'Model', date, f'{model_name}({time})') # folder path
os.makedirs(model_save_folder, exist_ok=True) # create folder if not exist

# Save and Show the Architecture Model Graph Image
save_file = os.path.join(model_save_folder, 'architecture.jpg') # file path
plot_model(model, to_file=save_file, show_shapes=True, show_layer_names=True)

# Training

In [None]:
def Recall(y_true, y_pred):
    """Recall over the whole batch: true positives / actual positives.

    Values are clipped to [0, 1] and rounded before counting, and
    K.epsilon() is added to the denominator to avoid division by zero.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    actual = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(actual) + K.epsilon())

def Precision(y_true, y_pred):
    """Precision over the whole batch: true positives / predicted positives.

    Values are clipped to [0, 1] and rounded before counting, and
    K.epsilon() is added to the denominator to avoid division by zero.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    predicted = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hits) / (K.sum(predicted) + K.epsilon())

def F1_Score(y_true, y_pred):
    """F1 score: harmonic mean of Precision and Recall for the batch.

    K.epsilon() in the denominator keeps the result finite when both
    precision and recall are zero.
    """
    p = Precision(y_true, y_pred)
    r = Recall(y_true, y_pred)
    ratio = (p * r) / (p + r + K.epsilon())
    return 2 * ratio

In [None]:
# Check device running on GPU
if tf.config.list_physical_devices('GPU'):
  print('GPU is available')
else:
  print('GPU is not available (USING CPU)')

# Compile the model
# 'accuracy' (lower case) is the identifier Keras maps to the accuracy variant
# that fits the model output. Depending on the Keras version, 'Accuracy' can
# resolve to the exact-equality Accuracy metric instead, which is close to 0
# for float sigmoid outputs.
# F1_Score is the custom metric function defined in this notebook.
model.compile(optimizer=Adam(learning_rate=0.0001),
              loss='binary_crossentropy',
              metrics=['accuracy', 'Precision', 'Recall', F1_Score])

In [None]:
# save the best training model
# ModelCheckpoint writes best_model.keras whenever val_loss reaches a new minimum
best_model_path = os.path.join(model_save_folder, 'best_model.keras')
checkpoint = ModelCheckpoint(best_model_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)

# early stopping: stop when val_loss has not improved for 4 consecutive epochs.
# restore_best_weights is not set here, so `model` keeps the weights of the
# last epoch that ran; the best weights are only in best_model.keras.
early_stopping = EarlyStopping(monitor='val_loss', patience=4, mode='auto')

# training the model on the normalized training frames, validating on the
# normalized validation frames, for at most 100 epochs
model_history = model.fit(x = train_frames_norm,
                          y = train_labels,
                          validation_data= (val_frames, val_labels),
                          epochs = 100,
                          batch_size = BATCH_SIZE,
                          callbacks=[checkpoint, early_stopping],
                          shuffle = True)

# save the training model (state after the last epoch that ran)
model_save_path = os.path.join(model_save_folder, 'model.keras')
model.save(model_save_path)

In [None]:
# show plot model history
# History keys come as the train metrics followed by their 'val_' counterparts,
# so the first half of the key list holds the train metric names.
metrics = list(model_history.history.keys()) # get dictionary key
num_plot = len(metrics) // 2 # number of train-only keys

# squeeze=False + ravel keeps axs a 1-D array even when there is only one plot
_, axs = plt.subplots(1, num_plot, figsize=(30, 6), squeeze=False) # set canvas size
axs = axs.ravel()

for idx in range(num_plot):
  metric = metrics[idx]
  ax = axs[idx]

  # one subplot per metric: train curve and validation curve together
  ax.plot(model_history.history[metric], label=f'train_{metric}')
  ax.plot(model_history.history[f'val_{metric}'], label=f'val_{metric}')
  ax.set_title(f'{metric} model')
  ax.set_xlabel('epoch')
  ax.set_ylabel(f'{metric}')
  ax.legend()

# apply the layout first so the saved image matches the one shown
plt.tight_layout()

# save the plot
save_img = os.path.join(model_save_folder, 'plotting_metrics.jpg')
plt.savefig(save_img)

# show ploting
plt.show()

# Evaluation

In [None]:
# evaluate the model
# Reload the model saved after training from model_save_path. F1_Score is a
# custom metric function, so it is supplied through custom_objects.
model_eval = load_model(model_save_path, custom_objects={'F1_Score': F1_Score})
# Evaluate on the (normalized) test frames and keep the returned metric values
result_model_eval = model_eval.evaluate(test_frames, test_labels)

In [None]:
# evaluate the best model
# Reload the checkpoint written by ModelCheckpoint at best_model_path. F1_Score
# is a custom metric function, so it is supplied through custom_objects.
best_model = load_model(best_model_path, custom_objects={'F1_Score': F1_Score})
# Evaluate on the (normalized) test frames and keep the returned metric values
result_best_model = best_model.evaluate(test_frames, test_labels)

In [None]:
# Model Report
def report_metrics_value(model, metric_key, metric_name, idx, note):
  """Write one epoch's train and validation metric values to `note`.

  Args:
    model: object with a `history` dict mapping metric key -> per-epoch list.
    metric_key: list of history keys, train keys first, then validation keys.
    metric_name: number of train keys, i.e. where `metric_key` is split.
    idx: epoch index to report (-1 for the last epoch).
    note: writable text stream.

  Train entries are written on one line with a 'train_' prefix, followed by a
  newline and the validation entries. Values are formatted with '{:.4}'.
  """
  history = model.history

  def _value_at(key, averaged_key):
    # the F1 entry is passed through np.mean; every other value is used as is
    entry = history[key][idx]
    return np.mean(entry) if key == averaged_key else entry

  for key in metric_key[:metric_name]:
    note.write('\ttrain_{}\t: {:.4}'.format(key, _value_at(key, 'f1_score')))

  note.write('\n')

  for key in metric_key[metric_name:]:
    note.write('\t{}\t: {:.4}'.format(key, _value_at(key, 'val_f1_score')))

def report_eval(model, metric_key, note):
  """Write evaluation results to `note`, one tab-separated entry per value.

  Args:
    model: sequence of evaluation results (despite the name, not a model).
    metric_key: names; the i-th result is labelled with metric_key[i].
    note: writable text stream.

  A result that is a numpy array is reduced to its mean before formatting.
  """
  for position, result in enumerate(model):
    scalar = np.mean(result) if isinstance(result, np.ndarray) else result
    note.write('\t{}\t: {:.4}'.format(metric_key[position], scalar))

# Save Files
save_report = os.path.join(model_save_folder, 'training_report.txt')
# utf-8 is set explicitly: the report title contains an emoji, which the
# platform default encoding (e.g. cp1252 on Windows) cannot encode
with open(save_report, 'w', encoding='utf-8') as f:
  # Training report: data and hyper-parameter summary
  f.write('TRAINING REPORT📝\n\n')
  f.write(f'Image Size\t\t: {IMAGE_HEIGHT}(H) x {IMAGE_WIDTH}(W)\n')
  f.write(f'Total Train Data\t: {train_frames_norm.shape[0]}\nTotal Valid Data\t: {val_frames.shape[0]}\nTotal Test Data\t\t: {test_frames.shape[0]}\n')
  f.write(f'Frame Sequences\t\t: {SEQUENCE_COUNT}\n')
  f.write(f'Batch Size\t\t: {BATCH_SIZE}\n\n')

  # Optimizer section: name and learning rate first, then the remaining settings
  optimizer_config = model.optimizer.get_config()
  f.write(f'Optimizer Name\t: {optimizer_config["name"]}\n')
  f.write('learning rate\t: {:.5g}\n'.format(optimizer_config['learning_rate']))

  config_str = '\n\t'.join(f'{key}: {value}' for key, value in optimizer_config.items()
                           if key not in ('name', 'learning_rate'))
  f.write(f'Optimizer Config:\n\t{config_str}\n\n')

  f.write(f'Loss\t: {model.loss}\n')
  # metrics[0] is the loss key, so the metric names start at index 1
  f.write(f'Metrics\t: {metrics[1:num_plot]}\n\n')

  f.write(f'Total Epoch\t\t: {model_history.params["epochs"]}\n')
  f.write(f'Step Per-Epoch\t\t: {model_history.params["steps"]}\n')
  # fall back to 0 when the callback does not expose stopped_epoch
  f.write(f'Epoch Early Stopping\t: {getattr(early_stopping, "stopped_epoch", 0)}\n')

  # epoch (0-based index) with the lowest validation loss
  idx = model_history.history['val_loss'].index(min(model_history.history['val_loss']))
  f.write(f'Best Model Epoch\t: {idx+1}\n\n')

  f.write('Training Model:\n')
  report_metrics_value(model_history, metrics, num_plot, idx=-1, note=f)

  f.write('\n\nTraining Best Model:\n')
  report_metrics_value(model_history, metrics, num_plot, idx=idx, note=f)

  # Evaluation Report
  f.write('\n\nEvaluation Model:\n')
  report_eval(result_model_eval, metrics, f)

  f.write('\n\nEvaluation Best Model:\n')
  report_eval(result_best_model, metrics, f)

In [None]:
# get label predict and original
# The normalized training frames are used: the model was trained on 0-1 pixel
# values, and val_frames / test_frames are already normalized.
get_data = [train_frames_norm, val_frames, test_frames]
get_labels = [train_labels, val_labels, test_labels]
titles = ['Train', 'Validation', 'Test']
name_confusion = 'Last Epoch Confusion Matrix'

fig, axes = plt.subplots(1, 3, figsize=(21, 7))
fig.suptitle(name_confusion, fontsize=16)

for idx, ax in enumerate(axes):
  model_predict = model_eval.predict(get_data[idx])
  label_predict = np.argmax(model_predict, axis=1)
  label_original = np.argmax(get_labels[idx], axis=1)
  # confusion_matrix takes (y_true, y_pred): rows are true labels and columns
  # are predictions, matching the axis labels below. labels=[0, 1] keeps the
  # matrix 2x2 even when a class is missing from a split.
  conf_matrix = confusion_matrix(label_original, label_predict, labels=[0, 1])

  sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
  ax.set_xlabel('Predicted labels')
  ax.set_ylabel('True labels')
  ax.set_title(titles[idx])
  ax.set_xticklabels(['Class 0', 'Class 1'])
  ax.set_yticklabels(['Class 0', 'Class 1'])

# apply the layout first so the saved image matches the one shown
plt.tight_layout()

# save the plot
save_img = os.path.join(model_save_folder, name_confusion + '.jpg')
plt.savefig(save_img)

# show plot
plt.show()

In [None]:
# get label predict and original
# The normalized training frames are used: the model was trained on 0-1 pixel
# values, and val_frames / test_frames are already normalized.
get_data = [train_frames_norm, val_frames, test_frames]
get_labels = [train_labels, val_labels, test_labels]
titles = ['Train', 'Validation', 'Test']
name_confusion = 'Best Model Confusion Matrix'

fig, axes = plt.subplots(1, 3, figsize=(21, 7))
fig.suptitle(name_confusion, fontsize=16)

for idx, ax in enumerate(axes):
  model_predict = best_model.predict(get_data[idx])
  label_predict = np.argmax(model_predict, axis=1)
  label_original = np.argmax(get_labels[idx], axis=1)
  # confusion_matrix takes (y_true, y_pred): rows are true labels and columns
  # are predictions, matching the axis labels below. labels=[0, 1] keeps the
  # matrix 2x2 even when a class is missing from a split.
  conf_matrix = confusion_matrix(label_original, label_predict, labels=[0, 1])

  sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
  ax.set_xlabel('Predicted labels')
  ax.set_ylabel('True labels')
  ax.set_title(titles[idx])
  ax.set_xticklabels(['Class 0', 'Class 1'])
  ax.set_yticklabels(['Class 0', 'Class 1'])

# apply the layout first so the saved image matches the one shown
plt.tight_layout()

# save the plot
save_img = os.path.join(model_save_folder, name_confusion + '.jpg')
plt.savefig(save_img)

# show plot
plt.show()