In [None]:
import os, sys, glob, time, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import (Input, BatchNormalization, Conv2D, Activation,
                                     Dense, GlobalAveragePooling2D, MaxPooling2D,
                                     ZeroPadding2D, Add, Flatten)

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

In [None]:
path1 = r'D:/regression/1/'
path2 = r'D:/regression/2/'
path3 = r'D:/regression/3/'

dataframe1 = pd.read_excel(r'D:/regression/csdregression.xlsx', sheet_name=0)
dataframe2 = pd.read_excel(r'D:/regression/csdregression.xlsx', sheet_name=1)
dataframe3 = pd.read_excel(r'D:/regression/csdregression.xlsx', sheet_name=2)

In [None]:
def load_list(paths=[], dataframes=[]):
    files, target = [], []
    for i, dataframe in enumerate(dataframes):
        for _, row in dataframe.iterrows():
            folder_path = os.path.join(paths[i], row['image'])
            folder = os.listdir(folder_path)
            for file in folder:
                files.append(os.path.join(folder_path, file))
                target.append(row['dn/da'])
    return files, target

files, target = load_list(
    paths=[path1, path2, path3],
    dataframes=[dataframe1, dataframe2, dataframe3]
)

print("Total samples:", len(files), "Total targets:", len(target))

In [None]:
train_files, test_files, train_target, test_target = train_test_split(
    files, target, test_size=0.2, random_state=10
)
train_files, val_files, train_target, val_target = train_test_split(
    train_files, train_target, test_size=0.2, random_state=10
)

print("Train:", len(train_files), "Val:", len(val_files), "Test:", len(test_files))

In [None]:
min_train = float(np.min(train_target))
max_train = float(np.max(train_target))
print("train target min/max:", min_train, max_train)

def min_max_scaling(data, data_min, data_max, eps=1e-12):
    return (np.array(data, dtype=np.float32) - data_min) / (data_max - data_min + eps)

def inverse_min_max_scaling(normed, data_min, data_max):
    return np.array(normed, dtype=np.float32) * (data_max - data_min) + data_min

train_target = min_max_scaling(train_target, min_train, max_train)
val_target   = min_max_scaling(val_target,   min_train, max_train)
test_target  = min_max_scaling(test_target,  min_train, max_train)

In [None]:
class DataGenerator(keras.utils.Sequence):
    """Generates batches of (X, y) for Keras"""
    def __init__(self, files, target, batch_size=32, dim=(256,256), n_channels=10, shuffle=True):
        self.files = files
        self.target = np.array(target, dtype=np.float32)
        self.dim = dim
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        return len(self.files) // self.batch_size

    def __getitem__(self, index):
        idxs = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        f_batch = [self.files[i] for i in idxs]
        y_batch = self.target[idxs]
        X, y = self.__data_generation(f_batch, y_batch)
        return X, y

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.files))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, f_batch, y_batch):
        X = np.empty((self.batch_size, *self.dim, self.n_channels), dtype=np.float32)
        y = y_batch.astype(np.float32)
        for i, f in enumerate(f_batch):
            X[i] = np.load(f, allow_pickle=True).astype(np.float32)  # expects (256,256,10)
        return X, y

training_generator   = DataGenerator(train_files, train_target, batch_size=32, shuffle=True)
validation_generator = DataGenerator(val_files,   val_target,   batch_size=32, shuffle=False)
test_generator       = DataGenerator(test_files,  test_target,  batch_size=1,  shuffle=False)

In [None]:
inp_vgg = Input(shape=(256, 256, 10), dtype='float32', name='input_vgg')
x = Conv2D(64, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(inp_vgg)
x = Conv2D(64, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = MaxPooling2D((2,2))(x)

x = Conv2D(128, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(128, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = MaxPooling2D((2,2))(x)

x = Conv2D(256, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(256, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(256, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = MaxPooling2D((2,2))(x)

x = Conv2D(512, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(512, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(512, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = MaxPooling2D((2,2))(x)

x = Conv2D(512, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(512, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = Conv2D(512, (3, 3), activation='relu', padding='same',
           kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(0.01))(x)
x = MaxPooling2D((2,2))(x)

x = Flatten()(x)
x = Dense(4096, kernel_initializer='he_normal', activation='relu')(x)
x = Dense(4096, kernel_initializer='he_normal', activation='relu')(x)
out_vgg = Dense(1)(x)
vgg16_model = Model(inp_vgg, out_vgg, name="VGG16_like_regressor")

In [None]:
inp_res = Input(shape=(256, 256, 10), dtype='float32', name='input_res')

def conv1_layer(x):
    x = ZeroPadding2D(padding=(3, 3))(x)
    x = Conv2D(64, (7, 7), strides=(2, 2))(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = ZeroPadding2D(padding=(1,1))(x)
    return x

def conv_block(x, filters, stride):
    shortcut = x
    f = filters
    x = Conv2D(f, (1, 1), strides=stride, padding='valid')(x)
    x = BatchNormalization()(x); x = Activation('relu')(x)
    x = Conv2D(f, (3, 3), strides=1, padding='same')(x)
    x = BatchNormalization()(x); x = Activation('relu')(x)
    x = Conv2D(4*f, (1, 1), strides=1, padding='valid')(x)
    shortcut = Conv2D(4*f, (1, 1), strides=stride, padding='valid')(shortcut)
    x = BatchNormalization()(x); shortcut = BatchNormalization()(shortcut)
    x = Add()([x, shortcut]); x = Activation('relu')(x)
    return x

def identity_block(x, filters):
    shortcut = x
    f = filters
    x = Conv2D(f, (1, 1), strides=1, padding='valid')(x)
    x = BatchNormalization()(x); x = Activation('relu')(x)
    x = Conv2D(f, (3, 3), strides=1, padding='same')(x)
    x = BatchNormalization()(x); x = Activation('relu')(x)
    x = Conv2D(4*f, (1, 1), strides=1, padding='valid')(x)
    x = BatchNormalization()(x)
    x = Add()([x, shortcut]); x = Activation('relu')(x)
    return x

def conv2_layer(x):
    x = MaxPooling2D((3, 3), strides=2)(x)
    x = conv_block(x, 64, stride=1)
    for _ in range(2):
        x = identity_block(x, 64)
    return x

def conv3_layer(x):
    x = conv_block(x, 128, stride=2)
    for _ in range(3):
        x = identity_block(x, 128)
    return x

def conv4_layer(x):
    x = conv_block(x, 256, stride=2)
    for _ in range(5):
        x = identity_block(x, 256)
    return x

def conv5_layer(x):
    x = conv_block(x, 512, stride=2)
    for _ in range(2):
        x = identity_block(x, 512)
    return x

x = conv1_layer(inp_res)
x = conv2_layer(x)
x = conv3_layer(x)
x = conv4_layer(x)
x = conv5_layer(x)
x = GlobalAveragePooling2D()(x)
out_res = Dense(1)(x)
resnet50_model = Model(inp_res, out_res, name="ResNet50_like_regressor")

In [None]:
models_to_train = [vgg16_model, resnet50_model]
names_to_train  = ["VGG16", "ResNet50"]

histories = []
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, restore_best_weights=True)

for name, model in zip(names_to_train, models_to_train):
    print(f"\n=== Training {name} ===")
    checkpoint = ModelCheckpoint(f"bestdadn_{name}.h5", monitor='val_loss',
                                 verbose=1, save_best_only=True, mode='min')
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-4),
                  loss='mse',
                  metrics=[tf.keras.metrics.MeanSquaredError()])
    history = model.fit(training_generator,
                        epochs=200,
                        callbacks=[checkpoint, early_stopping],
                        validation_data=validation_generator)
    histories.append(history.history)
    model.save(f"finaldadn_{name}.h5")
    np.save(f'historydadn_{name}.npy', history.history)