## Bước 1: Tạo X, y là bộ dữ liệu từ ảnh png

In [2]:
# Merge all the code into a single script

import tensorflow as tf
import numpy as np
from PIL import Image
import os

def parse_png_to_tensor(image_path: str) -> tf.Tensor:
    """
    Parses a PNG image file into a TensorFlow tensor.

    This function is updated to be compatible with a TensorFlow training loop.
    It converts the image to RGB format, normalizes the pixel values to a
    0-1 range, and adds a batch dimension.

    Args:
        image_path (str): The file path to the PNG image.

    Returns:
        tf.Tensor: A TensorFlow tensor of the image with a shape of
                   (1, height, width, 3). Returns an empty tensor if the file
                   cannot be processed.
    """
    if not os.path.exists(image_path):
        print(f"Error: File not found at {image_path}")
        return tf.empty(0)

    try:
        # Open the image file
        image = Image.open(image_path)
        
        # Convert the image to RGB format. This handles different modes like RGBA or P.
        image = image.convert('RGB')
        
        # Convert the PIL Image object to a NumPy array.
        # The shape of the array will be (height, width, channels)
        numpy_array = np.array(image, dtype=np.float32)
        
        # Normalize the pixel values from 0-255 to 0.0-1.0
        normalized_array = numpy_array / 255.0
        
        # Convert the NumPy array to a TensorFlow tensor.
        # The tensor will have a shape of (height, width, channels)
        image_tensor = tf.convert_to_tensor(normalized_array)
        
        # Add a batch dimension at the beginning to match the expected
        # input shape for a model (batch_size, height, width, channels)
        final_tensor = tf.expand_dims(image_tensor, axis=0)

        return final_tensor

    except Exception as e:
        print(f"An error occurred while processing the image: {e}")
        return tf.empty(0)


# if __name__ == "__main__":
#     # Create a dummy PNG file for testing
#     dummy_image_path = "test_image.png"
    
#     # Create a 100x100 white image with RGB channels
#     dummy_image = Image.new('RGB', (100, 100), 'white')
#     dummy_image.save(dummy_image_path)
#     print(f"Created a dummy image at: {dummy_image_path}")

#     # Call the parse function on the dummy image
#     image_tensor = parse_png_to_tensor(dummy_image_path)

#     # Check the result
#     if image_tensor.shape.num_elements() > 0:
#         print("\nSuccessfully converted image to TensorFlow tensor!")
#         print(f"Tensor shape: {image_tensor.shape}")
#         print(f"Data type: {image_tensor.dtype}")
#     else:
#         print("\nFailed to convert image to tensor.")

#     # Clean up the dummy file
#     os.remove(dummy_image_path)
#     print(f"\nCleaned up dummy file: {dummy_image_path}")


IMG_SIZE = [144, 192]
labels = [l for l in os.listdir("./dataset") if l != ".DS_Store"]

X_list = []
y_list = []
for i, label in enumerate(labels):
    img_list = os.listdir(f"./dataset/{label}/")
    for img in img_list:
        img_path = f"./dataset/{label}/{img}"
        tensor = parse_png_to_tensor(img_path)
        # Resize and check for valid tensor
        if tensor.shape.num_elements() > 0:
            resized = tf.image.resize(tf.squeeze(tensor, axis=0), IMG_SIZE)
            X_list.append(resized)
            y_list.append(i)

X = tf.stack(X_list, axis=0)
y = tf.convert_to_tensor(y_list, dtype=tf.int32)
print(X.shape, y.shape)



(2712, 144, 192, 3) (2712,)


In [3]:
X.shape, y.shape

(TensorShape([2712, 144, 192, 3]), TensorShape([2712]))

In [3]:
import tensorflow as tf

IMG_SIZE = [144,192]
NUM_CLASSES = 12
BATCH = 32
AUTOTUNE = tf.data.AUTOTUNE


## CNN without Keras:

In [10]:
def he_init(shape):
    fan_in = tf.cast(tf.math.reduce_prod(shape[:-1]), tf.float32)
    std = tf.sqrt(2.0/fan_in)
    return tf.random.normal(shape, stddev=std)

class SimpleCNN:
    def __init__(self, num_classes=12):
        self.W1 = tf.Variable(he_init([3,3,3,32]));  self.b1 = tf.Variable(tf.zeros([32]))
        self.W2 = tf.Variable(he_init([3,3,32,64])); self.b2 = tf.Variable(tf.zeros([64]))
        self.W3 = tf.Variable(he_init([3,3,64,128]));self.b3 = tf.Variable(tf.zeros([128]))
        
        # We will initialize the weights for the dense layers dynamically
        # based on the input image shape.
        self.W4 = None
        self.b4 = None
        self.W5 = tf.Variable(he_init([256, num_classes]))
        self.b5 = tf.Variable(tf.zeros([num_classes]))
        self.weights_initialized = False

    def __call__(self, x, training=False, drop_rate=0.3):
        # x: [B, H, W, 3]
        x = tf.nn.conv2d(x, self.W1, strides=1, padding='SAME') + self.b1
        x = tf.nn.relu(x)
        x = tf.nn.max_pool2d(x, ksize=2, strides=2, padding='SAME')

        x = tf.nn.conv2d(x, self.W2, strides=1, padding='SAME') + self.b2
        x = tf.nn.relu(x)
        x = tf.nn.max_pool2d(x, ksize=2, strides=2, padding='SAME')

        x = tf.nn.conv2d(x, self.W3, strides=1, padding='SAME') + self.b3
        x = tf.nn.relu(x)
        x = tf.nn.max_pool2d(x, ksize=2, strides=2, padding='SAME')

        if not self.weights_initialized:
            # Dynamically calculate the flattened dimension and initialize weights
            # This is done the first time the model is called with a new input shape
            flat_dim = tf.math.reduce_prod(x.shape[1:])
            self.W4 = tf.Variable(he_init([flat_dim, 256]))
            self.b4 = tf.Variable(tf.zeros([256]))
            self.weights_initialized = True

        x = tf.reshape(x, [-1, tf.math.reduce_prod(x.shape[1:])])
        x = tf.matmul(x, self.W4) + self.b4
        x = tf.nn.relu(x)

        if training and drop_rate > 0:
            keep = 1.0 - drop_rate
            mask = tf.cast(tf.random.uniform(tf.shape(x)) < keep, x.dtype)
            x = (x * mask) / keep

        logits = tf.matmul(x, self.W5) + self.b5
        return logits

    @property
    def variables(self):
        # Make sure to return all variables, including the dynamically created ones
        return [self.W1,self.b1,self.W2,self.b2,self.W3,self.b3,self.W4,self.b4,self.W5,self.b5]

import math

def cross_entropy_loss(logits, onehot_labels):
    per_ex = tf.nn.softmax_cross_entropy_with_logits(labels=onehot_labels, logits=logits)
    return tf.reduce_mean(per_ex)

def accuracy(logits, onehot_labels):
    pred = tf.argmax(logits, axis=1)
    true = tf.argmax(onehot_labels, axis=1)
    return tf.reduce_mean(tf.cast(tf.equal(pred, true), tf.float32))

class SGD:
    def __init__(self, lr=1e-3, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.v = {}

    def apply_gradients(self, grads, vars):
        for g, v in zip(grads, vars):
            if g is None:
                continue
            key = id(v)
            if key not in self.v:
                self.v[key] = tf.zeros_like(v)
            self.v[key] = self.momentum*self.v[key] + g
            v.assign_sub(self.lr * self.v[key])

def train(model, ds_train, ds_val, epochs=15, lr=1e-3):
    opt = SGD(lr=lr, momentum=0.9)
    for ep in range(1, epochs+1):
        tr_loss = tf.metrics.Mean(); tr_acc = tf.metrics.Mean()
        for x,y in ds_train:
            with tf.GradientTape() as tape:
                logits = model(x, training=True)
                loss = cross_entropy_loss(logits, y)
            grads = tape.gradient(loss, model.variables)
            opt.apply_gradients(grads, model.variables)

            tr_loss.update_state(loss)
            tr_acc.update_state(accuracy(logits, y))

        va_loss = tf.metrics.Mean(); va_acc = tf.metrics.Mean()
        for x,y in ds_val:
            logits = model(x, training=False)
            va_loss.update_state(cross_entropy_loss(logits, y))
            va_acc.update_state(accuracy(logits, y))

        print(f"Epoch {ep:02d} | train_loss={tr_loss.result():.4f} acc={tr_acc.result():.4f} "
              f"| val_loss={va_loss.result():.4f} acc={va_acc.result():.4f}")


In [11]:
if __name__ == "__main__":
    # --- Simulate pre-existing X and y tensors as requested ---
    NUM_CLASSES = 12
    NUM_IMAGES = 2712
    IMAGE_HEIGHT = 144
    IMAGE_WIDTH = 192

    print("\nSimulating pre-existing X and y tensors...")


    print(f"Loaded X shape: {X.shape}")
    print(f"Loaded y shape: {y.shape}")

    # --- Prepare the data for training ---
    # Convert labels to one-hot encoding
    y_one_hot = tf.one_hot(y, NUM_CLASSES)
    
    # Shuffle and split the dataset
    dataset_size = len(y)
    train_size = int(0.8 * dataset_size)
    
    full_dataset = tf.data.Dataset.from_tensor_slices((X, y_one_hot))
    full_dataset = full_dataset.shuffle(buffer_size=dataset_size)
    
    ds_train = full_dataset.take(train_size).batch(8)
    ds_val = full_dataset.skip(train_size).batch(8)
    
    # --- Train the model ---
    print("\nStarting CNN training...")
    model = SimpleCNN(num_classes=NUM_CLASSES)
    train(model, ds_train, ds_val, epochs=5, lr=1e-3)


Simulating pre-existing X and y tensors...
Loaded X shape: (2712, 144, 192, 3)
Loaded y shape: (2712,)

Starting CNN training...
Epoch 01 | train_loss=2.3696 acc=0.1949 | val_loss=1.9582 acc=0.3999
Epoch 02 | train_loss=1.6791 acc=0.4260 | val_loss=1.2268 acc=0.6279
Epoch 03 | train_loss=1.0446 acc=0.6425 | val_loss=0.8838 acc=0.7027
Epoch 04 | train_loss=0.7108 acc=0.7583 | val_loss=0.5284 acc=0.8564
Epoch 05 | train_loss=0.4624 acc=0.8456 | val_loss=0.3005 acc=0.9152


### Test frame:

In [14]:
test = X[:8]
res = [tf.argmax(model(test)[i]).numpy() for i in range(8)]
print(res)
# print((model(X[:8], training=False)))
print(y[:8].numpy())


[0, 11, 0, 0, 0, 0, 0, 0]
[0 0 0 0 0 0 0 0]


## CNN with Keras:

In [17]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping

# Utils function
# Save best model check point
checkpoint_cb = ModelCheckpoint("cnn_best.h5", 
                                save_best_only=True, 
                                monitor="val_accuracy",    
                                mode="max",                
                                save_weights_only=True,     
                                verbose=1)

# He initializer function
def he_init(shape, dtype=None):
    fan_in = tf.cast(tf.reduce_prod(shape[:-1]), tf.float32)
    std = tf.sqrt(2.0 / fan_in)
    return tf.random.normal(shape, stddev=std)

NUM_CLASSES = 12 

# Early stopping

earlystop_cb = EarlyStopping(
    patience=5, restore_best_weights=True,
    monitor="val_loss", mode="min"
)



CNN = keras.Sequential([
    # Block 1
    layers.Conv2D(32, (3, 3), padding="same",
                  kernel_initializer=he_init, activation=None,
                  input_shape=(144, 192, 3)),
    layers.BatchNormalization(),
    layers.ReLU(),
    layers.MaxPooling2D((2, 2)),

    # Block 2
    layers.Conv2D(64, (3, 3), padding="same",
                  kernel_initializer=he_init, activation=None),
    layers.BatchNormalization(),
    layers.ReLU(),
    layers.MaxPooling2D((2, 2)),

    # Block 3
    layers.Conv2D(128, (3, 3), padding="same",
                  kernel_initializer=he_init, activation=None),
    layers.BatchNormalization(),
    layers.ReLU(),
    layers.MaxPooling2D((2, 2)),

    # Dense head
    layers.Flatten(),
    layers.Dense(256, kernel_initializer=he_init, activation=None),
    layers.BatchNormalization(),
    layers.ReLU(),
    layers.Dropout(0.3),

    # Output
    layers.Dense(NUM_CLASSES, activation="softmax", kernel_initializer=he_init),
])

# Split train/val
X_np = X.numpy() if isinstance(X, tf.Tensor) else X
y_np = y.numpy() if isinstance(y, tf.Tensor) else y
X_train, X_val, y_train, y_val = train_test_split(
    X_np, y_np, test_size=0.2, random_state=42, stratify=y_np
)

# Compile (match activation choice!)
CNN.compile(
    optimizer=keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),  # softmax used
    metrics=["accuracy"]
)

# Train
history = CNN.fit(
    X_train, y_train,
    epochs=20,
    batch_size=8,
    validation_data=(X_val, y_val),
    shuffle=True,
    callbacks=[checkpoint_cb, earlystop_cb]
)





Epoch 1/20


2025-09-16 07:34:50.950703: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:961] model_pruner failed: INVALID_ARGUMENT: Graph does not contain terminal node Adam/AssignAddVariableOp.


Epoch 1: val_accuracy improved from -inf to 0.57459, saving model to cnn_best.h5
Epoch 2/20
Epoch 2: val_accuracy improved from 0.57459 to 0.61326, saving model to cnn_best.h5
Epoch 3/20
Epoch 3: val_accuracy improved from 0.61326 to 0.76243, saving model to cnn_best.h5
Epoch 4/20
Epoch 4: val_accuracy improved from 0.76243 to 0.76611, saving model to cnn_best.h5
Epoch 5/20
Epoch 5: val_accuracy improved from 0.76611 to 0.78821, saving model to cnn_best.h5
Epoch 6/20
Epoch 6: val_accuracy improved from 0.78821 to 0.79742, saving model to cnn_best.h5
Epoch 7/20
Epoch 7: val_accuracy improved from 0.79742 to 0.92449, saving model to cnn_best.h5
Epoch 8/20
Epoch 8: val_accuracy did not improve from 0.92449
Epoch 9/20
Epoch 9: val_accuracy did not improve from 0.92449
Epoch 10/20
Epoch 10: val_accuracy did not improve from 0.92449
Epoch 11/20
Epoch 11: val_accuracy did not improve from 0.92449
Epoch 12/20
Epoch 12: val_accuracy improved from 0.92449 to 0.93370, saving model to cnn_best.h5


In [18]:
# After training, load back the best weights:
CNN.load_weights("cnn_best.h5")

# Now CNN is restored to the best epoch
val_loss, val_acc = CNN.evaluate(X_val, y_val)
print(f"Best model -> val_acc={val_acc:.4f}, val_loss={val_loss:.4f}")

Best model -> val_acc=0.9337, val_loss=0.1940


In [27]:
test = X_val[:8]
print(test.shape)
# res = [tf.argmax(CNN.predict(test[i])).numpy() for i in range(8)]
res = CNN.predict(test)
for i in range(8):
    print(tf.argmax(res[i]).numpy())
# print(res)
# print((model(X[:8], training=False)))
print(y_val[:8])


(8, 144, 192, 3)
3
1
1
0
5
0
3
0
[3 6 2 0 5 0 3 0]


## CNN With pretrained model: VGG16

In [9]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Dropout, BatchNormalization, ReLU
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

# --- Utils ---
def he_init(shape, dtype=None):
    fan_in = tf.cast(tf.reduce_prod(shape[:-1]), tf.float32)
    std = tf.sqrt(2.0 / fan_in)
    return tf.random.normal(shape, stddev=std)

NUM_CLASSES = 12

checkpoint_cb = ModelCheckpoint(
    "cnn_best.h5",
    save_best_only=True,
    monitor="val_accuracy",
    mode="max",
    save_weights_only=True,
    verbose=1,
)

earlystop_cb = EarlyStopping(
    patience=5, restore_best_weights=True,
    monitor="val_loss", mode="min",
)

# --- Base model (no top) ---
base_model = VGG16(weights="imagenet", include_top=False, input_shape=(144, 192, 3))
base_model.trainable = False  # freeze for transfer learning

# Pick a truncation point, e.g. after block2_pool or block3_pool
truncated_output = base_model.get_layer("block3_pool").output  # or "block3_pool"
truncated_base = keras.Model(inputs=base_model.input, outputs=truncated_output)
truncated_base.trainable = False

# --- Build model ---
inputs = Input(shape=(144, 192, 3))

# If your images are in [0,255]: use preprocess_input(inputs)
# If your images are in [0,1]: multiply by 255 first
x = preprocess_input(inputs * 255.0)

x = truncated_base(x, training=False)
x = GlobalAveragePooling2D()(x)  # <-- only once!

# Small head
x = Dense(256, kernel_initializer=he_init, activation=None)(x)
x = BatchNormalization()(x)
x = ReLU()(x)
x = Dropout(0.5)(x)

outputs = Dense(NUM_CLASSES, activation="softmax", kernel_initializer=he_init)(x)

vgg_model = keras.Model(inputs, outputs, name="VGG16_transfer")

# --- Compile ---
vgg_model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=["accuracy"],
)

# --- Train (example) ---
# history = vgg_model.fit(
#     X_train, y_train,
#     validation_data=(X_val, y_val),
#     epochs=20,
#     batch_size=32,
#     callbacks=[checkpoint_cb, earlystop_cb],
#     shuffle=True,
# )
# vgg_model.load_weights("cnn_best.h5")  # ensure best weights loaded




In [10]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping

X_np = X.numpy() if isinstance(X, tf.Tensor) else X
y_np = y.numpy() if isinstance(y, tf.Tensor) else y
X_train, X_val, y_train, y_val = train_test_split(
    X_np, y_np, test_size=0.2, random_state=42, stratify=y_np
)


vgg_model.fit(
    X_train, y_train,
    epochs=20,
    batch_size=8,
    validation_data=(X_val, y_val),
    shuffle=True,
    callbacks=[checkpoint_cb, earlystop_cb]
)



Epoch 1/20


2025-09-16 11:14:42.778459: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:961] model_pruner failed: INVALID_ARGUMENT: Graph does not contain terminal node Adam/AssignAddVariableOp.


Epoch 1: val_accuracy improved from -inf to 0.50829, saving model to cnn_best.h5
Epoch 2/20
Epoch 2: val_accuracy improved from 0.50829 to 0.58748, saving model to cnn_best.h5
Epoch 3/20
Epoch 3: val_accuracy did not improve from 0.58748
Epoch 4/20
Epoch 4: val_accuracy improved from 0.58748 to 0.59484, saving model to cnn_best.h5
Epoch 5/20
Epoch 5: val_accuracy improved from 0.59484 to 0.81031, saving model to cnn_best.h5
Epoch 6/20
Epoch 6: val_accuracy improved from 0.81031 to 0.81584, saving model to cnn_best.h5
Epoch 7/20
Epoch 7: val_accuracy did not improve from 0.81584
Epoch 8/20
Epoch 8: val_accuracy improved from 0.81584 to 0.83978, saving model to cnn_best.h5
Epoch 9/20
Epoch 9: val_accuracy improved from 0.83978 to 0.86004, saving model to cnn_best.h5
Epoch 10/20
Epoch 10: val_accuracy did not improve from 0.86004
Epoch 11/20
Epoch 11: val_accuracy improved from 0.86004 to 0.86924, saving model to cnn_best.h5
Epoch 12/20
Epoch 12: val_accuracy did not improve from 0.86924


KeyboardInterrupt: 