In [None]:
from keras.utils.generic_utils import Progbar
from keras.models import Model, Sequential
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, BatchNormalization, LeakyReLU, Reshape, Conv2DTranspose, \
    Flatten
from keras.layers.core import Activation
from keras.optimizers import Adam
import keras.backend as K
import math, cv2
import numpy as np
#import cupy as np
import os

from keras.models import model_from_json

# Generator
class Generator(object):
    """DCGAN generator: maps a latent vector to an image with tanh output.

    Architecture: Dense -> BatchNorm -> LeakyReLU -> Reshape to a small
    feature map, then two stride-2 transposed convolutions (each followed by
    a convolution) that together upsample the feature map by a factor of 4.

    Args:
        input_dim: Length of the latent (noise) vector.
        image_shape: ``(height, width, channels)`` of the generated image.
            Height and width must be positive multiples of 4.

    Raises:
        ValueError: If height or width is not a positive multiple of 4.
    """

    def __init__(self, input_dim, image_shape):
        INITIAL_CHANNELS = 128
        # The two stride-2 Conv2DTranspose layers upsample by 4 in total, so
        # the initial feature map is a quarter of the generated image size
        # (e.g. 25 x 25 for 100 x 100 images).
        UPSAMPLE_FACTOR = 4
        height, width = int(image_shape[0]), int(image_shape[1])
        if (height <= 0 or width <= 0
                or height % UPSAMPLE_FACTOR or width % UPSAMPLE_FACTOR):
            raise ValueError(
                'image height and width must be positive multiples of %d, got %r'
                % (UPSAMPLE_FACTOR, tuple(image_shape[:2])))
        initial_height = height // UPSAMPLE_FACTOR
        initial_width = width // UPSAMPLE_FACTOR

        inputs = Input((input_dim,))
        fc1 = Dense(units=INITIAL_CHANNELS * initial_height * initial_width)(inputs)
        fc1 = BatchNormalization()(fc1)
        fc1 = LeakyReLU(0.2)(fc1)
        fc2 = Reshape((initial_height, initial_width, INITIAL_CHANNELS))(fc1)
        up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
        conv1 = Conv2D(64, (3, 3), padding='same')(up1)
        conv1 = BatchNormalization()(conv1)
        conv1 = Activation('relu')(conv1)
        up2 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv1)
        # One output channel per image channel.
        conv2 = Conv2D(image_shape[2], (5, 5), padding='same')(up2)
        outputs = Activation('tanh')(conv2)

        self.model = Model(inputs=[inputs], outputs=[outputs])

    def get_model(self):
        """Return the underlying Keras model."""
        return self.model

# Discriminator
class Discriminator(object):
    """DCGAN discriminator: maps an image to a single sigmoid output.

    Two (5x5 convolution -> LeakyReLU(0.2) -> 2x2 max-pooling) stages with 64
    and 128 filters, followed by Flatten, a one-unit Dense layer and a
    sigmoid activation.

    Args:
        input_shape: Shape of one input image, passed straight to ``Input``.
    """

    def __init__(self, input_shape):
        image_in = Input(input_shape)

        features = image_in
        for n_filters in (64, 128):
            features = Conv2D(n_filters, (5, 5), padding='same')(features)
            features = LeakyReLU(0.2)(features)
            features = MaxPooling2D(pool_size=(2, 2))(features)

        flat = Flatten()(features)
        logit = Dense(1)(flat)
        probability = Activation('sigmoid')(logit)

        self.model = Model(inputs=[image_in], outputs=[probability])

    def get_model(self):
        """Return the underlying Keras model."""
        return self.model


# DCGAN
class DCGAN(object):
    """Generator/discriminator pair trained adversarially.

    ``__init__`` sets ``input_dim`` (latent vector length), ``d``
    (discriminator model) and ``g`` (generator model).  ``compile()``
    additionally creates ``dcgan``, the stacked generator -> discriminator
    model that is used to train the generator.
    """

    def __init__(self, input_dim, image_shape):
        """Build both networks for latent size *input_dim* and *image_shape*."""
        self.input_dim = input_dim
        self.d = Discriminator(image_shape).get_model()
        self.g = Generator(input_dim, image_shape).get_model()

    def compile(self, g_optim, d_optim):
        """Compile the stacked model with *g_optim* and the discriminator
        with *d_optim*; both use binary cross-entropy."""
        # The discriminator is flagged non-trainable while the stacked model
        # is compiled and re-enabled before it is compiled on its own.
        # NOTE(review): this relies on Keras capturing ``trainable`` at
        # compile time -- confirm for the Keras version in use.
        self.d.trainable = False
        self.dcgan = Sequential([self.g, self.d])
        self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
        self.d.trainable = True
        self.d.compile(loss='binary_crossentropy', optimizer=d_optim)

    def train(self, epochs, batch_size, X_train):
        """Train the discriminator and generator alternately.

        Every epoch writes a preview grid of the last generated batch to
        ``./results/<epoch>.png``; every 10th epoch saves both models under
        ``./weights/``.  Both directories are created if missing.

        Args:
            epochs: Number of passes over ``X_train``.
            batch_size: Number of real (and of generated) images per step.
            X_train: Array of training images, indexed along axis 0.  It is
                not modified: batches are copied before augmentation.

        Returns:
            ``(g_losses, d_losses)``: the generator and discriminator loss of
            the last batch of each epoch.

        Raises:
            ValueError: If ``X_train`` holds fewer than ``batch_size`` images.
        """
        n_samples = X_train.shape[0]
        n_iter = n_samples // batch_size
        if n_iter == 0:
            raise ValueError(
                'X_train has %d samples, fewer than batch_size=%d'
                % (n_samples, batch_size))

        # Create the output directories up front so that saving previews and
        # weights cannot fail just because a directory is missing.
        os.makedirs('./results', exist_ok=True)
        os.makedirs('./weights', exist_ok=True)

        # Labels are the same for every step: real images first, then fakes.
        discriminator_labels = np.array([1] * batch_size + [0] * batch_size)
        generator_labels = np.array([1] * batch_size)

        g_losses = []
        d_losses = []
        for epoch in range(epochs):
            # Shuffle through an index permutation instead of in place so the
            # caller's array keeps its order and content.
            order = np.random.permutation(n_samples)
            progress_bar = Progbar(target=n_iter)
            for index in range(n_iter):
                # create random noise -> N latent vectors
                noise = np.random.uniform(-1, 1, size=(batch_size, self.input_dim))

                # load real data (fancy indexing returns a copy) and augment
                # it with random horizontal / vertical flips
                batch_idx = order[index * batch_size:(index + 1) * batch_size]
                image_batch = X_train[batch_idx]
                for i in range(batch_size):
                    if np.random.random() > 0.5:
                        image_batch[i] = np.fliplr(image_batch[i])
                    if np.random.random() > 0.5:
                        image_batch[i] = np.flipud(image_batch[i])
                generated_images = self.g.predict(noise, verbose=0)

                # training discriminator on real + generated images
                X = np.concatenate((image_batch, generated_images))
                d_loss = self.d.train_on_batch(X, discriminator_labels)

                # training generator through the stacked model
                g_loss = self.dcgan.train_on_batch(noise, generator_labels)

                # index + 1 steps are finished, so the bar reaches its target
                progress_bar.update(index + 1, values=[('g', g_loss), ('d', d_loss)])
            g_losses.append(g_loss)
            d_losses.append(d_loss)

            # Save a preview of the last generated batch, rescaled from
            # [-1, 1] to [0, 255].
            # NOTE(review): cv2.imwrite treats 3-channel data as BGR; if the
            # training images are RGB the preview has red/blue swapped --
            # confirm.
            image = self.combine_images(generated_images)
            image = (image + 1) / 2.0 * 255.0
            cv2.imwrite('./results/' + str(epoch) + ".png", image)
            print('\nEpoch' + str(epoch) + " end")

            # save the models every 10 epochs
            if (epoch + 1) % 10 == 0:
                self.g.save('./weights/generator_' + str(epoch) + '.h5', True)
                self.d.save('./weights/discriminator_' + str(epoch) + '.h5', True)
        return g_losses, d_losses

    def load_weights(self, g_weight, d_weight):
        """Load generator weights from *g_weight* and discriminator weights
        from *d_weight*."""
        self.g.load_weights(g_weight)
        self.d.load_weights(d_weight)

    def combine_images(self, generated_images):
        """Tile a batch of images into one grid image.

        The grid is ``int(sqrt(num))`` tiles wide and as many rows high as
        needed; unused tiles stay zero.

        Args:
            generated_images: Array of shape ``(num, height, width, channels)``.

        Returns:
            Array of shape ``(rows * height, cols * width, channels)`` with
            the dtype of ``generated_images``.

        Raises:
            ValueError: If the batch is empty.
        """
        num = generated_images.shape[0]
        if num == 0:
            raise ValueError('generated_images is empty; nothing to combine')
        width = int(math.sqrt(num))
        height = int(math.ceil(float(num) / width))
        tile_h, tile_w, channels = generated_images.shape[1:4]
        image = np.zeros((height * tile_h, width * tile_w, channels),
                         dtype=generated_images.dtype)
        for index, img in enumerate(generated_images):
            row, col = divmod(index, width)
            image[row * tile_h:(row + 1) * tile_h,
                  col * tile_w:(col + 1) * tile_w, :] = img[:, :, :]
        return image

# AnoGAN
def sum_of_residual(y_true, y_pred):
    """Loss: sum of the absolute element-wise differences of the two tensors."""
    residual = y_true - y_pred
    return K.sum(K.abs(residual))


class ANOGAN(object):
    """Wrap a generator so that a latent code can be fitted to a query image.

    A trainable Dense layer (tanh, ``input_dim`` units) is placed in front of
    the generator, which is marked non-trainable; fitting the wrapped model
    therefore only adjusts that Dense layer.

    Args:
        input_dim: Length of the latent vector.
        g: Generator model; its ``trainable`` flag is set to False.
    """

    def __init__(self, input_dim, g):
        self.input_dim = input_dim
        self.g = g
        g.trainable = False

        # The input layer itself cannot be trained, so a Dense layer of the
        # same width is inserted between the input and the generator.
        latent_in = Input(shape=(input_dim,))
        latent_mapping = Dense(input_dim, activation='tanh', trainable=True)
        generated = g(latent_mapping(latent_in))

        self.model = Model(inputs=latent_in, outputs=generated)
        self.model_weight = None

    def compile(self, optim):
        """Compile the wrapped model with the residual loss, then set the
        backend learning phase to 0."""
        self.model.compile(loss=sum_of_residual, optimizer=optim)
        K.set_learning_phase(0)

    def compute_anomaly_score(self, x, iterations=300):
        """Fit the wrapped model to *x* from one random latent vector.

        Args:
            x: Target image batch handed to ``fit`` as the label.
            iterations: Number of epochs to fit for.

        Returns:
            ``(loss, similar_data)``: the loss of the last epoch and the
            model's prediction for the sampled latent vector.
        """
        z = np.random.uniform(-1, 1, size=(1, self.input_dim))

        # Fitting adjusts the latent mapping so the output approaches x.
        history = self.model.fit(z, x, batch_size=1, epochs=iterations, verbose=0)
        final_loss = history.history['loss'][-1]
        similar_data = self.model.predict_on_batch(z)

        return final_loss, similar_data


# train
if __name__ == '__main__':
    # Training entry point: load the images, train the DCGAN and dump the
    # per-epoch losses to loss.csv.
    import cv2
    import numpy as np
    import os
    from keras.preprocessing.image import img_to_array, load_img

    batch_size = 32
    epochs = 500
    # Length of the latent vector fed to the generator.
    input_dim = 100
    # Images are resized to image_size x image_size.  Kept separate from
    # input_dim: the two were only equal by coincidence.
    image_size = 100
    g_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)
    d_optim = Adam(lr=0.0001, beta_1=0.5, beta_2=0.999)

    TRAIN_IMAGE_PATH = "./cats/"

    img_list = os.listdir(TRAIN_IMAGE_PATH)
    x_train = []
    for img_name in img_list:
        # target_size is (height, width); the channel count is selected by
        # the grayscale flag, not by a third tuple entry.
        img = img_to_array(load_img(os.path.join(TRAIN_IMAGE_PATH, img_name),
                                    grayscale=False,
                                    target_size=(image_size, image_size)))
        # normalize from [0, 255] to [-1, 1]
        img = (img.astype(np.float32) - 127.5) / 127.5
        x_train.append(img)
    if not x_train:
        raise ValueError('no training images found in ' + TRAIN_IMAGE_PATH)
    x_train = np.array(x_train)

    input_shape = x_train[0].shape

    # train generator & discriminator
    dcgan = DCGAN(input_dim, input_shape)

    # To resume from saved weights:
    #dcgan.load_weights('/Users/takaaki/Desktop/generator_1.h5',
    #                                    '/Users/takaaki/Desktop/discriminator_1.h5')

    dcgan.compile(g_optim, d_optim)
    g_losses, d_losses = dcgan.train(epochs, batch_size, x_train)
    # One "generator_loss,discriminator_loss" line per epoch.
    with open('loss.csv', 'w') as f:
        for g_loss, d_loss in zip(g_losses, d_losses):
            f.write(str(g_loss) + ',' + str(d_loss) + '\n')


# test
# Module-level side effect: this line runs on import as well as when the file
# is executed as a script.
# NOTE(review): ANOGAN.compile() later calls K.set_learning_phase(0); confirm
# that switching the backend to phase 1 here is intended for the test section.
K.set_learning_phase(1)


def denormalize(X):
    """Convert values from the [-1, 1] range to 8-bit pixel intensities.

    Args:
        X: Array-like of floats, nominally within [-1, 1].

    Returns:
        ``numpy.uint8`` array of the same shape.  Values are truncated (not
        rounded) and inputs outside [-1, 1] saturate at 0 / 255 instead of
        wrapping around in the unsigned cast.
    """
    scaled = (np.asarray(X) + 1.0) / 2.0 * 255.0
    # Clip before casting: a float outside [0, 255] cast to uint8 wraps
    # around (or is undefined for negatives) and would corrupt the image.
    return np.clip(scaled, 0.0, 255.0).astype(dtype=np.uint8)


# test
if __name__ == '__main__':
    # AnoGAN test entry point: for every image in y_IMAGE_PATH, fit a latent
    # code, save "query | reconstruction" side by side and log the score.
    iterations = 50

    # Image size.
    # NOTE(review): the same value is also passed to DCGAN/ANOGAN below as
    # the latent dimension -- confirm that this is intended.
    input_dim = 200
    anogan_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999)

    y_IMAGE_PATH = "./Floc_Image/s24_190_over/"

    # Create the output directories up front: open() raises on a missing
    # directory and, per the OpenCV docs, cv2.imwrite only returns False, so
    # images would otherwise be lost silently.
    os.makedirs('./results', exist_ok=True)
    os.makedirs('predict', exist_ok=True)

    img_list = os.listdir(y_IMAGE_PATH)
    with open('./results/img_name.txt', 'a') as f:
        f.write(str(img_list) + '\n')

    y_test = []
    for img_name in img_list:
        # target_size is (height, width); grayscale=True selects one channel.
        img = img_to_array(load_img(y_IMAGE_PATH + img_name, grayscale=True,
                                    target_size=(input_dim, input_dim)))
        # normalize to the [-1, 1] range
        img = (img.astype(np.float32) - 127.5) / 127.5
        y_test.append(img)
    y_test = np.array(y_test)

    # load weights
    # NOTE(review): input_shape is not assigned in this section; it is
    # whatever the training section left behind (x_train[0].shape), while the
    # images above are grayscale input_dim x input_dim.  Confirm that the
    # model shape matches both the weight files and the test images.
    dcgan = DCGAN(input_dim, input_shape)
    dcgan.load_weights('./weights/generator_1099.h5',
                       './weights/discriminator_1099.h5')

    for i, test_img in enumerate(y_test):
        # add the batch axis expected by the model
        test_img = test_img[np.newaxis, :, :, :]
        # a fresh ANOGAN per image, so every query starts from a newly
        # initialized latent-mapping layer
        anogan = ANOGAN(input_dim, dcgan.g)
        anogan.compile(anogan_optim)
        anomaly_score, generated_img = anogan.compute_anomaly_score(test_img, iterations)

        generated_img = denormalize(generated_img)
        # query image on the left, reconstruction on the right
        imgs = np.concatenate((denormalize(test_img[0]), generated_img[0]), axis=1)
        cv2.imwrite('predict' + os.sep + str(int(anomaly_score)) + '_' + str(i) + '.png', imgs)
        print(str(i) + ' %.2f' % anomaly_score)

        # append after every image so partial results survive an interruption
        with open('./results/y_test_score.txt', 'a') as f:
            f.write(str(anomaly_score) + '\n')

def load_scores(path):
    """Read anomaly scores from a text file with one score per line.

    Args:
        path: File whose first comma-separated field on each line is a
            number.

    Returns:
        List of the scores truncated to ``int``.  Blank lines are skipped.
    """
    import csv

    scores = []
    with open(path, 'r') as f:
        for row in csv.reader(f):
            # csv.reader yields [] for an empty line; skip it instead of
            # failing on row[0].
            if not row or not row[0].strip():
                continue
            scores.append(int(float(row[0])))
    return scores


if __name__ == '__main__':
    # plot histgram
    import matplotlib.pyplot as plt
    import csv

    # NOTE(review): the paths below are absolute and machine-specific, and
    # nothing in this file writes y_train_score.txt -- confirm its origin.
    x = load_scores('C:/Users/Takaaki Ishii/Desktop/results/y_train_score.txt')
    y = load_scores('C:/Users/Takaaki Ishii/Desktop/results/y_test_score.txt')

    plt.title("Histgram of Score")
    plt.xlabel("Score")
    plt.ylabel("freq")
    plt.hist(x, bins=40, alpha=0.3, histtype='stepfilled', color='r', label="1")
    plt.hist(y, bins=40, alpha=0.3, histtype='stepfilled', color='b', label='9')
    plt.legend(loc=1)
    plt.savefig("C:/Users/Takaaki Ishii/Desktop/results/histgram.png")
    plt.show()



In [None]:
# 深層畳み込み生成対抗ネットワーク（DCGAN）と異常検出モデル（AnoGAN）を設定し、訓練する
from keras.models import Model, Sequential
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, BatchNormalization, LeakyReLU, Reshape, Conv2DTranspose, \
    Flatten, Activation
from keras.optimizers import Adam
import keras.backend as K
import numpy as np
import cv2
import os
from keras.preprocessing.image import img_to_array, load_img

# Generatorクラス
class Generator(object):
    """DCGAN generator: maps a latent vector to an image with tanh output.

    A Dense layer is reshaped into a small feature map, which two stride-2
    transposed convolutions upsample by a factor of 4 in total.

    Args:
        input_dim: Length of the latent (noise) vector.
        image_shape: ``(height, width, channels)`` of the generated image.
            Height and width must be positive multiples of 4.

    Raises:
        ValueError: If height or width is not a positive multiple of 4.
    """

    def __init__(self, input_dim, image_shape):
        INITIAL_CHANNELS = 128
        # Initial feature-map size: a quarter of the generated image, because
        # the two stride-2 Conv2DTranspose layers upsample by 4 in total
        # (e.g. 25 x 25 for 100 x 100 images).
        UPSAMPLE_FACTOR = 4
        height, width = int(image_shape[0]), int(image_shape[1])
        if (height <= 0 or width <= 0
                or height % UPSAMPLE_FACTOR or width % UPSAMPLE_FACTOR):
            raise ValueError(
                'image height and width must be positive multiples of %d, got %r'
                % (UPSAMPLE_FACTOR, tuple(image_shape[:2])))
        initial_height = height // UPSAMPLE_FACTOR
        initial_width = width // UPSAMPLE_FACTOR

        inputs = Input((input_dim,))
        fc1 = Dense(units=INITIAL_CHANNELS * initial_height * initial_width)(inputs)
        fc1 = BatchNormalization()(fc1)
        fc1 = LeakyReLU(0.2)(fc1)
        fc2 = Reshape((initial_height, initial_width, INITIAL_CHANNELS))(fc1)
        up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
        conv1 = Conv2D(64, (3, 3), padding='same')(up1)
        conv1 = BatchNormalization()(conv1)
        conv1 = Activation('relu')(conv1)
        up2 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv1)
        # One output channel per image channel.
        conv2 = Conv2D(image_shape[2], (5, 5), padding='same')(up2)
        outputs = Activation('tanh')(conv2)

        self.model = Model(inputs=[inputs], outputs=[outputs])

# Discriminatorクラス
class Discriminator(object):
    """DCGAN discriminator: maps an image to a single sigmoid output.

    Two (5x5 convolution -> LeakyReLU(0.2) -> 2x2 max-pooling) stages with 64
    and 128 filters, then Flatten, a one-unit Dense layer and a sigmoid.

    Args:
        input_shape: Shape of one input image, passed straight to ``Input``.
    """

    def __init__(self, input_shape):
        image_in = Input(input_shape)

        features = image_in
        for n_filters in (64, 128):
            features = Conv2D(n_filters, (5, 5), padding='same')(features)
            features = LeakyReLU(0.2)(features)
            features = MaxPooling2D(pool_size=(2, 2))(features)

        flat = Flatten()(features)
        logit = Dense(1)(flat)
        probability = Activation('sigmoid')(logit)

        self.model = Model(inputs=[image_in], outputs=[probability])

# DCGANクラス
class DCGAN(object):
    """Generator/discriminator pair for adversarial training.

    ``__init__`` sets ``input_dim`` (latent vector length), ``d``
    (discriminator model) and ``g`` (generator model).  ``compile()``
    additionally creates ``dcgan``, the stacked generator -> discriminator
    model.
    """

    def __init__(self, input_dim, image_shape):
        # Stored because the training code reads self.input_dim to size the
        # noise batches it feeds to the generator.
        self.input_dim = input_dim
        self.d = Discriminator(image_shape).model
        self.g = Generator(input_dim, image_shape).model

    def compile(self, g_optim, d_optim):
        """Compile the stacked model with *g_optim* and the discriminator
        with *d_optim*; both use binary cross-entropy."""
        # The discriminator is flagged non-trainable while the stacked model
        # is compiled and re-enabled before it is compiled on its own.
        self.d.trainable = False
        # Kept on the instance (not a local) so the training code can call
        # self.dcgan.train_on_batch to update the generator.
        self.dcgan = Sequential([self.g, self.d])
        self.dcgan.compile(loss='binary_crossentropy', optimizer=g_optim)
        self.d.trainable = True
        self.d.compile(loss='binary_crossentropy', optimizer=d_optim)

# 主要な訓練プロセスとテストプロセスは大幅に簡略化されています。
# 訓練プロセスでは、画像データを読み込み、DCGANモデルをインスタンス化し、オプティマイザを設定してモデルをコンパイルし、訓練を実行します。
# AnoGANクラスやその他の関数は、異常検出のための追加機能を提供しますが、主要な訓練ループからは省略しています。

# 重要なポイント:
# - `Generator`は、ランダムノイズから画像を生成するネットワークです。
# - `Discriminator`は、画像が本物か生成されたものかを識別するネットワークです。
# - `DCGAN`は、これら二つのネットワークを組み合わせ、互いに競合させながら訓練します。
# - コードの不要な部分（重複するインポート文や未使用の変数など）を削除し、日本語でのコメントを追加して、各クラスとメソッドの目的を明確にしました。


In [None]:


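# DCGAN training process
# A DCGAN consists of two networks, the Generator and the Discriminator.
# The Generator creates images from random noise, and the Discriminator decides whether an image is real or generated.
# Training improves the Generator so that it produces more realistic images, and improves the Discriminator so that it tells real and generated images apart more accurately.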

# 訓練プロセスの実装
def train(self, epochs, batch_size, X_train):
    """Train the discriminator and the generator alternately.

    Each step trains ``self.d`` on half a batch of randomly sampled real
    images and half a batch of generated images, then trains the generator
    through ``self.dcgan`` on a full batch of noise labelled as real.

    Args:
        self: Object providing ``input_dim``, ``g``, ``d`` and ``dcgan``.
        epochs: Number of epochs.
        batch_size: Generator batch size; the discriminator sees
            ``batch_size // 2`` (at least 1) real and fake images per step.
        X_train: Array of training images, indexed along axis 0.

    Returns:
        ``(g_losses, d_losses)``: the generator and discriminator loss of
        the last step of every epoch.  Both lists are empty when ``X_train``
        holds fewer than ``batch_size`` images, because no step runs.
    """
    # At least one image per half batch, so batch_size == 1 does not train
    # the discriminator on empty arrays.
    half_batch = max(1, batch_size // 2)
    n_batches = X_train.shape[0] // batch_size

    # Labels are identical for every step.
    real_y = np.ones((half_batch, 1))
    fake_y = np.zeros((half_batch, 1))
    valid_y = np.array([1] * batch_size)

    g_losses = []
    d_losses = []
    for epoch in range(epochs):
        g_loss = d_loss = None
        # Train batch by batch.
        for _ in range(n_batches):
            # Sample real images.
            idx = np.random.randint(0, X_train.shape[0], half_batch)
            imgs = X_train[idx]

            # Generate fake images.
            noise = np.random.normal(0, 1, (half_batch, self.input_dim))
            gen_imgs = self.g.predict(noise)

            # Train the discriminator; its loss is the mean of both halves.
            d_loss_real = self.d.train_on_batch(imgs, real_y)
            d_loss_fake = self.d.train_on_batch(gen_imgs, fake_y)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # Train the generator to make the discriminator answer "real".
            noise = np.random.normal(0, 1, (batch_size, self.input_dim))
            g_loss = self.dcgan.train_on_batch(noise, valid_y)

            # Report progress.
            print("%d [D loss: %f] [G loss: %f]" % (epoch, d_loss, g_loss))

        if n_batches:
            g_losses.append(g_loss)
            d_losses.append(d_loss)
    return g_losses, d_losses


In [None]:
# 異常検出（AnoGAN）
# AnoGANは、訓練済みのGeneratorを使用して、入力画像に似た画像を生成することにより、異常検出を行います。
# 異常検出のプロセスは、入力画像に最も近い潜在空間のベクトルを見つけることです。

# AnoGANの異常検出プロセス
def compute_anomaly_score(self, x, iterations=500, learning_rate=0.01):
    """Search the latent space for the code whose image is closest to *x*.

    Starting from a random normal latent vector, plain gradient descent
    minimizes the mean absolute difference between ``self.g(z)`` and *x*.

    Args:
        self: Object providing ``input_dim`` and the generator ``g``.
        x: Query image batch as an array.
            # assumes the shape matches the output of self.g for one latent
            # vector -- TODO confirm against the caller
        iterations: Number of gradient-descent steps.
        learning_rate: Step size of each update.

    Returns:
        ``(anomaly_score, g_z)``: the sum of absolute differences between the
        final generated image and *x*, and that generated image.
    """
    x = np.asarray(x, dtype=K.floatx())
    # The latent code lives in NumPy and is fed through a placeholder.  A
    # backend variable cannot be updated with ``z -= ...`` (that only rebinds
    # the Python name) and is not a valid K.function input.
    z_value = np.random.normal(size=(1, self.input_dim)).astype(K.floatx())
    z_in = K.placeholder(shape=(1, self.input_dim))
    g_out = self.g(z_in)

    # Define the loss.
    loss = K.mean(K.abs(g_out - K.constant(x)))

    # Gradient of the loss with respect to the latent code.
    grads = K.gradients(loss, [z_in])[0]

    # One backend call evaluates loss and gradient; another the image.
    # NOTE(review): if the generator behaves differently in training and
    # inference mode, the backend learning phase in effect applies here --
    # confirm that it is set as intended before calling.
    iterate = K.function([z_in], [loss, grads])
    generate = K.function([z_in], [g_out])

    # Gradient descent on the latent code.
    for _ in range(iterations):
        _, grads_value = iterate([z_value])
        z_value = z_value - grads_value * learning_rate

    g_z = generate([z_value])[0]

    # Anomaly score: sum of absolute differences between input and output.
    anomaly_score = np.sum(np.abs(g_z - x))
    return anomaly_score, g_z


In [None]:
# 訓練と異常検出の実行
# 訓練済みのモデルを使用して異常検出を行います。ここでは、訓練プロセスの後に異常検出を行う一連のステップを実行します。

if __name__ == '__main__':
    # Template for the full pipeline: train the DCGAN, then score one test
    # image with AnoGAN.
    #
    # Prepare the data.
    # TODO: load the training images and the test image here.  The original
    # placeholders were assignments without a right-hand side, which is a
    # SyntaxError; they are now explicit None values checked below.
    X_train = None
    test_img = None
    if X_train is None or test_img is None:
        raise NotImplementedError(
            'prepare X_train and test_img before running this section')

    # Instantiate and compile the DCGAN.
    # NOTE(review): input_dim, image_shape, g_optim, d_optim, epochs,
    # batch_size and anogan_optim are not defined in this section; presumably
    # they come from earlier notebook cells -- confirm.
    dcgan = DCGAN(input_dim, image_shape)
    dcgan.compile(g_optim, d_optim)

    # Train the DCGAN.
    g_losses, d_losses = dcgan.train(epochs, batch_size, X_train)

    # Run anomaly detection with the trained generator.
    anogan = ANOGAN(input_dim, dcgan.g)
    anogan.compile(anogan_optim)
    anomaly_score, similar_img = anogan.compute_anomaly_score(test_img)

    print("Anomaly Score:", anomaly_score)
    # Display the anomaly score and the similar image here, etc.
