In [None]:
# -*- coding: utf-8 -*-

import os
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from tqdm import tqdm
import joblib

import tensorflow as tf
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, Dropout, Flatten, Input
from tensorflow.keras.optimizers import Adam

from sklearn.preprocessing import StandardScaler
from sklearn import metrics
import keras_tuner as kt

import matplotlib.pyplot as plt
import seaborn as sns

# Seed TensorFlow *and* NumPy. The data generator defined later in this notebook
# shuffles its sample order with `np.random.shuffle`, so seeding TensorFlow alone
# does not make the batch order repeatable between runs.
SEED = 20
tf.random.set_seed(SEED)
np.random.seed(SEED)

# NOTE(review): TensorFlow has already been imported at this point; this variable is
# normally set before `import tensorflow` for the log filter to take effect — confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

## Split the dataset

In [None]:

# --- Input locations -------------------------------------------------------
# Directory scanned for curve files (.xlsx / .xls / .csv).
CURVES_DIR = './curves'
# Excel workbook read as the image index; it must provide a 'name' column.
IMAGES_INDEX_XLSX = 'images_index.xlsx'
# NOTE(review): not referenced by the code visible in this notebook — confirm it is still needed.
IMAGES_ROOT = './images'
# Directories whose file names decide which labelled rows go to train / test.
TRAIN_IMG_DIR = './train_images'
TEST_IMG_DIR = './test_images'

# When True, names are matched on their extension-less basename ("stem")
# instead of on the exact string.
USE_STEM_MATCH = True


# Column layout of the label table: sample name followed by the 200 curve points.
label_columns = ['name'] + [f'y{i}' for i in range(200)]

# Collect one frame per curve file and concatenate once at the end; growing a
# DataFrame with `pd.concat` inside the loop re-copies all previous rows each time.
curve_frames = []

# `os.listdir` returns entries in arbitrary order; sorting keeps the row order of
# `labels` stable between runs and machines.
for file in tqdm(sorted(os.listdir(CURVES_DIR)), desc="Collect curves"):
    lower_name = file.lower()
    if not lower_name.endswith(('.xlsx', '.xls', '.csv')):
        continue
    fp = os.path.join(CURVES_DIR, file)
    reader = pd.read_csv if lower_name.endswith('.csv') else pd.read_excel
    # The first column is used as the index, the first remaining row is dropped and
    # the table is transposed so that every original column becomes one row.
    tmp = reader(fp, index_col=0).iloc[1:, ].T.reset_index()
    if tmp.shape[1] != len(label_columns):
        raise ValueError(
            f"{fp}: expected {len(label_columns) - 1} curve points per sample, "
            f"found {tmp.shape[1] - 1}"
        )
    tmp.columns = label_columns
    curve_frames.append(tmp)

if curve_frames:
    labels = pd.concat(curve_frames, axis=0, ignore_index=True)
else:
    # No curve file found: keep the same empty, correctly-labelled table as before.
    labels = pd.DataFrame(columns=label_columns)


# Image index workbook; only its 'name' column is used below.
labels2 = pd.read_excel(IMAGES_INDEX_XLSX)


def stem(s):
    """Return the file-name stem of ``s``: its basename without the last extension.

    ``s`` is converted with ``str`` first, so non-string values (numbers, NaN, ...)
    are accepted. If the path manipulation fails, ``str(s)`` is returned unchanged.
    """
    try:
        base = os.path.basename(str(s))
        return os.path.splitext(base)[0]
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt / SystemExit are
        # no longer swallowed while this is applied over a long column.
        return str(s)

# Keep only the curves that also appear in the image index.
if not USE_STEM_MATCH:
    # Exact match on the full name.
    labels = pd.merge(labels2[['name']], labels, how='inner', on='name')
else:
    # Match on the extension-less basename of both name columns.
    labels['name_stem'] = labels['name'].apply(stem)
    labels2['name_stem'] = labels2['name'].apply(stem)

    # After the merge, 'name_x' is the index name and 'name_y' the curve name;
    # the curve name is the one kept as 'name'.
    labels = (
        pd.merge(labels2[['name', 'name_stem']], labels, how='inner', on='name_stem')
        .drop(columns=['name_x', 'name_stem'])
        .rename(columns={'name_y': 'name'})
    )


y_cols = [f'y{i}' for i in range(200)]


def list_files(d):
    """Return the sorted names of the regular files located directly in directory ``d``."""
    return sorted(f for f in os.listdir(d) if os.path.isfile(os.path.join(d, f)))


train_files = set(list_files(TRAIN_IMG_DIR))
test_files  = set(list_files(TEST_IMG_DIR))

# Decide which labelled rows belong to the train / test image folders.
if USE_STEM_MATCH:
    # stem -> actual file name on disk. Iterating in sorted order makes the chosen
    # file deterministic when two files in a folder share the same stem.
    train_map = {stem(f): f for f in sorted(train_files)}
    test_map  = {stem(f): f for f in sorted(test_files)}

    train_mask = labels['name'].apply(lambda x: stem(x) in train_map)
    test_mask  = labels['name'].apply(lambda x: stem(x) in test_map)
else:
    train_mask = labels['name'].isin(train_files)
    test_mask  = labels['name'].isin(test_files)

# Fit the target scaler on the TRAIN rows only, then apply it to every row.
# Fitting on all rows would let test-set statistics leak into the training targets.
# If no row matches the train folder, fall back to fitting on all rows so that this
# cell fails no earlier than it did before.
ss = StandardScaler()
fit_rows = labels.loc[train_mask, y_cols] if train_mask.any() else labels[y_cols]
ss.fit(fit_rows)
labels[y_cols] = ss.transform(labels[y_cols])
joblib.dump(ss, 'ss.pkl')  # reloaded later to map predictions back to original units

train_csv = labels[train_mask].copy().reset_index(drop=True)
test_csv  = labels[test_mask].copy().reset_index(drop=True)

if USE_STEM_MATCH:
    # Replace the curve name by the real image file name so it can be joined to the folder.
    train_csv['name'] = train_csv['name'].apply(lambda x: train_map.get(stem(x), x))
    test_csv['name']  = test_csv['name'].apply(lambda x: test_map.get(stem(x), x))

print(f"Train samples: {len(train_csv)}, Test samples: {len(test_csv)}")


# Overlay the first (up to) ten standardized curves as a quick sanity check.
fig, ax = plt.subplots(figsize=(12, 4), dpi=120)
n_preview = min(10, len(labels))
for i in range(n_preview):
    y_values = labels.loc[i, y_cols].values
    sns.lineplot(x=range(200), y=y_values, ax=ax)
ax.set_title("Sample of standardized curves")
fig.tight_layout()
plt.show()

In [None]:
class CustomDataGenerator(Sequence):
    """Keras ``Sequence`` that yields ``(images, targets)`` batches from a DataFrame.

    Each row of ``csv_file`` names an image file (column ``'name'``, relative to
    ``directory``) and carries the regression targets in the ``y_cols`` columns.

    Parameters
    ----------
    csv_file : pandas.DataFrame
        Table with a ``'name'`` column and the target columns.
    directory : str
        Folder the image file names are joined to.
    batch_size : int
        Number of samples per batch (must be positive).
    target_size : tuple
        ``(height, width)`` passed to ``load_img``; images are loaded with 3 channels.
    y_cols : list of str
        Names of the target columns.
    shuffle : bool
        Reshuffle the sample order at construction and in ``on_epoch_end``.
    augment : bool
        Apply a random ``ImageDataGenerator`` transform to every loaded image.
    drop_remainder : bool
        When False (default) the trailing partial batch is served too, so every row
        of ``csv_file`` is seen once per pass and prediction outputs have one row per
        table row. Set to True to drop the incomplete last batch.
    **kwargs
        Forwarded to the ``Sequence`` base-class constructor.
    """

    def __init__(self, csv_file, directory, batch_size, target_size, y_cols,
                 shuffle=True, augment=False, drop_remainder=False, **kwargs):
        super().__init__(**kwargs)
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        self.csv_file = csv_file
        self.directory = directory
        self.batch_size = batch_size
        self.target_size = target_size
        self.y_cols = y_cols
        self.shuffle = shuffle
        self.augment = augment
        self.drop_remainder = drop_remainder

        if self.augment:
            self.idg = ImageDataGenerator(
                rotation_range=10,
                width_shift_range=0.1,
                height_shift_range=0.1,
                shear_range=0.05,
                zoom_range=0.1,
                horizontal_flip=True,
                fill_mode='nearest'
            )
        else:
            self.idg = None

        # Builds the initial (optionally shuffled) sample order.
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per pass over ``csv_file``."""
        n_samples = len(self.csv_file)
        if self.drop_remainder:
            return n_samples // self.batch_size
        # Ceiling division: the last batch may hold fewer than `batch_size` rows.
        return -(-n_samples // self.batch_size)

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(X, y)`` float32 arrays."""
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch = [self.csv_file.iloc[k] for k in indexes]
        X, y = self.__data_generation(batch)
        return X, y

    def on_epoch_end(self):
        """Reset the sample order; shuffled with ``np.random`` when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.csv_file))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, batch):
        """Load the images and targets for the given list of table rows."""
        # Size the buffers by the actual number of rows: the last batch can be
        # shorter than `batch_size`, and `np.empty` would leave stale rows otherwise.
        n_rows = len(batch)
        X = np.empty((n_rows, *self.target_size, 3), dtype=np.float32)
        y = np.empty((n_rows, len(self.y_cols)), dtype=np.float32)

        for i, data in enumerate(batch):
            img_path = os.path.join(self.directory, data['name'])
            image = load_img(img_path, target_size=self.target_size)
            arr = img_to_array(image) / 255.0  # pixel values divided by 255

            if self.augment and self.idg is not None:
                arr = self.idg.random_transform(arr)

            X[i] = arr
            y[i] = data[self.y_cols].values.astype(np.float32)

        return X, y

# Batch geometry shared by both generators.
batch_size = 16
target_size = (256, 256)

generator_options = {
    'batch_size': batch_size,
    'target_size': target_size,
    'y_cols': y_cols,
}

# Training data: shuffled and augmented.
train_generator = CustomDataGenerator(
    csv_file=train_csv,
    directory=TRAIN_IMG_DIR,
    shuffle=True,
    augment=True,
    **generator_options,
)

# Test data: fixed order, no augmentation.
test_generator = CustomDataGenerator(
    csv_file=test_csv,
    directory=TEST_IMG_DIR,
    shuffle=False,
    augment=False,
    **generator_options,
)

In [None]:

def stack_y(gen):
    """Concatenate the target arrays of every batch of ``gen`` along axis 0.

    ``gen`` must support ``len()`` and integer indexing, each item being an
    ``(inputs, targets)`` pair; only the targets are kept.
    """
    targets = [gen[batch_idx][1] for batch_idx in range(len(gen))]
    return np.concatenate(targets, axis=0)

# Materialise the (standardized) targets in the order the generators serve them.
# Every batch is fetched in full, so the images are loaded as well.
y_train, y_test = (stack_y(g) for g in (train_generator, test_generator))
print("y_train.shape, y_test.shape =", y_train.shape, y_test.shape)


def plot_model_history(model_history):
    """Plot train/validation MSE and loss per epoch side by side.

    ``model_history.history`` must contain the keys ``'mse'``, ``'val_mse'``,
    ``'loss'`` and ``'val_loss'``. The figure is saved to ``curve.jpg`` and shown.
    """
    records = model_history.history
    # (history key, panel title, y-axis label) for the left and right panel.
    panels = (
        ('mse', 'Model MSE', 'mse'),
        ('loss', 'Model Loss', 'Loss'),
    )

    fig, axs = plt.subplots(1, 2, figsize=(10, 3), dpi=120)
    for ax, (key, title, ylabel) in zip(axs, panels):
        for series_key in (key, 'val_' + key):
            values = records[series_key]
            ax.plot(range(1, len(values) + 1), values)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.set_xlabel('Epoch')
        ax.legend(['train', 'val'], loc='best')

    fig.savefig('curve.jpg', dpi=600, bbox_inches='tight')
    plt.show()

In [None]:
def build_model(hp):
    """Build and compile the image-to-curve regressor for one hyperparameter set.

    Architecture: Xception (ImageNet weights, no classification top) on 256x256x3
    inputs, flattened, followed by two tunable Dense layers and a linear Dense(200)
    output. Tuned values: ``units_{i}`` / ``activation_{i}`` for both Dense layers
    and the Adam ``learning_rate``.

    NOTE(review): the input shape is hard-coded here and must agree with the
    ``target_size`` used by the data generators — confirm they are kept in sync.
    """
    image_shape = (256, 256, 3)
    inputs = Input(shape=image_shape)

    backbone = tf.keras.applications.Xception(
        weights='imagenet', include_top=False, input_shape=image_shape
    )
    features = Flatten()(backbone(inputs))

    # Two fully connected layers, each with its own width and activation.
    for layer_idx in (0, 1):
        n_units = hp.Int(f'units_{layer_idx}', min_value=128, max_value=512, step=16)
        act = hp.Choice(f'activation_{layer_idx}', values=['relu', 'tanh'])
        features = Dense(units=n_units, activation=act)(features)

    curve_out = Dense(200)(features)
    regressor = Model(inputs=inputs, outputs=curve_out)

    lr = hp.Float('learning_rate', min_value=1e-4, max_value=1e-3, sampling='LOG')
    regressor.compile(optimizer=Adam(learning_rate=lr), loss='mse', metrics=['mse'])
    return regressor


TUNER_DIR = 'my_dir'
PROJECT   = 'helloworld_single_image'

tuner = kt.RandomSearch(
    build_model,
    objective='val_loss',
    max_trials=100,
    executions_per_trial=1,
    directory=TUNER_DIR,
    project_name=PROJECT
)

# Skip the search when this tuner already reports a best trial. The result is bound
# to a real name instead of `_`, which IPython uses for the last cell output.
try:
    previous_best = tuner.get_best_hyperparameters(num_trials=1)
except Exception:
    # Best effort: any failure while querying earlier results means "search again".
    previous_best = []
already_tuned = len(previous_best) > 0

stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

if not already_tuned:
    # NOTE(review): the test generator doubles as validation data here, so the
    # selected hyperparameters are tuned against the test set — confirm intended.
    tuner.search(
        train_generator,
        epochs=10,
        validation_data=test_generator,
        callbacks=[stop_early],
        verbose=1
    )

best_candidates = tuner.get_best_hyperparameters(num_trials=1)
if not best_candidates:
    # Fail with an explicit message rather than an IndexError on `[0]`.
    raise RuntimeError(
        "Hyperparameter search produced no completed trial; cannot select best hyperparameters."
    )
best_hps = best_candidates[0]

print(f"[BEST HP] lr={best_hps.get('learning_rate')}")
for i in range(2):
    print(f"[BEST HP] Dense{i+1}: units={best_hps.get(f'units_{i}')}, act={best_hps.get(f'activation_{i}')}")


In [None]:

# Rebuild the network with the selected hyperparameters and train it from scratch.
model = tuner.hypermodel.build(best_hps)

# Keep only the weights of the epoch with the lowest validation loss on disk.
ckpt = tf.keras.callbacks.ModelCheckpoint(
    filepath="./best_model.h5",
    monitor='val_loss',
    mode='min',
    save_best_only=True,
)

fit_options = dict(
    steps_per_epoch=len(train_generator),
    epochs=100,
    validation_data=test_generator,
    validation_steps=len(test_generator),
    callbacks=[ckpt, stop_early],
    verbose=1,
)
history = model.fit(train_generator, **fit_options)
plot_model_history(history)

# Persist the per-epoch metrics, with a 1-based epoch counter as first column.
hist_df = pd.DataFrame(history.history)
hist_df.insert(loc=0, column='epoch', value=range(1, len(hist_df) + 1))
hist_df.to_excel('history_epoch_metrics.xlsx', index=False)

In [None]:
best = load_model('best_model.h5', compile=False)

y_test_pred  = best.predict(test_generator,  verbose=0)
# NOTE(review): the train generator is created with shuffle=True and augment=True, so
# these predictions are made on augmented images; they are recomputed in a later cell.
y_train_pred = best.predict(train_generator, verbose=0)

# Load the target scaler once and map both arrays back to the original units.
target_scaler = joblib.load('ss.pkl')
y_test_true = target_scaler.inverse_transform(y_test)
y_test_pred_is = target_scaler.inverse_transform(y_test_pred)
assert y_test_pred_is.shape == y_test_true.shape, "prediction / target shape mismatch"

# The generator may serve fewer rows than `test_csv` holds (incomplete last batch
# dropped). The test generator is not shuffled, so the evaluated rows are the first
# `n_eval` rows of `test_csv`; pairing the metrics with the full name column would
# raise a length-mismatch error.
n_eval = y_test_true.shape[0]
test_names = test_csv['name'].iloc[:n_eval].reset_index(drop=True)

# `np.trapz` is deprecated in NumPy 2 in favour of `np.trapezoid`; use whichever exists.
trapezoid_rule = getattr(np, 'trapezoid', None) or np.trapz

# Per-sample R² over the 200 curve points and ratio of predicted to true area.
r2_list, area_list = [], []
for i in range(n_eval):
    r2_list.append(round(metrics.r2_score(y_test_true[i, :], y_test_pred_is[i, :]), 4))
    denom = trapezoid_rule(y_test_true[i, :], dx=1.0)
    nume  = trapezoid_rule(y_test_pred_is[i, :], dx=1.0)
    area_list.append((nume / denom) if denom != 0 else np.nan)

# Per-sample relative RMSE; |true| is floored at `eps` to avoid division by zero.
eps = 1e-8
denom_mat = np.maximum(np.abs(y_test_true), eps)  # shape: (N,200)
rel_rmse_vec  = np.sqrt(np.mean(((y_test_pred_is - y_test_true) / denom_mat) ** 2, axis=1))
rel_rmse_mean = float(np.nanmean(rel_rmse_vec))


df_metrics = pd.DataFrame({
    'name': test_names,
    'R2': r2_list,
    'AreaRatio': area_list,
    'RelRMSE': rel_rmse_vec
})
df_metrics.to_excel('test_metrics_R2_Area_RelRMSE.xlsx', index=False)

print("Test R² mean:", np.nanmean(r2_list))
print("Test AreaRatio mean:", np.nanmean(area_list))
print("Test RelRMSE mean:", rel_rmse_mean)


pd.DataFrame(y_test_true).assign(name=test_names.values).to_excel('True-test.xlsx', index=False)
pd.DataFrame(y_test_pred_is).assign(name=test_names.values).to_excel('Pred-test.xlsx', index=False)



In [None]:
# Export the training / validation loss per epoch (1-based) as CSV.
loss_values, val_loss_values = (history.history[key] for key in ('loss', 'val_loss'))
num_epoch = len(loss_values)

df_loss = pd.DataFrame(
    data={
        'epoch': np.arange(1, num_epoch + 1),
        'loss': loss_values,
        'val_loss': val_loss_values,
    }
)
df_loss.to_csv('loss_curve_image_only.csv', index=False)

In [None]:

# True vs. predicted curve for the first (up to) ten test samples, one figure each.
k = min(10, y_test_true.shape[0])
for i in range(k):
    fig, ax = plt.subplots(figsize=(8, 3), dpi=120)
    sns.lineplot(x=range(200), y=y_test_true[i, :], label='True', ax=ax)
    sns.lineplot(x=range(200), y=y_test_pred_is[i, :], label='Pred', ax=ax)
    ax.set_title(f"{test_csv.iloc[i]['name']} | R²={r2_list[i]:.3f}, RelRMSE={rel_rmse_vec[i]:.3f}")
    ax.legend()
    fig.tight_layout()
    plt.show()

In [None]:
def collect_y_and_names(generator, csv_df):
    """Collect every target batch of ``generator`` with the matching sample names.

    Parameters
    ----------
    generator
        Object supporting ``len()`` and integer indexing that returns ``(X, y)``
        batches and exposes ``batch_size``. If it has an ``indexes`` attribute
        (the row order it currently serves, e.g. after shuffling), that order is
        used to look the names up.
    csv_df : pandas.DataFrame
        Table the generator reads from; must contain a ``'name'`` column.

    Returns
    -------
    (numpy.ndarray, list)
        Targets concatenated along axis 0 and the sample names, row-aligned.

    Raises
    ------
    ValueError
        From ``np.concatenate`` when the generator yields no batch.
    """
    name_values = csv_df['name'].values
    # A shuffled generator serves rows in `indexes` order, not in table order;
    # slicing `csv_df` positionally would attach the wrong name to each target row.
    order = getattr(generator, 'indexes', None)
    order = np.arange(len(csv_df)) if order is None else np.asarray(order)

    y_all = []
    names_all = []
    for bi in range(len(generator)):
        _, yb = generator[bi]
        y_all.append(yb)

        start = bi * generator.batch_size
        # Use the actual batch length so a shorter final batch stays aligned.
        batch_rows = order[start:start + len(yb)]
        names_all.extend(name_values[batch_rows].tolist())
    return np.concatenate(y_all, axis=0), names_all

# Targets and names in generator order (train first, then test).
y_train, names_train = collect_y_and_names(train_generator, train_csv)
y_test, names_test = collect_y_and_names(test_generator, test_csv)

# Predictions of the in-memory model for both splits.
# NOTE(review): the train generator augments its images, so the train predictions
# are made on augmented inputs — confirm this is intended.
y_train_pred = model.predict(train_generator, verbose=0)
y_test_pred = model.predict(test_generator, verbose=0)

# Undo the target standardization with the scaler saved earlier.
ss: StandardScaler = joblib.load('ss.pkl')
y_train_inv, y_test_inv, y_train_pred_inv, y_test_pred_inv = (
    ss.inverse_transform(scaled)
    for scaled in (y_train, y_test, y_train_pred, y_test_pred)
)

In [None]:

# NOTE(review): this cell repeats the metric computation of the evaluation cell above
# on the same `y_test_true` / `y_test_pred_is` arrays and rewrites the same files.

# The evaluated arrays may hold fewer rows than `test_csv` (incomplete last batch
# dropped by the generator). The test generator is not shuffled, so they correspond
# to the first `n_eval` rows; using the full name column would raise a length mismatch.
n_eval = y_test_true.shape[0]
test_names = test_csv['name'].iloc[:n_eval].reset_index(drop=True)

# `np.trapz` is deprecated in NumPy 2 in favour of `np.trapezoid`; use whichever exists.
trapezoid_rule = getattr(np, 'trapezoid', None) or np.trapz

# Per-sample R² over the 200 curve points and ratio of predicted to true area.
r2_list, area_list = [], []
for i in range(n_eval):
    r2_list.append(round(metrics.r2_score(y_test_true[i, :], y_test_pred_is[i, :]), 4))

    denom = trapezoid_rule(y_test_true[i, :], dx=1.0)
    nume  = trapezoid_rule(y_test_pred_is[i, :], dx=1.0)
    area_list.append((nume / denom) if denom != 0 else np.nan)


# Per-sample relative RMSE; |true| is floored at `eps` to avoid division by zero.
eps = 1e-8
denom = np.maximum(np.abs(y_test_true), eps)              # shape: (N,200)
rel_rmse_vec = np.sqrt(np.mean(((y_test_pred_is - y_test_true) / denom) ** 2, axis=1))
rel_rmse_mean = float(np.nanmean(rel_rmse_vec))


df_metrics = pd.DataFrame({
    'name': test_names,
    'R2': r2_list,
    'AreaRatio': area_list,
    'RelRMSE': rel_rmse_vec
})
df_metrics.to_excel('test_metrics_R2_Area_RelRMSE.xlsx', index=False)

print("Test R² mean:", np.nanmean(r2_list))
print("Test AreaRatio mean:", np.nanmean(area_list))
print("Test RelRMSE mean:", rel_rmse_mean)


pd.DataFrame(y_test_true).assign(name=test_names.values).to_excel('True-test.xlsx', index=False)
pd.DataFrame(y_test_pred_is).assign(name=test_names.values).to_excel('Pred-test.xlsx', index=False)
