In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from scipy import signal
from scipy.stats import pearsonr
# import pywt  # Continuous Wavelet Transform
import copy
import scipy.stats as st
from scipy.special import comb
import seaborn as sns
from sympy import *
import math

import kennard_stone as ks 

print(tf.__version__)

In [None]:
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

In [None]:
path = '../../Datas/Paper_data/土壤有机质数据/2024第二批数据(96个土样)/re_vis-NIR.csv'
data = pd.read_csv(path)

In [None]:
data.head()

In [None]:
X = data.loc[:,"X400":"X2400"].values.astype("float32")
Y = data["SOM"].values.astype("float32")
wavelengths = np.linspace(400, 2400, X.shape[1])
train_data = data.values.astype("float32")

In [None]:
def show_hyperspectral_image(_data, title=None, x_label_start=0, sample_interval=10):
    y = _data
    x = range(0, _data.shape[1])
    axis_x_label = range(x_label_start, y.shape[1] * sample_interval + x_label_start, sample_interval)
    fig, ax = plt.subplots(figsize=[6, 4],dpi=400)
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)
    for i in range(0, y.shape[0]):
        plt.plot(x, y[i])
    xticks_interval = 20 
    # xticks_interval = 200 
    plt.xticks(x[::xticks_interval], axis_x_label[::xticks_interval], rotation=0)
    plt.xlabel('Wavelength/nm', fontsize=13)
    plt.ylabel('Reflectance', fontsize=13)
    plt.title(title, fontsize=15)
    # plt.grid(linestyle = '--',alpha=0.7)
    # plt.savefig("C:/Users/xy445/Desktop/Reflectance.png")
    plt.show()

def SG(data, w=11, p=2):
    return signal.savgol_filter(data, w, p)

In [None]:
show_hyperspectral_image(SG(X,w=17,p=2),'Raw',400,10)

In [None]:
x_train, x_test, y_train, y_test = ks.train_test_split(train_data[:,1:], train_data[:,:1], test_size=0.3)

gan_train_data = np.hstack((y_train,x_train))

In [None]:
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range=(0, 1))
normalized_data = scaler.fit_transform(gan_train_data)

# reconstructed_data = scaler.inverse_transform(normalized_data)

gan_data_matrix = np.zeros((len(normalized_data),217,1)).astype(np.float32)
for i in range(len(normalized_data)):
    gan_data_matrix[i] = normalized_data[i].reshape((217,1))
gan_data_matrix.shape

In [None]:
latent_dim = 100      # 噪声向量维度
signal_length = 217   # 信号长度

In [None]:
def build_generator():
    model = tf.keras.Sequential([
        layers.Input(shape=(latent_dim,)),
        layers.Dense(32, activation='relu', kernel_initializer='he_normal'),
        layers.BatchNormalization(),
        layers.Dense(64, activation='relu', kernel_initializer='he_normal'),
        layers.BatchNormalization(),
        layers.Dense(128, activation='relu', kernel_initializer='he_normal'),
        layers.BatchNormalization(),
        layers.Dense(signal_length, activation='tanh')
    ])
    return model
g_model = build_generator()
g_model.summary()

In [None]:
def build_discriminator():
    model = tf.keras.Sequential([
        layers.Input(shape=(signal_length, 1)),
        layers.Conv1D(32, 5, strides=2, padding='same'),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),
        
        layers.Conv1D(64, 5, strides=2, padding='same'),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),

        layers.Flatten(),
        layers.Dense(128),
        layers.LeakyReLU(alpha=0.2),
        layers.Dense(1, activation='sigmoid')
    ])
    return model
d_model = build_discriminator()
d_model.summary()

In [None]:
class StandardGAN(keras.Model):
    def __init__(self, generator, discriminator, **kwargs):
        super().__init__(**kwargs)
        self.generator = generator
        self.discriminator = discriminator
        # self.g_optimizer = tf.keras.optimizers.Adam(0.0002, beta_1=0.5)
        # self.d_optimizer = tf.keras.optimizers.Adam(0.0002, beta_1=0.5)
        self.g_optimizer = tf.keras.optimizers.Adam(learning_rate=tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=0.0002, decay_steps=10000, decay_rate=0.9, staircase=True), beta_1=0.5, beta_2=0.999)
        self.d_optimizer = tf.keras.optimizers.Adam(learning_rate=tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=0.0002, decay_steps=10000, decay_rate=0.9, staircase=True), beta_1=0.5, beta_2=0.999)
        self.loss_fn = tf.keras.losses.BinaryCrossentropy()
        
        self.g_loss_metric = tf.keras.metrics.Mean(name="g_loss")
        self.d_loss_metric = tf.keras.metrics.Mean(name="d_loss")

    @property
    def metrics(self):
        return [self.g_loss_metric, self.d_loss_metric]

    def train_step(self, real_data):
        batch_size = tf.shape(real_data)[0]
        
        noise = tf.random.normal(shape=(batch_size, latent_dim))   

        valid = tf.ones((batch_size, 1)) * 0.9  # 真实标签设为0.9
        fake = tf.zeros((batch_size, 1)) * 0.1  # 假标签设为0.1

        # valid = tf.ones((batch_size, 1))
        # fake = tf.zeros((batch_size, 1))

        with tf.GradientTape() as d_tape:
            generated_data = self.generator(noise)
            generated_data = tf.reshape(generated_data, [-1, signal_length, 1])
            
            real_pred = self.discriminator(real_data)
            fake_pred = self.discriminator(generated_data)
            
            d_real_loss = self.loss_fn(valid, real_pred)
            d_fake_loss = self.loss_fn(fake, fake_pred)
            d_total_loss = (d_real_loss + d_fake_loss) / 2

        d_gradients = d_tape.gradient(d_total_loss, self.discriminator.trainable_variables)
        self.d_optimizer.apply_gradients(zip(d_gradients, self.discriminator.trainable_variables))

        with tf.GradientTape() as g_tape:
            regenerated_data = self.generator(noise)
            regenerated_data = tf.reshape(regenerated_data, [-1, signal_length, 1])
            
            valid_pred = self.discriminator(regenerated_data)
            
            g_loss = self.loss_fn(valid, valid_pred)

        g_gradients = g_tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(zip(g_gradients, self.generator.trainable_variables))

        self.g_loss_metric.update_state(g_loss)
        self.d_loss_metric.update_state(d_total_loss)
        
        return {
            "generator_loss": self.g_loss_metric.result(),
            "discriminator_loss": self.d_loss_metric.result()
        }

In [None]:
class GANMonitor(keras.callbacks.Callback):
    def __init__(self, num_img=65, latent_dim=128, last_end_epochs=0):
        self.num_img = num_img
        self.latent_dim = latent_dim
        self.last_end_epochs = last_end_epochs

    def on_epoch_end(self, epoch, logs=None):
      if (self.last_end_epochs+epoch+1) % 100 == 0:
        self.model.discriminator.save(f'../../models/1D-GAN[20250131]/GAN-D[{self.last_end_epochs+epoch+1}].h5')
        self.model.generator.save(f'../../models/1D-GAN[20250131]/GAN-G[{self.last_end_epochs+epoch+1}].h5')
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_data = self.model.generator(random_latent_vectors, training=False)
        generated_data = generated_data.numpy()
        gan_data_matrix = np.zeros((len(generated_data),217)).astype(np.float32)
        for i in range(len(gan_data_matrix)):
            gan_data_matrix[i] = generated_data[i].reshape((217))
            gan_data_matrix[i] = scaler.inverse_transform(gan_data_matrix[i].reshape(1, -1))
        reflact = gan_data_matrix[:,1:]
        # fig, ax = plt.subplots(figsize=[6, 4],dpi=400)
        x = range(350, reflact.shape[1]+350)
        for i in range(0, reflact.shape[0]):
          plt.plot(x, reflact[i])
        plt.grid(linestyle = '--',alpha=0.7)
        plt.show()

In [None]:
with tf.device('/device:GPU:0'):
    epochs = 20000
    last_end_epochs = 0 
    cbk = GANMonitor(num_img=65, latent_dim=latent_dim, last_end_epochs=last_end_epochs)

    generator = build_generator()
    discriminator = build_discriminator()
    if last_end_epochs != 0:
        generator = tf.keras.models.load_model(f'../../models/1D-GAN[20250131]/GAN-G[{last_end_epochs}].h5')
        discriminator = tf.keras.models.load_model(f'../../models/1D-GAN[20250131]/GAN-D[{last_end_epochs}].h5')
    
    standard_gan = StandardGAN(generator=generator, discriminator=discriminator)
    standard_gan.compile()

    dataset = tf.data.Dataset.from_tensor_slices(gan_data_matrix)
    dataset = dataset.shuffle(buffer_size=10000)
    dataset = dataset.batch(65, drop_remainder=True)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    
    real_signals = dataset
    
    history = standard_gan.fit(
        real_signals,
        batch_size=65,
        epochs=epochs,
        verbose=1,
        callbacks=[cbk]
    )