In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.io as scio
from sklearn.neighbors import LocalOutlierFactor as LOF
from sklearn import metrics
from pyod.models.lof import LOF as PYOD_LOF
from sklearn.svm import OneClassSVM
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.callbacks import CSVLogger
from tensorflow.keras.preprocessing.image import img_to_array, array_to_img
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Flatten, Dense, Reshape
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.utils import Sequence
import cv2  # Import OpenCV

In [2]:
# Data Preprocessing
def load_and_preprocess_data(filepath, scale_factor=6000):
    """Read the four data splits and their labels from a MATLAB ``.mat`` file.

    Each ``X_*`` array is divided by ``scale_factor`` and any resulting
    negative value is set to 0. The label for a split is read from the
    variable whose name is the split name with ``'X'`` replaced by ``'Y'``
    and is squeezed to drop length-1 axes.

    Args:
        filepath: Path passed to ``scipy.io.loadmat``.
        scale_factor: Divisor applied to every ``X_*`` array.

    Returns:
        ``(data, labels)`` — two dicts, both keyed by the ``X_*`` split name.
    """
    mat = scio.loadmat(filepath)
    split_names = ('X_training_normal', 'X_training', 'X_test', 'X_pre')

    # First pass: scale every split and clamp negatives to zero.
    data = {}
    for name in split_names:
        scaled = mat[name] / scale_factor
        scaled[scaled < 0] = 0
        data[name] = scaled

    # Second pass: fetch the matching Y_* variable for each split.
    labels = {}
    for name in split_names:
        label_name = name.replace('X', 'Y')
        labels[name] = mat[label_name].squeeze()

    return data, labels

# Load the four splits and their labels from the MATLAB file in the working directory.
data, labels = load_and_preprocess_data('./CAO.mat')


In [3]:
def data_resize(data, a, b):
    """Resize a stack of 2-D frames to ``a`` x ``b`` via PIL.

    ``data`` is viewed as ``(-1, data.shape[1], data.shape[2])``; each frame
    is converted to a PIL image with ``array_to_img`` (default arguments),
    resized, converted back with ``img_to_array`` and divided by 255.

    Args:
        data: Array whose axes 1 and 2 are the frame height and width.
        a: Target height in pixels.
        b: Target width in pixels.

    Returns:
        Array of shape ``(a, b, 1, n_frames)`` — the frame axis is moved to
        the end.
    """
    frames = data.reshape(-1, data.shape[1], data.shape[2])
    n_frames = frames.shape[0]
    out = np.zeros((n_frames, a, b, 1))
    for pos, frame in enumerate(frames):
        # array_to_img needs an explicit trailing channel axis.
        pil_img = array_to_img(frame[..., np.newaxis])
        # PIL's resize takes (width, height), hence (b, a).
        pil_img = pil_img.resize((b, a))
        out[pos] = img_to_array(pil_img) / 255.0
    return np.moveaxis(out, 0, -1)

# Resize every split to height 80 / width 120, overwriting the entries of `data`.
# NOTE(review): this cell is not idempotent — running it twice feeds the
# already-resized (80, 120, 1, N) arrays back into data_resize.
for key in data:
    data[key] = data_resize(data[key], 80, 120)

In [4]:
# Dataset Definition
class yxqDataset:
    """Index-addressable pairing of an image stack with its labels.

    ``data`` is expected in the layout produced by ``data_resize``: samples
    on the last axis, e.g. ``(height, width, 1, n_samples)``. ``labels`` is
    an array whose first axis has one entry per sample.
    """

    def __init__(self, data, labels):
        self.labels = labels
        self.imgs = data

    def __len__(self):
        # The number of samples is taken from the labels, not from the images.
        return self.labels.shape[0]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is the slice of ``imgs`` at position ``idx`` on the last
        (sample) axis. The channel axis is kept, so stacking the items of a
        ``(H, W, 1, N)`` stack gives an ``(N, H, W, 1)`` batch.
        """
        # Select this sample only; the previous code returned the same
        # whole-stack slice for every idx.
        image = self.imgs[..., idx]
        label = self.labels[idx]
        return image, label

In [5]:
def create_dataloader(data, labels, batch_size=16, shuffle=True):
    """Materialise a ``yxqDataset`` into arrays and split them 80/20.

    Args:
        data: Image stack handed to ``yxqDataset``.
        labels: Label array handed to ``yxqDataset``.
        batch_size: Accepted for interface compatibility; not used here.
        shuffle: Forwarded to ``train_test_split``.

    Returns:
        The result of ``train_test_split`` on the stacked images and labels
        with ``test_size=0.2``.
    """
    dataset = yxqDataset(data, labels)
    # Pull every (image, label) pair out of the dataset once.
    samples = [dataset[i] for i in range(len(dataset))]
    images = np.array([img for img, _ in samples])
    targets = np.array([lbl for _, lbl in samples])
    return train_test_split(images, targets, test_size=0.2, shuffle=shuffle)

# Hold out 20% of the 'X_training_normal' split for validation.
X_train, X_val, y_train, y_val = create_dataloader(data['X_training_normal'], labels['X_training_normal'])

In [6]:
# Define CAE
# Single-channel 80x120 input, the same height/width passed to data_resize.
input_img = Input(shape=(80, 120, 1))

# Encoder: two (3x3 conv -> 2x2 max-pool) stages followed by a dense bottleneck.
x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
# Remember the feature-map shape before flattening so the decoder can restore it.
shape = x.shape
x = Flatten()(x)
# 32-unit latent code.
encoded = Dense(32, activation='relu')(x)

# Decoder: expand the code back to the saved feature-map shape, then mirror
# the encoder with (3x3 conv -> 2x upsampling) stages.
x = Dense(shape[1] * shape[2] * shape[3], activation='relu')(encoded)
x = Reshape((shape[1], shape[2], shape[3]))(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
# Single-channel reconstruction.
# NOTE(review): tanh outputs lie in [-1, 1] while data_resize divides pixel
# values by 255 — confirm this output range is intended.
decoded = Conv2D(1, (3, 3), activation='tanh', padding='same')(x)

In [8]:
# Assemble the encoder/decoder graph into one trainable model and compile it
# with Adam (learning rate 1e-3) and a mean-squared-error reconstruction loss.
autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer=Adam(learning_rate=1e-3), loss=MeanSquaredError())
# The training, saving and plotting cells refer to the network as `model`;
# bind that name here so the notebook runs top-to-bottom on a fresh kernel.
model = autoencoder
autoencoder.summary()

In [None]:
# Training Function
def train_model(num_epochs, model, train_loader, log_file):
    """Train ``model`` as an autoencoder with an explicit epoch/batch loop.

    Args:
        num_epochs: Number of passes over ``train_loader``.
        model: Object exposing ``train_on_batch(x, y)`` that returns the
            batch loss as a number.
        train_loader: Re-iterable yielding ``(images, labels)`` batches. The
            labels are ignored; ``images.shape[0]`` is taken as batch size.
        log_file: CSV path. One header-less ``epoch,train_loss`` row is
            appended per epoch.

    Returns:
        List with the sample-weighted mean training loss of every epoch.

    Raises:
        ValueError: If an epoch yields no samples.
    """
    train_losses = []
    for epoch in range(num_epochs):
        loss_sum = 0.0
        samples_seen = 0
        for img, _ in train_loader:
            batch_n = img.shape[0]
            # Input and target are the same tensor: reconstruction objective.
            loss = model.train_on_batch(img, img)
            loss_sum += loss * batch_n
            samples_seen += batch_n
        if samples_seen == 0:
            raise ValueError('train_loader yielded no samples')
        # Normalise by the samples actually iterated rather than relying on
        # a loader-specific `.indices` attribute.
        train_loss = loss_sum / samples_seen
        print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.6f}')
        train_losses.append(train_loss)
        df = pd.DataFrame([[epoch + 1, train_loss]], columns=['epoch', 'train_loss'])
        df.to_csv(log_file, mode='a', header=False, index=False)
    return train_losses

In [None]:
# Average each array over its last axis, keeping that axis with length 1
# (keepdims=True).
# NOTE(review): presumably this yields the (N, 80, 120, 1) layout the network
# input expects — verify against the shapes returned by create_dataloader.
X_train = np.mean(X_train, axis=-1, keepdims=True)
X_val = np.mean(X_val, axis=-1, keepdims=True)

In [None]:
# Training
# Write the metrics of each epoch to a CSV file.
# NOTE(review): assumes ./result/CAO/loss/ already exists — TODO confirm.
csv_logger = CSVLogger('./result/CAO/loss/CAO_loss1.csv')
# Reconstruction training: the target passed to fit() is the input itself,
# for both the training and the validation arrays.
history = model.fit(X_train, X_train, epochs=100, batch_size=16, shuffle=True,
                    validation_data=(X_val, X_val), callbacks=[csv_logger])

In [None]:
# Plot Training Loss
# Draw the per-epoch training and validation loss stored in `history` on one
# pair of axes.
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Save the model
# Persist the trained network to an .h5 file in the working directory.
model.save('./updated_model_CAO.h5')


In [None]:
# Load Model
# Reload the saved network from disk. `MeanSquaredError` is the
# tensorflow.keras class already imported in the first cell; re-importing it
# from the standalone `keras` package here would shadow that name and mix two
# Keras namespaces. The custom_objects entry maps the name 'mse' to that loss
# class while the file is deserialised.
model = models.load_model('./updated_model_CAO.h5', custom_objects={'mse': MeanSquaredError()})


In [None]:
# Prediction and Reconstruction
def plot_reconstructed_images(X, model, n=5):
    """Show the first ``n`` images of ``X`` above their reconstructions.

    Args:
        X: Array of images; each ``X[i]`` must be reshapeable to (80, 120).
        model: Object with ``predict`` accepting a ``(k, 80, 120, 1)`` batch
            and returning an array with ``k`` reconstructions.
        n: Maximum number of columns to draw (default 5). Clamped to
            ``len(X)`` so short inputs no longer raise ``IndexError``.

    Raises:
        ValueError: If there is nothing to plot (``X`` empty or ``n`` <= 0).
    """
    count = min(n, len(X))
    if count <= 0:
        raise ValueError('no images to plot')

    originals = X[:count].reshape(count, 80, 120, 1)
    # One batched forward pass instead of one predict() call per image.
    reconstructions = model.predict(originals)

    # squeeze=False keeps `axs` two-dimensional even when count == 1.
    fig, axs = plt.subplots(2, count, figsize=(15, 6), squeeze=False)
    for i in range(count):
        axs[0, i].imshow(originals[i].reshape(80, 120), cmap='gray')
        axs[0, i].set_title("Original")

        axs[1, i].imshow(reconstructions[i].reshape(80, 120), cmap='gray')
        axs[1, i].set_title("Reconstructed")
    plt.show()

# Compare the first few training images with their reconstructions.
plot_reconstructed_images(X_train, model)
