# Import thư viện

In [16]:
import numpy as np
from keras.models import Sequential
import keras
from keras.layers import ConvLSTM2D, Dropout, Flatten, Dense
import os
import cv2
from keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow_addons as tfa
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
import random
from deap import base, creator, tools, algorithms
from tensorflow.keras.models import load_model

numpy==1.26.4
keras==2.15.0
opencv-python==4.8.0.76
tensorflow_addons==0.22.0
deap==1.4.1
tensorflow==2.15.0

# Dữ liệu đầu vào

### Đọc dữ liệu ảnh

In [17]:
# Đường dẫn đến thư mục chứa ảnh
image_folder = r"E:\BacSon\Fair_2024_ConvLSTM_ConvGRU\data"
# image_folder = r"E:\BacSon\Fair_2023_ARIMA_LSTM_GRU\dataSet\data_img"

# Đọc từng file ảnh
image_files = [f for f in os.listdir(image_folder) if os.path.isfile(os.path.join(image_folder, f))]

In [18]:
# Chuyển ảnh về ma trận ảnh Gray

def matrix_images(image_folder, image_files):
    matrix_img = []
    for img_file in image_files:
        img_path = os.path.join(image_folder, img_file)
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        matrix_img.append(image)

    return matrix_img

data = matrix_images(image_folder, image_files)

### Chuẩn hóa dữ liệu

In [19]:
# Áp dụng Min-Max Scaling chuẩn hóa dữ liệu
def min_max_scaling(data):
    # Chuyển danh sách các mảng numpy thành một mảng numpy đa chiều
    data_array = np.array(data)

    # Tính giá trị min và max trên toàn bộ dữ liệu
    data_min = np.min(data_array)
    data_max = np.max(data_array)

    # Áp dụng công thức Min-Max Scaling
    scaled_data = (data_array - data_min) / (data_max - data_min)

    return scaled_data

### Tạo nhãn cho dữ liệu

In [20]:
data = min_max_scaling(data)
data = np.expand_dims(data, axis=-1)

In [21]:
def create_image_sequences(data, time_steps):
    num_samples, height, width, channels = data.shape
    num_frames = num_samples - time_steps
    input_sequences = np.zeros((num_frames, time_steps, height, width, channels))
    labels = np.zeros((num_frames, height, width, channels))  # Khởi tạo mảng label

    for i in range(num_frames):
        input_sequences[i] = data[i:i+time_steps]
        labels[i] = data[i+time_steps]  # Xác định label là frame tiếp theo

    return input_sequences, labels

# Tạo chuỗi dữ liệu với 4 ảnh liên tiếp từ dữ liệu hình ảnh
X_data, y_data = create_image_sequences(data, 4)

### Chia dữ liệu

In [22]:
# Chia dữ liệu
size = int(len(X_data) * 0.8)
size_val = int((len(X_data) - size) / 2)

X_train = X_data[:size]
X_val = X_data[size:size + size_val]
X_test = X_data[-size_val:]

y_train = y_data[:size]
y_val = y_data[size:size + size_val]
y_test = y_data[- size_val:]

In [23]:
# Kiểm tra kích thước của X_train
print("Kích thước của X_train:", X_train.shape)
print("Kích thước của X_val:", X_val.shape)
print("Kích thước của X_test:", X_test.shape)

print("Kích thước của y_train:", y_train.shape)
print("Kích thước của y_val:", y_val.shape)
print("Kích thước của y_test:", y_test.shape)

Kích thước của X_train: (48, 4, 150, 150, 1)
Kích thước của X_val: (6, 4, 150, 150, 1)
Kích thước của X_test: (6, 4, 150, 150, 1)
Kích thước của y_train: (48, 150, 150, 1)
Kích thước của y_val: (6, 150, 150, 1)
Kích thước của y_test: (6, 150, 150, 1)


# Mô hình ConvLSTM

In [24]:
def conv_lstm(input_lstm, filters1, filters2, kernel_size1, kernel_size2, kernel_size3, activation):
    # Xây dựng mô hình ConvLSTM
    inputs_shape = input_lstm
    model_lstm = Sequential()
    model_lstm.add(ConvLSTM2D(filters=filters1, kernel_size=kernel_size1, input_shape=inputs_shape, padding='same', activation=activation, return_sequences=True))
    model_lstm.add(ConvLSTM2D(filters=filters2, kernel_size=kernel_size2, padding='same', activation=activation, return_sequences=True))
    model_lstm.add(ConvLSTM2D(filters=1, kernel_size=kernel_size3, padding='same', activation='tanh'))

    return model_lstm

In [25]:
def fitness_function(X_train, y_train, X_val, y_val, filters1, filters2, kernel_size1, kernel_size2, kernel_size3, 
                       activation, optimizer, epochs=1, model_path=""):
    # Xây dựng mô hình ConvLSTM
    time_step, height, width, channels = X_train.shape[1:]
    input_lstm = (time_step, height, width, channels)
    model_lstm = conv_lstm(input_lstm, filters1, filters2, kernel_size1, kernel_size2, kernel_size3, activation)
    
    # Compile mô hình
    model_lstm.compile(optimizer=optimizer, loss='mean_squared_error')

    # Dừng sớm
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min', restore_best_weights=True)
    
    callbacks = [early_stopping]
    if model_path:
        model_checkpoint = ModelCheckpoint(model_path, monitor='val_loss', mode='min', save_best_only=True, verbose=1)
        callbacks.append(model_checkpoint)

    # Training model
    history = model_lstm.fit(X_train, y_train,
                             batch_size=4,
                             epochs=epochs, 
                             validation_data=(X_val, y_val),
                             callbacks=callbacks)
    
    # Hàm mục tiêu
    val_loss = np.min(history.history['val_loss'])

    if model_path:
        # Load the best model
        best_model = load_model(model_path)
    else:
        best_model = model_lstm

    return val_loss, best_model, history

In [26]:
# Hàm đánh giá cá thể trong quần thể
def evaluate(individual):
    if len(individual) < 4:  # Kiểm tra xem cá thể có đủ độ dài không
        return float('inf'),  # Trả về một giá trị fitness vô cực để loại bỏ cá thể này

    filters1, filters2, kernel_size1, kernel_size2, kernel_size3, activation = individual

    print("Train Adam")
    optimizer_Adam = keras.optimizers.Adam(learning_rate=1e-3)
    val_loss_adam, _, _ = fitness_function(X_train, y_train, X_val, y_val, filters1, filters2, 
                                             kernel_size1, kernel_size2, kernel_size3, activation, optimizer_Adam, epochs=50)
    return val_loss_adam,

    # print("Train YoGi")
    # optimizer_Yogi = tfa.optimizers.Yogi(learning_rate=1e-3)
    # val_loss_yogi, _, _ = fitness_function(X_train, y_train, X_val, y_val, filters1, filters2, 
    #                                          kernel_size1, kernel_size2, kernel_size3, activation, optimizer_Yogi, epochs=1)

    # if val_loss_adam < val_loss_yogi:
    #     return val_loss_adam,
    # else:
    #     return val_loss_yogi,

In [27]:
# Định nghĩa hàm tạo quần thể
def create_population(n, filters_func1, filters_func2, kernel_size_func1, kernel_size_func2, kernel_size_func3, activation_func):
    population = [creator.Individual([filters_func1(), filters_func2(), kernel_size_func1(), 
                                      kernel_size_func2(), kernel_size_func3(), activation_func()]) for _ in range(n)]
    return population

In [28]:
# Hàm đột biến cá thể
def mutate_individual(individual, indpb):
    if random.random() < indpb:
        individual[0] = random.choice([16, 32, 64, 128])  # filters1
    if random.random() < indpb:
        individual[1] = random.choice([16, 32, 64, 128])  # filters2
    if random.random() < indpb:
        individual[2] = random.choice([(3, 3), (5, 5), (7, 7)])  # kernel_size1
    if random.random() < indpb:
        individual[3] = random.choice([(3, 3), (5, 5), (7, 7)])  # kernel_size2
    if random.random() < indpb:
        individual[4] = random.choice([(3, 3), (5, 5), (7, 7)])  # kernel_size3
    if random.random() < indpb:
        individual[5] = random.choice(["tanh", "relu", "sigmoid"])  # activation

In [29]:
# Số lượng quần thể ban đầu
num_populations = 4
# Số lượng cá thể trong quần thể ban đầu
individuals = 5
# Quá trình lai ghép và đột biến
num_generations = 2

epochs = 200

In [30]:
# Khởi tạo hàm mục tiêu
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

# Khởi tạo toolbox
toolbox = base.Toolbox()
toolbox.register("attr_filters", random.choice, [16, 32, 64, 128])
toolbox.register("attr_kernel_size", random.choice, [(3, 3), (5, 5), (7, 7)])
toolbox.register("attr_activation", random.choice, ["tanh", "relu", "sigmoid"])

# Đăng ký cá thể và quần thể
toolbox.register("individual", tools.initCycle, creator.Individual, 
                 (toolbox.attr_filters, toolbox.attr_filters, 
                  toolbox.attr_kernel_size, toolbox.attr_kernel_size, 
                  toolbox.attr_kernel_size, toolbox.attr_activation), n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Đăng ký các toán tử lai ghép, đột biến và chọn lọc
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", mutate_individual, indpb=0.15)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("evaluate", evaluate)

# Tạo và in quần thể ban đầu
populations = []
for i in range(num_populations):
    population = create_population(individuals, toolbox.attr_filters, toolbox.attr_filters, 
                                   toolbox.attr_kernel_size, toolbox.attr_kernel_size, toolbox.attr_kernel_size, 
                                   toolbox.attr_activation)
    populations.append(population)
    print(f"Quần thể ban đầu {i + 1}:")
    for j, ind in enumerate(population):
        print(f"Cá thể thứ {j}: {ind}")
    print()

# Đánh giá fitness và chọn lọc cá thể tốt nhất cho mỗi quần thể ban đầu
best_individuals = []
for population in populations:
    fits = toolbox.map(toolbox.evaluate, population)
    for fit, ind in zip(fits, population):
        ind.fitness.values = fit
    best_ind = min(population, key=lambda ind: ind.fitness.values)
    best_individuals.append(best_ind)
    print(f"Cá thể tốt nhất trong quần thể thứ {populations.index(population) + 1}: {best_ind} ban đầu")
print()

print("Bắt đầu quá trình lai ghép và đột biến...")
print()

# Quá trình lai ghép và đột biến
for gen in range(num_generations):
    # Tạo quần thể con bằng cách lai ghép và đột biến từ các cá thể tốt nhất
    offspring = algorithms.varAnd(best_individuals, toolbox, cxpb=0.7, mutpb=0.15)
    # offspring = algorithms.varAnd(best_individuals, toolbox, mutpb=0.15)
    print(f"Quần thể con F{gen + 1} đã được tiến hóa:")
    for i, ind in enumerate(offspring):
        print(f"Cá thể thứ {i}: {ind}")

    # Đánh giá fitness của quần thể con
    fits = toolbox.map(toolbox.evaluate, offspring)
    for fit, ind in zip(fits, offspring):
        ind.fitness.values = fit
    best_ind = min(offspring, key=lambda ind: ind.fitness.values)
    best_individuals.append(best_ind)
    print(f"Cá thể tốt nhất trong quần thể con F{gen + 1}: {best_ind}")

    # Chọn lọc cá thể để tạo quần thể mới cho thế hệ tiếp theo
    best_individuals = toolbox.select(offspring, k=len(best_individuals))
    print()

# In ra cá thể tốt nhất cuối cùng
best_ind = min(best_individuals, key=lambda ind: ind.fitness.values)
print("Cá thể tốt nhất cuối cùng:")
print(best_ind)

Quần thể ban đầu 1:
Cá thể thứ 0: [128, 128, (7, 7), (5, 5), (5, 5), 'relu']
Cá thể thứ 1: [32, 16, (5, 5), (7, 7), (7, 7), 'tanh']

Quần thể ban đầu 2:
Cá thể thứ 0: [32, 32, (5, 5), (3, 3), (7, 7), 'tanh']
Cá thể thứ 1: [16, 32, (5, 5), (3, 3), (5, 5), 'relu']

Quần thể ban đầu 3:
Cá thể thứ 0: [16, 32, (7, 7), (7, 7), (3, 3), 'tanh']
Cá thể thứ 1: [128, 128, (5, 5), (5, 5), (7, 7), 'tanh']

Quần thể ban đầu 4:
Cá thể thứ 0: [32, 32, (3, 3), (7, 7), (5, 5), 'relu']
Cá thể thứ 1: [32, 32, (7, 7), (5, 5), (5, 5), 'sigmoid']

Train Adam




Epoch 1/50


# Đánh giá và Test mô hình

### Các tham số từ cá thể tốt nhất

In [None]:
import matplotlib.pyplot as plt

def plot_loss_model(history):
    # Trích xuất giá trị loss trên tập huấn luyện và kiểm tra
    train_loss = history.history['loss']
    val_loss = history.history['val_loss']

    # Vẽ biểu đồ
    plt.plot(train_loss, marker='.', linestyle='-')
    plt.plot(val_loss, marker='', linestyle='-')
    plt.xlabel('Epoch')
    plt.ylabel('Validation Loss')
    plt.title('Validation Loss over Epochs')
    plt.yscale('log')  # Scale y-axis to logarithmic scale
    plt.show()

In [None]:
# Các tham số đầu vào mô hình LSTM
filters1, filters2, kernel_size1, kernel_size2, kernel_size3, activation = best_ind

print(filters1, filters2, kernel_size1, kernel_size2, kernel_size3, activation)

In [None]:
optimizer_Adam = keras.optimizers.Adam(learning_rate=1e-3)
model_path_Adam = './Ketqua_GA/ConvLSTM/ConvLSTM_Adam.hdf5'
_, model_lstm_Adam, history_lstm_Adam = fitness_function(X_train, y_train, X_val, y_val, filters1, filters2, 
                                                           kernel_size1, kernel_size2, kernel_size3, 
                                                           activation, optimizer_Adam, epochs=1, model_path=model_path_Adam)

In [None]:
plot_loss_model(history_lstm_Adam)

In [None]:
optimizer_Yogi = tfa.optimizers.Yogi(learning_rate=1e-3)
model_path_Yogi = './Ketqua_GA/ConvLSTM/ConvLSTM_Yogi.hdf5'
_, model_lstm_Yogi, history_lstm_Yogi = fitness_function(X_train, y_train, X_val, y_val, filters1, filters2, 
                                                           kernel_size1, kernel_size2, kernel_size3, activation, 
                                                           optimizer_Yogi, epochs=1, model_path=model_path_Yogi)

In [None]:
plot_loss_model(history_lstm_Yogi)

### Đánh giá mô hình

In [None]:
# Đánh giá mô hình trên tập test
loss1 = model_lstm_Adam.evaluate(X_val, np.array(y_val))
print(f'Val loss: {loss1}')

loss2 = model_lstm_Yogi.evaluate(X_val, np.array(y_val))
print(f'Val loss: {loss2}')

if loss1 < loss2:
    print("Best Model Adam")
    model = model_lstm_Adam
else:
    print("Best Model YoGi")
    model = model_lstm_Yogi

### Dự đoán 

In [None]:
def test_model(model, X_test, y_test):
    # Dự đoán ảnh mới
    predicted = model.predict(X_test)
    predicted_images = np.array(predicted)
    y_test = np.array(y_test)


    # Đánh giá MSE MAE R^2
    mse = mean_squared_error(y_test.flatten(), predicted_images.flatten())
    print("Mean Squared Error (MSE):", mse)
    mae = mean_absolute_error(y_test.flatten(), predicted_images.flatten())
    print("Mean Absolute Error (MAE):", mae)
    r2 = r2_score(y_test.flatten(), predicted_images.flatten())
    print("R-squared (R2) Score:", r2)

    # Hiển thị các ảnh dự đoán
    fig, axs = plt.subplots(1, len(predicted_images), figsize=(15, 25))
    for i in range(len(predicted_images)):
        axs[i].imshow(predicted_images[i])  # Hiển thị ảnh cuối cùng trong mỗi mẫu dự đoán
        axs[i].set_title(f"Ảnh dự đoán {i+1}")
    plt.show()

    return predicted_images

test_model(model, X_test, y_test)

In [None]:
test_model(model_lstm_Adam, X_test, y_test)

In [None]:
test_model(model_lstm_Yogi, X_test, y_test)