# Đọc vào dữ liệu  
B1. Đọc vào full mọi ảnh từ 1 đường dẫn (trong thư mục này bao gồm 3 thư mục tương ứng 3 nhãn)  
B2. Cân bằng lại số lượng ảnh của từng nhãn (phép xoay, phép lật, làm nhiễu)  
B3. Phân chia train:valid:test theo tỷ lệ 65:25:10  

In [None]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle


# B1: Đọc vào full mọi ảnh từ đường dẫn
def load_images_from_folder(folder):
    images = []
    labels = []
    for label in os.listdir(folder):
        label_path = os.path.join(folder, label)
        if os.path.isdir(label_path):
            for filename in os.listdir(label_path):
                img_path = os.path.join(label_path, filename)
                if img_path.endswith(".jpg") or img_path.endswith(".png"):
                    img = cv2.imread(img_path)
                    img = cv2.resize(img, (224, 224))
                    if img is not None:
                        images.append(img)
                        labels.append(label)
    return images, labels


folder_path = "/mnt/DataK/Project/ThyroidCancer/data/origin_data/B256"
images, labels = load_images_from_folder(folder_path)

# B2: Cân bằng lại số lượng ảnh của từng nhãn
# Tính số lượng ảnh của từng nhãn
label_counts = {}
for label in labels:
    label_counts[label] = label_counts.get(label, 0) + 1
print(f"Số lượng ảnh từng nhãn: {label_counts}")

# Xác định số lượng ảnh tối đa của mỗi nhãn
max_count = max(label_counts.values())
print(f"Số ảnh có nhiều nhất ở 1 nhãn: {max_count}")

# Phép xoay, lật, làm nhiễu để cân bằng số lượng ảnh
import random


def rotate_image(image, angle):
    if angle == 90:
        rotated_image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
    elif angle == 180:
        rotated_image = cv2.rotate(image, cv2.ROTATE_180)
    elif angle == 270:
        rotated_image = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
    else:
        raise ValueError(
            "Invalid rotation angle. Please choose from 90, 180, or 270 degrees."
        )
    return rotated_image


def random_rotation(image):
    angle = np.random.choice([90, 180, 270])  # Chỉ chọn góc quay là bội số của 5 độ
    return rotate_image(image, angle)


def perform_augmentation(image):
    # Xác định phép biến đổi ngẫu nhiên
    augmentation_type = random.choice(["rotate", "flip", "noise"])

    # Xoay ảnh ngẫu nhiên
    if augmentation_type == "rotate":
        image = random_rotation(image)

    # Lật ảnh ngẫu nhiên (theo chiều ngang hoặc dọc)
    elif augmentation_type == "flip":
        flip_code = random.choice(
            [-1, 1]
        )  # Ngẫu nhiên chọn lật theo chiều ngang, dọc hoặc không lật
        image = cv2.flip(image, flip_code)

    # Thêm nhiễu ngẫu nhiên
    elif augmentation_type == "noise":
        noise = np.random.normal(0, 16, image.shape).astype(
            "uint8"
        )  # Tạo nhiễu ngẫu nhiên có giá trị trung bình là 0 và phương sai là 16
        image = cv2.add(image, noise)  # Thêm nhiễu vào ảnh

    # Trường hợp không thực hiện phép biến đổi nào
    # else:
    #     pass

    return image


balanced_images = []
balanced_labels = []
for label in label_counts:
    label_images = [img for img, lbl in zip(images, labels) if lbl == label]
    label_count = label_counts[label]
    while len(label_images) < max_count:
        img = label_images[np.random.randint(0, label_count)]
        # Áp dụng các phép biến đổi vào ảnh
        # Ví dụ: img = perform_augmentation(img)
        img = perform_augmentation(img)
        balanced_images.append(img)
        balanced_labels.append(label)

# B3: Phân chia train:valid:test theo tỷ lệ 65:25:10
X_train, X_temp, y_train, y_temp = train_test_split(
    balanced_images, balanced_labels, test_size=0.35, random_state=42
)
X_valid, X_test, y_valid, y_test = train_test_split(
    X_temp, y_temp, test_size=0.1, random_state=42
)

# Thiết lập kiến trúc mạng ResNet  

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Chuyển đổi danh sách ảnh thành mảng numpy
X_train = np.array(X_train)
X_valid = np.array(X_valid)
X_test = np.array(X_test)

# Chuẩn bị dữ liệu cho model.fit()
train_datagen = ImageDataGenerator(rescale=1./255)
valid_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow(X_train, y_train, batch_size=32)
valid_generator = valid_datagen.flow(X_valid, y_valid, batch_size=32)
test_generator = test_datagen.flow(X_test, y_test, batch_size=32)

In [None]:
# Load pre-trained ResNet50 model
base_model = ResNet50(weights='imagenet', include_top=False)

# Add custom layers for your task
x = base_model.output
# x = Flatten()(x) # Add Flatten or Pooling ?
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
x = Dropout(0.2)(x)  # Add a dropout layer to prevent overfitting
x = Dense(512, activation='relu')(x)  # Add another dense layer
x = Dropout(0.25)(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.3)(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.35)(x)
predictions = Dense(3, activation='sigmoid')(x)

# Combine base model with custom layers
model = Model(inputs=base_model.input, outputs=predictions)

# Freeze layers of the base model
for layer in base_model.layers:
    layer.trainable = False

# Model summary
model.summary()

In [None]:
# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Training model  
B1. Train model và lưu lại trọng số sau mỗi epoch  
B2. Vẽ biểu đồ accuracy và loss function qua các epoch  

In [None]:
# from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, History
# import matplotlib.pyplot as plt

# Khởi tạo các callbacks
# checkpoint = ModelCheckpoint("best_model.h5", monitor="val_accuracy", save_best_only=True, mode="max", verbose=1)
# early_stopping = EarlyStopping(monitor="val_accuracy", patience=5, mode="max", verbose=1)
# history = History()

# model.fit(
#     train_generator,
#     steps_per_epoch=train_generator.n // train_generator.batch_size,
#     epochs=10,
#     validation_data=validation_generator,
#     validation_steps=validation_generator.n // validation_generator.batch_size
#     callbacks=[checkpoint, early_stopping, history]
# )

# # Hiển thị biểu đồ về quá trình training
# plt.plot(history.history['accuracy'])
# plt.plot(history.history['val_accuracy'])
# plt.title('Model Accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.legend(['Train', 'Validation'], loc='upper left')
# plt.show()

In [None]:
from tensorflow.keras.callbacks import Callback

class CustomModelCheckpoint(Callback):
    def __init__(self, filepath, monitor='val_accuracy', save_best_only=True, mode='max'):
        super(CustomModelCheckpoint, self).__init__()
        self.filepath = filepath
        self.monitor = monitor
        self.save_best_only = save_best_only
        self.mode = mode
        self.best_acc = -1

    def on_epoch_end(self, epoch, logs=None):
        current_acc = logs.get(self.monitor)
        if current_acc is None:
            raise ValueError("The monitored metric does not exist.")
        
        # Kiểm tra xem accuracy trên cả tập train và tập validation có tăng không
        if current_acc > self.best_acc:
            self.best_acc = current_acc
            self.model.save(self.filepath)
            print(f"Model saved with {self.monitor} = {current_acc:.4f}")

# Khởi tạo callback CustomModelCheckpoint
custom_checkpoint = CustomModelCheckpoint("best_model.h5")

# Huấn luyện mô hình với callbacks
history = model.fit(
    train_generator,
    steps_per_epoch=len(X_train) // train_generator.batch_size,
    epochs=10,
    validation_data=valid_generator,
    validation_steps=len(X_valid) // validation_generator.batch_size,
    callbacks=[custom_checkpoint]
)

# Vẽ biểu đồ sự thay đổi của accuracy qua các epoch
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

# Vẽ biểu đồ sự thay đổi của loss function qua các epoch
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
# # Đánh giá mô hình trên dữ liệu đánh giá
# score = model.evaluate(validation_generator, verbose=0)
# print("Test loss:", score[0])
# print("Test accuracy:", score[1])

# # Fine-tuning (tuỳ chọn)
# for layer in model.layers[:100]:
#     layer.trainable = False
# for layer in model.layers[100:]:
#     layer.trainable = True

# Thực hiện test thử trên tập test

In [None]:
# Đánh giá hiệu suất của mô hình trên tập test
test_loss, test_accuracy = model.evaluate(test_generator)

# In ra kết quả
print("Test loss:", test_loss)
print("Test accuracy:", test_accuracy)