In [1]:
import os, math, random
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.applications import Xception
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import Sequence
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
import joblib
import pandas as pd

In [2]:
# --- config ---
# Everything a reader might tune lives in this cell.
# NOTE(review): no random seed is set for `random`, `np.random` or TensorFlow in
# the visible cells, so shuffling and augmentation differ between runs.

# Input locations: images are listed from DATA_DIR and each image's label is
# read from a .txt file in VOLUME_FILES_DIR (see load_data_from_folders).
DATA_DIR = "./DataTrain/images"
VOLUME_FILES_DIR = "./DataTrain/labels"
# Output artifacts: the trained Keras model and the fitted StandardScaler.
MODEL_SAVE_PATH = "./mangosteen_volume_model_aug.h5"
SCALER_PATH = "./volume_scaler.save"
# Side length in pixels that images are resized to; also the Xception input size.
IMG_SIZE = 224
BATCH_SIZE = 32
# Epoch budget for the frozen-backbone phase; EarlyStopping may end it sooner.
INITIAL_EPOCHS = 200
# With 0, the fine-tuning cell's fit() call runs no epochs.
FINE_TUNE_EPOCHS = 0
LEARNING_RATE_INITIAL = 1e-3
# NOTE(review): equal to the min_lr given to ReduceLROnPlateau, so that callback
# cannot lower the learning rate any further during fine-tuning.
LEARNING_RATE_FINE_TUNE = 1e-6

In [3]:

# --- augmentation helpers ---
def random_brightness_contrast(image):
    """Randomly jitter the contrast and brightness of an image.

    A contrast gain is drawn uniformly from [0.8, 1.2] and a brightness offset
    uniformly from [-0.2, 0.2] * 255; the result is clipped to [0, 255].

    Args:
        image: Numeric array of pixel values on a 0-255 scale.

    Returns:
        Array of the same shape as ``image`` with values limited to [0, 255].
    """
    # Draw order matters for reproducibility under a fixed seed: gain first.
    gain = np.random.uniform(0.8, 1.2)
    offset = 255.0 * np.random.uniform(-0.2, 0.2)
    return np.clip(gain * image + offset, 0.0, 255.0)

def random_flip_rotate_scale_crop(img):
    """Apply a random horizontal flip, small rotation and zoom to an image.

    The geometry is derived from the input's own height and width rather than
    from the global IMG_SIZE, so the function stays correct when the caller
    resizes images to a different (or non-square) size. For inputs that are
    IMG_SIZE x IMG_SIZE the result is the same as before.

    Args:
        img: Image array of shape (H, W) or (H, W, C) accepted by OpenCV.

    Returns:
        Augmented image with the same height and width as the input.
    """
    height, width = img.shape[:2]

    # Horizontal flip with probability 0.5.
    if random.random() < 0.5:
        img = cv2.flip(img, 1)

    # Rotate by up to +/-15 degrees about the image centre; reflecting the
    # border avoids introducing black corners.
    angle = random.uniform(-15, 15)
    rotation = cv2.getRotationMatrix2D((width // 2, height // 2), angle, 1)
    img = cv2.warpAffine(img, rotation, (width, height), borderMode=cv2.BORDER_REFLECT)

    # Isotropic zoom between 0.9x and 1.1x.
    scale = random.uniform(0.9, 1.1)
    img = cv2.resize(img, None, fx=scale, fy=scale)

    # Centre-crop back to the original size. When the image was shrunk the
    # slice covers the whole image, and the final resize stretches it back.
    scaled_h, scaled_w = img.shape[:2]
    left = max(0, scaled_w // 2 - width // 2)
    top = max(0, scaled_h // 2 - height // 2)
    img = img[top:top + height, left:left + width]
    return cv2.resize(img, (width, height))


In [4]:
# --- Sequence implementation ---
class MangosteenSequence(Sequence):
    """Batch generator pairing image files on disk with volume targets.

    Batches are built lazily in ``__getitem__``: each image is read with
    OpenCV, converted from BGR to RGB, resized to ``img_size`` x ``img_size``,
    optionally augmented, and scaled to the [0, 1] range.

    Args:
        image_paths: Iterable of image file paths.
        volumes: Iterable of regression targets aligned with ``image_paths``.
        batch_size: Maximum number of samples per batch.
        img_size: Side length in pixels every image is resized to.
        is_training: When True, random augmentation is applied and shuffling
            is permitted.
        shuffle: When True (and ``is_training``), ``on_epoch_end`` reshuffles
            the sample order.
        **kwargs: Forwarded to the Keras base-class constructor.

    Raises:
        ValueError: If ``image_paths`` and ``volumes`` differ in length.
    """

    def __init__(self, image_paths, volumes, batch_size, img_size, is_training=True, shuffle=True, **kwargs):
        # Keras warns (see the captured training output) when the base-class
        # constructor is skipped, so call it before doing anything else.
        super().__init__(**kwargs)
        self.image_paths = list(image_paths)
        self.volumes = list(volumes)
        if len(self.image_paths) != len(self.volumes):
            raise ValueError(
                f"image_paths and volumes must have the same length "
                f"(got {len(self.image_paths)} and {len(self.volumes)})"
            )
        self.batch_size = batch_size
        self.img_size = img_size
        self.is_training = is_training
        self.shuffle = shuffle
        # Shuffle once up front so the first epoch is randomized too.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (the last one may be short)."""
        return math.ceil(len(self.image_paths) / self.batch_size)

    def _load_image(self, path):
        """Read one image as a float32 RGB array of shape (img_size, img_size, 3)."""
        img = cv2.imread(path)
        if img is None:
            # An unreadable file becomes a black placeholder so the batch
            # keeps its shape; the warning makes the bad file visible.
            print(f"⚠️ Warning: cv2.imread failed for {path}")
            img = np.zeros((self.img_size, self.img_size, 3), dtype=np.uint8)
        else:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (self.img_size, self.img_size))
        img = img.astype('float32')
        if self.is_training:
            img = random_brightness_contrast(img)
            img = random_flip_rotate_scale_crop(img)
        return img

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``(images, volumes)``.

        Returns:
            images: float32 array of shape (n, img_size, img_size, 3) in [0, 1].
            volumes: float32 array of shape (n,).

        Raises:
            IndexError: If ``idx`` is outside ``range(len(self))``.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} out of range for {len(self)} batches")
        start = idx * self.batch_size
        end = start + self.batch_size  # slicing clamps at the end of the list
        batch_paths = self.image_paths[start:end]
        batch_vols = self.volumes[start:end]

        images = np.stack([self._load_image(p) for p in batch_paths], axis=0) / 255.0
        vols = np.array(batch_vols, dtype='float32')
        return images, vols

    def on_epoch_end(self):
        """Reshuffle paths and targets together when shuffling is enabled."""
        if self.shuffle and self.is_training:
            # Shuffling an index list keeps both lists aligned and, unlike
            # zip(*pairs), also works when the dataset is empty.
            order = list(range(len(self.image_paths)))
            random.shuffle(order)
            self.image_paths = [self.image_paths[i] for i in order]
            self.volumes = [self.volumes[i] for i in order]

In [5]:
# --- load data ---
def load_data_from_folders(data_dir, volume_dir):
    """Collect image paths and their volume labels from two folders.

    Every ``.png``/``.jpg``/``.jpeg`` file in ``data_dir`` is paired with the
    file ``<base>.txt`` in ``volume_dir``, where ``<base>`` is the image file
    name up to its first dot. The label file must contain a single finite
    number. Images with no label file are skipped silently; images whose label
    cannot be read or parsed are skipped with a printed warning.

    Args:
        data_dir: Folder containing the images.
        volume_dir: Folder containing the ``.txt`` label files.

    Returns:
        Tuple ``(image_paths, volumes)`` of two aligned lists, ordered by
        file name.
    """
    image_paths, volumes = [], []
    # os.listdir order is arbitrary; sorting keeps the sample order (and so
    # the seeded train/val split downstream) the same on every machine.
    for filename in sorted(os.listdir(data_dir)):
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue
        # NOTE(review): the base name stops at the FIRST dot, so "a.b.jpg"
        # maps to "a.txt" — kept as-is; confirm this is the intended pairing.
        base = filename.split('.', 1)[0]
        vol_file = os.path.join(volume_dir, base + ".txt")
        if not os.path.exists(vol_file):
            continue
        try:
            with open(vol_file, 'r') as f:
                v = float(f.read().strip())
        except (OSError, ValueError) as exc:
            # Narrow except: report the bad label instead of hiding it, and
            # let KeyboardInterrupt and genuine bugs propagate.
            print(f"⚠️ Warning: skipping {filename}: cannot read volume from {vol_file} ({exc})")
            continue
        if not math.isfinite(v):
            # nan/inf parse as floats but would corrupt the target scaling.
            print(f"⚠️ Warning: skipping {filename}: non-finite volume in {vol_file}")
            continue
        image_paths.append(os.path.join(data_dir, filename))
        volumes.append(v)
    return image_paths, volumes

# Build the aligned lists of image paths and raw (unscaled) volume labels.
print("Loading training data...")
image_paths, volumes = load_data_from_folders(DATA_DIR, VOLUME_FILES_DIR)
# Nothing downstream can run without samples, so stop the run here.
if not image_paths:
    raise SystemExit("❌ No data found.")

Loading training data...


In [6]:
# --- normalize volume ---
# Standardize the regression target and persist the fitted scaler to
# SCALER_PATH (presumably so predictions can be inverse-transformed later).
# NOTE(review): the scaler is fitted on ALL volumes before the train/val split
# in the next cell, so validation targets influence the scaling statistics —
# consider fitting on the training portion only.
scaler = StandardScaler()
# fit_transform is given a 2-D column via reshape(-1, 1); flatten() returns it to 1-D.
volumes_scaled = scaler.fit_transform(np.array(volumes).reshape(-1,1)).flatten()
joblib.dump(scaler, SCALER_PATH)  

['./volume_scaler.save']

In [7]:
# --- train/val split ---
# Hold out 20% of the samples for validation; the fixed random_state makes the
# split repeatable for a given input order.
train_paths, val_paths, train_volumes, val_volumes = train_test_split(
    image_paths,
    volumes_scaled,
    test_size=0.2,
    random_state=42,
)

# The training sequence augments images and shuffles its sample order; the
# validation sequence does neither.
train_seq = MangosteenSequence(
    image_paths=train_paths,
    volumes=train_volumes,
    batch_size=BATCH_SIZE,
    img_size=IMG_SIZE,
    is_training=True,
)
val_seq = MangosteenSequence(
    image_paths=val_paths,
    volumes=val_volumes,
    batch_size=BATCH_SIZE,
    img_size=IMG_SIZE,
    is_training=False,
    shuffle=False,
)


In [8]:

# --- build model ---
# ImageNet-pretrained Xception backbone without its classification top.
# NOTE(review): MangosteenSequence feeds pixels scaled to [0, 1]; confirm this
# matches the input scaling the pretrained Xception weights expect.
base_model = Xception(
    include_top=False,
    weights='imagenet',
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
)

# Freeze the whole backbone so the first training phase only fits the new head.
for layer in base_model.layers:
    layer.trainable = False

# Regression head: pooled features -> 1024 -> dropout -> 512 -> one linear output.
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, kernel_regularizer=l2(0.01), activation='relu')(x)
x = Dropout(0.3)(x)
x = Dense(512, kernel_regularizer=l2(0.01), activation='relu')(x)
predictions = Dense(1, activation='linear')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# The reported loss is the Huber loss plus the L2 penalties of the two hidden
# Dense layers, so it is not directly comparable with the MAE metric.
model.compile(
    loss='huber',
    metrics=['mean_absolute_error'],
    optimizer=Adam(learning_rate=LEARNING_RATE_INITIAL),
)

# Both callbacks watch val_loss: the LR is cut to 20% after 5 epochs without
# improvement, and training stops (restoring the best weights) after 10.
callbacks = [
    EarlyStopping(patience=10, monitor='val_loss', restore_best_weights=True),
    ReduceLROnPlateau(patience=5, monitor='val_loss', factor=0.2, min_lr=1e-6),
]

In [9]:
# --- train regression head ---
# Phase 1: every backbone layer was frozen in the previous cell, so only the
# new head's weights are updated here. Training runs for at most
# INITIAL_EPOCHS; the EarlyStopping callback can stop it sooner and restore
# the weights from the best val_loss epoch.
print("Training regression head...")
model.fit(train_seq, validation_data=val_seq, epochs=INITIAL_EPOCHS, callbacks=callbacks, verbose=1)


Training regression head...


  self._warn_if_super_not_called()


Epoch 1/200
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 3s/step - loss: 18.9051 - mean_absolute_error: 1.6747 - val_loss: 15.0325 - val_mean_absolute_error: 0.8047 - learning_rate: 0.0010
Epoch 2/200
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 2s/step - loss: 12.8050 - mean_absolute_error: 0.7484 - val_loss: 10.4571 - val_mean_absolute_error: 0.6987 - learning_rate: 0.0010
Epoch 3/200
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 2s/step - loss: 8.8820 - mean_absolute_error: 0.6187 - val_loss: 7.3436 - val_mean_absolute_error: 0.6602 - learning_rate: 0.0010
Epoch 4/200
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 2s/step - loss: 6.3379 - mean_absolute_error: 0.5795 - val_loss: 5.3383 - val_mean_absolute_error: 0.5944 - learning_rate: 0.0010
Epoch 5/200
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 2s/step - loss: 4.6940 - mean_absolute_error: 0.5446 - val_loss: 4.0944 - val_mean_absolute

<keras.src.callbacks.history.History at 0x23697b32250>

In [10]:
# --- fine-tune last block only ---
# Phase 2: unfreeze only the layers of Xception's final block (block14).
# BatchNormalization layers are deliberately left frozen: making them trainable
# would let them update their moving statistics on this small dataset, which
# the Keras fine-tuning guidance warns can destroy the pretrained features.
for layer in base_model.layers:
    if 'block14' in layer.name and not isinstance(layer, tf.keras.layers.BatchNormalization):
        layer.trainable = True

# Changes to `trainable` only take effect after re-compiling; a much smaller
# learning rate keeps the pretrained weights from being overwritten.
model.compile(optimizer=Adam(learning_rate=LEARNING_RATE_FINE_TUNE),
              loss='huber', metrics=['mean_absolute_error'])

# With FINE_TUNE_EPOCHS == 0 this fit() call runs no epochs.
print("Fine-tuning base model (last block)...")
model.fit(train_seq, validation_data=val_seq, epochs=FINE_TUNE_EPOCHS, callbacks=callbacks, verbose=1)


Fine-tuning base model (last block)...


<keras.src.callbacks.history.History at 0x236acfcfb50>

In [11]:
# --- save model ---
# Persist the trained model to MODEL_SAVE_PATH.
# NOTE(review): the model was trained on standardized volumes, so its outputs
# are in scaled units; the scaler saved at SCALER_PATH is presumably needed at
# inference time to convert predictions back — confirm the consumer does this.
model.save(MODEL_SAVE_PATH)
print(f"✅ Model saved to {MODEL_SAVE_PATH}")



✅ Model saved to ./mangosteen_volume_model_aug.h5
