In [None]:
from __future__ import print_function, division

from keras.datasets import mnist
from keras.layers.merge import _Merge
from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import BatchNormalization, Activation, ZeroPadding2D, Concatenate
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import RMSprop
from functools import partial

import keras.backend as K

import matplotlib.pyplot as plt

import sys

import numpy as np

In [None]:
# --- Model dimensions -------------------------------------------------------
img_dim = 2048       # length of one image feature vector (the `img` model input)
semantic_dim = 500   # length of one semantic vector (the `semantics` model input)
latent_dim = 50      # length of the noise vector z (120 was tried previously)

# --- Training schedule ------------------------------------------------------
n_critic = 5          # discriminator updates performed per generator update
epochs = 50000        # number of outer training iterations
batch_size = 32       # samples per train_on_batch call
sample_interval = 50  # run the accuracy evaluation every this many epochs

In [None]:
# Generator network: maps (noise z, semantic vector) to a synthetic image
# feature vector of length img_dim.
z = Input(shape=(latent_dim,), name='z_input')
semantics = Input(shape=(semantic_dim,), name='semantics_input')

# Condition the noise on the semantics by concatenating the two inputs.
merged_layer = Concatenate()([z, semantics])

# One hidden ReLU layer of width 2048.
generator_hidden = Dense(2048, activation="relu")(merged_layer)

# Output layer. Its width is tied to img_dim (instead of a second literal
# 2048) because the result is fed straight into the discriminator's
# `img_input`, whose shape is (img_dim,); the two must always agree.
# The ReLU keeps every generated feature non-negative.
generator_output = Dense(img_dim, activation="relu")(generator_hidden)

# Distinct names are used for the intermediate tensors so that `generator`
# only ever refers to the finished Model.
generator = Model(inputs=[z, semantics], outputs=generator_output, name='generator')

generator.summary()

In [None]:
# Discriminator (critic): scores a (noise, image feature, semantics) triple
# with a single unbounded scalar.
z = Input(shape=(latent_dim,), name='z_input')
img = Input(shape=(img_dim,), name='img_input')
semantics = Input(shape=(semantic_dim,), name='semantics_input')

# All three inputs are concatenated into one flat vector.
merged_layer = Concatenate()([z, img, semantics])

# Two hidden stages of Dense -> LeakyReLU(0.2) -> Dropout(0.25),
# 4096 units first, then 2048.
critic_hidden = merged_layer
for units in (4096, 2048):
    critic_hidden = Dense(units)(critic_hidden)
    critic_hidden = LeakyReLU(alpha=0.2)(critic_hidden)
    critic_hidden = Dropout(0.25)(critic_hidden)

# Linear single-unit head (no activation).
critic_score = Dense(1)(critic_hidden)

discriminator = Model(inputs=[z, img, semantics],
                      outputs=critic_score,
                      name='discriminator')
discriminator.summary()

In [None]:
class RandomWeightedAverage(_Merge):
    """Merge layer returning a random per-sample blend of its two inputs.

    For inputs ``[a, b]`` the output is ``alpha * a + (1 - alpha) * b`` where
    ``alpha`` is drawn from ``K.random_uniform`` (default range) once per
    sample and broadcast across the feature axis.
    """
    def _merge_function(self, inputs):
        # Read the batch size from the incoming tensor instead of hard-coding
        # 32, so the layer keeps working when the batch size is changed or a
        # batch of a different size is fed.
        n_in_batch = K.shape(inputs[0])[0]
        # Shape (batch, 1): one coefficient per sample, broadcast over the
        # feature axis of the 2-D inputs used in this notebook.
        alpha = K.random_uniform((n_in_batch, 1))
        return (alpha * inputs[0]) + ((1 - alpha) * inputs[1])

In [None]:
# Gradient-penalty term
def gradient_penalty_loss(y_true, y_pred, averaged_samples):
    """
    Penalise the prediction's gradient norm for deviating from 1.

    Differentiates ``y_pred`` with respect to ``averaged_samples``, takes the
    Euclidean norm of that gradient separately for every sample, and returns
    the batch mean of ``(1 - norm) ** 2``. ``y_true`` is accepted only to
    match the two-argument-plus-extras loss signature and is not read. No
    weighting factor is applied inside this function.
    """
    # Squared gradient of the prediction w.r.t. the averaged samples.
    squared_grads = K.square(K.gradients(y_pred, averaged_samples)[0])
    # Reduce over every axis except the leading (batch) axis.
    non_batch_axes = np.arange(1, len(squared_grads.shape))
    per_sample_norm = K.sqrt(K.sum(squared_grads, axis=non_batch_axes))
    # Squared distance of each sample's norm from 1, averaged over the batch.
    return K.mean(K.square(1 - per_sample_norm))


# Wasserstein loss: mean of the element-wise product of target and prediction.
def wasserstein_loss(y_true, y_pred):
    """Return the mean over all elements of ``y_true * y_pred``."""
    signed_scores = y_true * y_pred
    return K.mean(signed_scores)

# Mean squared error
def mean_squared_error(y_true, y_pred):
    """Return the mean of ``(y_pred - y_true) ** 2`` taken over the last axis."""
    squared_error = K.square(y_pred - y_true)
    return K.mean(squared_error, axis=-1)

In [None]:
#-------------------------------
# Construct Computational Graph
#       for the discriminator
#-------------------------------
# Freeze the generator's layers while training the discriminator, and make
# sure the discriminator itself is trainable. The explicit `True` matters in a
# notebook: the generator-graph cell sets `discriminator.trainable = False`,
# so re-running this cell afterwards would otherwise compile a discriminator
# model with no trainable weights.
generator.trainable = False
discriminator.trainable = True

real_img = Input(shape=(img_dim,),name='real_img')
z = Input(shape=(latent_dim,),name='z_input')
semantics = Input(shape=(semantic_dim,),name='semantics_input')

# Generate an image feature from noise and semantics (fake sample)
fake_img = generator([z, semantics])

# Discriminator scores for the fake and the real sample
fake = discriminator([z,fake_img, semantics])
valid = discriminator([z,real_img, semantics])

# Random per-sample blend of the real and the fake sample
interpolated_img = RandomWeightedAverage()([real_img, fake_img])
# Discriminator score for the blended sample
validity_interpolated = discriminator([z,interpolated_img, semantics])

# Keras calls a loss as loss(y_true, y_pred); functools.partial binds the
# extra `averaged_samples` argument the gradient penalty needs.
partial_gp_loss = partial(gradient_penalty_loss,
                  averaged_samples=interpolated_img)
partial_gp_loss.__name__ = 'gradient_penalty' # Keras requires function names

# Inputs: real sample, noise, semantics.
# Outputs: score(real), score(fake), score(blend).
discriminator_model = Model(inputs=[real_img, z, semantics],
                    outputs=[valid, fake, validity_interpolated])


# This optimizer instance is also used when the generator model is compiled.
optimizer = RMSprop(lr=0.000001)


# One loss per output, in output order; the gradient penalty is weighted 10x
# relative to the two Wasserstein terms.
discriminator_model.compile(loss=[wasserstein_loss,
                                      wasserstein_loss,
                                      partial_gp_loss],
                                optimizer=optimizer,
                                loss_weights=[1, 1, 10])

discriminator_model.summary()

In [None]:
#-------------------------------
# Construct Computational Graph
#         for Generator
#-------------------------------

# Only the generator is updated through this model: the discriminator is
# frozen and the generator is unfrozen before compiling.
generator.trainable = True
discriminator.trainable = False

# Symbolic inputs: noise vector and semantic vector.
z = Input(shape=(latent_dim,), name='z_input')
semantics = Input(shape=(semantic_dim,), name='semantics_input')

# Generated sample, then the discriminator's score for it.
img = generator([z, semantics])
valid = discriminator([z, img, semantics])

# Stacked model: (noise, semantics) -> discriminator score of the generated
# sample, trained with the Wasserstein loss and the optimizer created for the
# discriminator model.
generator_model = Model(inputs=[z, semantics], outputs=valid)
generator_model.compile(optimizer=optimizer, loss=wasserstein_loss)
generator_model.summary()

In [None]:
from numpy import *  
import operator
def classify(inX,dataSet,labels,k):
    """Classify one sample by majority vote among its k nearest neighbours.

    Args:
        inX: query vector with as many features as one row of ``dataSet``.
        dataSet: 2-D array-like of reference vectors, one per row.
        labels: sequence giving the label of each row of ``dataSet``.
        k: number of nearest rows that vote; must satisfy
            ``1 <= k <= number of rows``.

    Returns:
        The label with the most votes. On a tie, the label that was
        encountered first when walking the neighbours from nearest to
        farthest wins.

    Raises:
        ValueError: if ``k`` is outside ``1 .. number of rows`` (this replaces
            the opaque IndexError the lookup would otherwise hit).
    """
    dataSet = np.asarray(dataSet)
    n_rows = dataSet.shape[0]
    if not 1 <= k <= n_rows:
        raise ValueError(
            "k must be between 1 and the number of reference rows (%d), got %r"
            % (n_rows, k))

    # Broadcasting subtracts the query from every row at once; no tiled copy
    # of the query (and no dependency on `from numpy import *`) is needed.
    diffs = np.asarray(inX) - dataSet
    distances = np.sqrt((diffs ** 2).sum(axis=1))

    # Mergesort is stable, so rows at equal distance keep their dataSet order
    # and the result is deterministic.
    nearest = distances.argsort(kind='mergesort')[:k]

    # Tally the votes in nearest-first order.
    votes = {}
    for row in nearest:
        label = labels[row]
        votes[label] = votes.get(label, 0) + 1

    # Pick the first label reaching the highest count. A plain loop is used
    # rather than builtins such as max() so the result cannot be affected by
    # names a star-import may have rebound.
    winner = None
    winner_votes = 0
    for label, count in votes.items():
        if count > winner_votes:
            winner = label
            winner_votes = count
    return winner

def _nearest_prototype_accuracy(x, y, prototypes, labels, n_samples):
    """Fraction of the first ``n_samples`` rows of ``x`` labelled correctly.

    Each row ``x[i]`` is passed to ``classify`` with ``k=1`` against
    ``prototypes``/``labels`` and the returned label is compared with ``y[i]``.
    """
    hits = 0
    for i in range(n_samples):
        if classify(x[i], prototypes, labels, 1) == y[i]:
            hits += 1
    # float() keeps this a true division even without the file-level
    # `from __future__ import division`.
    return hits / float(n_samples)


def accuracy_train(x,y,z,n_samples=6000,labels=(0,1,2)):
    """1-nearest-prototype accuracy on the training features.

    Args:
        x: feature rows to classify; only the first ``n_samples`` are used.
        y: expected label for each row of ``x``.
        z: reference vectors handed to ``classify``; presumably one row per
            entry of ``labels`` -- verify against the caller.
        n_samples: how many leading rows of ``x`` to evaluate (default 6000).
        labels: label assigned to each row of ``z`` (default 0, 1, 2).

    Returns:
        Fraction of evaluated rows whose predicted label equals ``y[i]``.
    """
    return _nearest_prototype_accuracy(x, y, z, list(labels), n_samples)


def accuracy_test1(x,y,z,n_samples=6000,labels=(0,1,2)):
    """1-nearest-prototype accuracy on the first test evaluation.

    Same contract as ``accuracy_train``: evaluates the first ``n_samples``
    rows of ``x`` (default 6000) against the rows of ``z`` labelled by
    ``labels`` (default 0, 1, 2).
    """
    return _nearest_prototype_accuracy(x, y, z, list(labels), n_samples)


def accuracy_test2(x,y,z,n_samples=8000,labels=(0,1,2,3)):
    """1-nearest-prototype accuracy on the second test evaluation.

    Same contract as ``accuracy_train`` but with different defaults: the
    first 8000 rows of ``x`` are evaluated against four reference rows
    labelled 0, 1, 2, 3.
    """
    return _nearest_prototype_accuracy(x, y, z, list(labels), n_samples)

In [None]:
# Image feature vectors and their labels for the train and test splits.
x_train = np.load('./datas/features/feature_train.npy')
y_train = np.load('./datas/train/y_train.npy')

x_test = np.load('./datas/features/feature_test.npy')
y_test = np.load('./datas/test/y_test.npy')

# Semantic vectors for the train and test splits.
semantic_train = np.load('./datas/semantic_test/semantic_train.npy')
semantic_test = np.load('./datas/semantic_test/semantic_test.npy')
print(x_train.shape,y_train.shape)
print(x_test.shape,y_test.shape)
print(semantic_train.shape,semantic_test.shape)

# Fail fast on mismatched data instead of deep inside a Keras call later on.
# Training feeds x_train rows to an input of shape (img_dim,) and
# semantic_train rows to an input of shape (semantic_dim,); the evaluation
# compares x_test rows with generated vectors of length img_dim.
assert x_train.shape[1:] == (img_dim,), \
    "x_train rows must have length img_dim=%d, got shape %s" % (img_dim, x_train.shape)
assert x_test.shape[1:] == (img_dim,), \
    "x_test rows must have length img_dim=%d, got shape %s" % (img_dim, x_test.shape)
assert semantic_train.shape[1:] == (semantic_dim,), \
    "semantic_train rows must have length semantic_dim=%d, got shape %s" % (
        semantic_dim, semantic_train.shape)
# The training loop indexes semantic_train with row indices drawn from x_train.
assert semantic_train.shape[0] >= x_train.shape[0], \
    "semantic_train needs at least one row per x_train row"

In [None]:
# Best accuracy seen so far on each of the two test evaluations.
flag_acc_test1 = 0
flag_acc_test2 = 0

# Targets for the two Wasserstein outputs: -1 for real samples, +1 for
# generated ones.
valid = -np.ones((batch_size, 1))
fake =  np.ones((batch_size, 1))

dummy = np.zeros((batch_size, 1)) # Dummy gt for gradient penalty

# Evaluation inputs. They do not change between epochs, so the file is read
# once here instead of at every sample interval.
samples = 7
semantics_all = np.load('./datas/semantic_test/semantic_all.npy')

for epoch in range(epochs):

    for _ in range(n_critic):

        # ---------------------
        #  Train Discriminator
        # ---------------------

        # Select a random batch of real features and their semantics
        idx = np.random.randint(0, x_train.shape[0], batch_size)
        imgs = x_train[idx]
        semantics = semantic_train[idx]
        # Sample generator input
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        # Train the discriminator
        d_loss = discriminator_model.train_on_batch([imgs, noise, semantics],
                                                        [valid, fake, dummy])

    # ---------------------
    #  Train Generator
    # ---------------------

    # Uses the noise and semantics of the last discriminator batch.
    g_loss = generator_model.train_on_batch([noise,semantics], valid)

    # Per-sample MSE between the real batch and the generator's output,
    # computed with NumPy. Calling the backend-based mean_squared_error() on
    # NumPy arrays here would add new ops to the backend graph on every
    # epoch, so the graph would keep growing for the whole run.
    generated = generator.predict([noise,semantics])
    mean_squared_error_loss = np.mean(np.square(generated - imgs), axis=-1)

    # At every sample interval, generate one vector per row of semantics_all
    # and score them with the nearest-prototype accuracy functions.
    if epoch % sample_interval == 0:
        noise = np.random.normal(0, 1, (samples, latent_dim))
        fake_img = generator.predict([noise, semantics_all])
        z1 = fake_img[0:3]
        z2 = fake_img[3:6]
        z3 = fake_img[3:7]  # NOTE(review): overlaps z2 (rows 3-5) -- confirm this is intended

        acc_train= accuracy_train(x_train,y_train,z1)
        acc_test1 = accuracy_test1(x_test,y_test,z2)
        acc_test2 = accuracy_test2(x_test,y_test,z3)

        # Keep the running best of each test accuracy.
        if(flag_acc_test1 < acc_test1):
            flag_acc_test1 = acc_test1

        if(flag_acc_test2 < acc_test2):
            flag_acc_test2 = acc_test2

        # acc_train is the current value; the two test figures are the best
        # values seen so far.
        print ("[epoch: %d] [acc_train: %f] [acc_test1: %f] [acc_test2: %f]" % (epoch,acc_train, flag_acc_test1, flag_acc_test2))
