In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import matplotlib.pyplot as plt
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session


from tensorflow.keras.models import Sequential
import tensorflow as tf
from tensorflow.keras.layers import Dense, Conv2D, Dropout, BatchNormalization, MaxPooling2D, Flatten, Activation, GlobalAveragePooling2D, Input, Reshape, LeakyReLU, ZeroPadding2D, UpSampling2D
from tensorflow.keras import optimizers
import os
from tensorflow.keras.regularizers import l2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers
from tensorflow.keras.callbacks import CSVLogger, ModelCheckpoint, ReduceLROnPlateau
from sklearn.model_selection import train_test_split



/kaggle/input/mnist-in-csv/mnist_test.csv
/kaggle/input/mnist-in-csv/mnist_train.csv


In [2]:
train = pd.read_csv('../input/mnist-in-csv/mnist_train.csv')
test = pd.read_csv('../input/mnist-in-csv/mnist_test.csv')
train.head(3)

Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [3]:
x_train = train.iloc[:, 1:]
x_train = (x_train.astype(np.float32)-127.5)/127.5
x_train = x_train.values.reshape(-1, 28, 28, 1)
n_train = x_train.shape[0]
y_train = train['label']
y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=1) # validation split

## Generative adverserial network for MNIST

In [4]:
def build_generator(n_noise = 100, img_shape=(28, 28, 1)):
    noise_shape = (n_noise,)
    model = Sequential()
    model.add(Dense(128 * 7 * 7, activation="relu", input_dim=noise_shape[0]))
    model.add(Reshape((7, 7, 128)))
    model.add(BatchNormalization(momentum=0.8))
    model.add(UpSampling2D())
    model.add(Conv2D(128, kernel_size=3, padding="same"))
    model.add(Activation("relu"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(UpSampling2D())
    model.add(Conv2D(64, kernel_size=3, padding="same"))
    model.add(Activation("relu"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Conv2D(1, kernel_size=3, padding='same'))
    model.add(Activation("tanh"))  
    model.add(tf.keras.layers.Reshape(img_shape, name='output'))
    #noise = Input(shape=(noise_shape,))
    #label = Input(shape=(1,), dtype='int32')
    #label_embedding = Flatten()(Embedding(self.num_classes, self.latent_dim)(label))
    #model_input = multiply([noise, label_embedding])
    noise = Input(shape=noise_shape)
    img = model(noise)
    #return tf.keras.Model([noise, label], img)
    return tf.keras.Model(noise, img)

def build_discriminator(img_shape=(28, 28, 1)):
    model = Sequential()
    model.add(Conv2D(16, kernel_size=3, strides=2, input_shape=img_shape, padding="same"))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(32, kernel_size=3, strides=2, padding="same"))
    model.add(ZeroPadding2D(padding=((0,1),(0,1))))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Conv2D(128, kernel_size=3, strides=1, padding="same"))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))
    model.summary()

    img = Input(shape=img_shape)
    validity = model(img)
    return tf.keras.Model(img, validity)

def build_stack(compiled=True):
    discriminator = build_discriminator()
    if compiled:
        discriminator.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(), metrics=['accuracy'])
    generator = build_generator()
    if compiled:
        generator.compile(loss=tf.keras.losses.binary_crossentropy, optimizer=optimizers.Adam(),metrics=['accuracy'])
    discriminator.trainable = False # we want static training of generator and discriminator
    input_layer = Input(shape=(100,))
    img = generator(input_layer) # generate a random img
    output_layer = discriminator(img) #set output layer to return validity of input, 1=>img came from train set
    gen_disc_stack = tf.keras.Model(input_layer, output_layer)
    if True: # stack always gets compiled
        gen_disc_stack.compile(loss='binary_crossentropy', optimizer=optimizers.Adam())
    return gen_disc_stack, discriminator, generator

def generate_fake(idx, generator, out=0):
    r, c = 5, 5
    noise = np.random.normal(0, 1, (r * c, 100))
    gen_imgs = generator.predict(noise)

    # Rescale images 0 - 1
    gen_imgs = 0.5 * gen_imgs + 0.5

    fig, axs = plt.subplots(r, c)
    cnt = 0
    for i in range(r):
        for j in range(c):
            axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
            axs[i,j].axis('off')
            cnt += 1
    fig.savefig("mnist_%d.png" % idx)
    if out: fig.show()
    plt.close()

def train_stack(epochs, x=x_train, y=y_train, batch_size=32, real_fake_split=0.5, optimizer=optimizers.Adam(), loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=True), save_interval=50):
    gen_disc_stack, discriminator, generator = build_stack(compiled=True)
    real_batch_size = int(batch_size*real_fake_split)
    fake_batch_size = batch_size-real_batch_size
    #train_dataset = tf.data.Dataset.from_tensor_slices((x, y))
    #train_dataset_real = train_dataset.shuffle(buffer_size=1024).batch(real_batch_size)
    #train_dataset_real = train_dataset.shuffle(buffer_size=1024).batch(real_batch_size)
    for epoch in range(0, epochs):      
        real_img = x[np.random.randint(0, x.shape[0], real_batch_size)]
        noise = np.random.normal(0, 1, (fake_batch_size, 100))
        fake_img = generator.predict(noise)
        loss_real = discriminator.train_on_batch(real_img, np.ones((real_batch_size, 1)))
        loss_fake = discriminator.train_on_batch(fake_img, np.zeros((fake_batch_size, 1)))
        disc_loss = np.add(np.multiply(real_fake_split,loss_real),np.multiply(loss_fake,(1-real_fake_split))) # take convex comb of losses
        noise = np.random.normal(0, 1, (batch_size, 100))
        gen_loss = gen_disc_stack.train_on_batch(noise, np.array([1] * batch_size))

            # Plot the progress
        print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, disc_loss[0], 100*disc_loss[1], gen_loss))

            # If at save interval => save generated image samples
        if epoch % save_interval == 0:
            generate_fake(epoch, generator)
    generate_fake(epochs+1, generator, out=1)
    return gen_disc_stack, discriminator, generator
        
    
    

In [5]:
gen_disc_stack, discriminator, generator = train_stack(13000, save_interval=1000)

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 14, 14, 16)        160       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 16)        0         
_________________________________________________________________
dropout (Dropout)            (None, 14, 14, 16)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 32)          4640      
_________________________________________________________________
zero_padding2d (ZeroPadding2 (None, 8, 8, 32)          0         
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 8, 8, 32)          0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 8, 8, 32)          0

real_batch_size = int(batch_size*real_fake_split)
    fake_batch_size = batch_size-real_batch_size
    train_dataset = tf.data.Dataset.from_tensor_slices((x, y))
    train_dataset_real = train_dataset.shuffle(buffer_size=1024).batch(real_batch_size)
    #train_dataset_real = train_dataset.shuffle(buffer_size=1024).batch(real_batch_size)
    for epoch in range(0, epochs):
        print('Start of epoch %d' % (epoch,))
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            val_1 = np.ones(len(y_batch_train))
            val_0 = np.zeros(len(y_batch_train))  # fix for general split later
            real_img = x[np.random.randint(0, x.shape[0], split_btach)]
            noise = np.random.normal(0, 1, (int(batch_size*(1-real_fake_split)), 100))
            fake_img = generator.predict(noise)
            with tf.GradientTape() as tape:
                logits_real = discriminator(x_batch_train, training=True)  # Logits for this minibatch
                loss_real = loss_fn(val_1, logits_real)
                logits_fake = discriminator(fake_img, training=True)
                loss_fake = loss_fn(val_zero, logits_fake)
                loss_value_disc = (loss_real+loss_fake)*0.5 # fix for general split later
                # Use the gradient tape to automatically retrieve
                # the gradients of the trainable variables with respect to the loss.
            grads = tape.gradient(loss_value_disc, discriminator.trainable_weights)
            optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))
            
            

    # Log every 200 batches.
    if step % 200 == 0:
        print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
        print('Seen so far: %s samples' % ((step + 1) * 64))

        
        
        
        
        model.add(Dense(256, input_shape=noise_shape))
    model.add(LeakyReLU(alpha=0.2))
    model.add(BatchNormalization(momentum=0.6))
    model.add(Dense(512))
    model.add(LeakyReLU(alpha=0.2))
    model.add(BatchNormalization(momentum=0.6))
    model.add(Dense(1024))
    model.add(LeakyReLU(alpha=0.2))
    model.add(BatchNormalization(momentum=0.6))
    model.add(Dense(np.prod(img_shape), activation='tanh'))
    model.add(tf.keras.layers.Reshape(img_shape, name='output'))
    model.summary()
    noise = Input(shape=noise_shape)
    img = model(noise)
    
    
    
    
            