# Data Preprocessing

In [1]:
import sys
import shutil
from pathlib import Path

In [2]:
def preprocess_emoji(
    dset_path,
    code_path="dataset/description/unicode.txt",
    out_dir="dataset/edited/",
):
    """Copy the designated 64px emoji PNGs into ``out_dir`` under numeric names.

    The file at ``code_path`` lists one emoji code per line. Every PNG found
    below ``dset_path`` (in a ``64`` directory) whose file-name stem is one of
    those codes is copied to ``<out_dir>/<line index>.png``.

    Args:
        dset_path: Root directory of the original emoji image set.
        code_path: Text file with one emoji code per line; the 0-based line
            index becomes the numeric name of the copied image.
        out_dir: Destination directory; created if it does not exist.

    Raises:
        SystemExit: If ``dset_path`` does not exist.
    """
    # Build the emoji-code vocabulary: code string -> line index.
    code_list = Path(code_path).read_text(encoding="utf-8").split("\n")
    code_vocabulary = {code: index for index, code in enumerate(code_list)}

    # Check the dataset path. sys.exit raises a plain SystemExit; the bare
    # `exit` name is an IPython helper inside a notebook kernel.
    image_path = Path(dset_path)
    if not image_path.exists():
        sys.exit("Check your dataset path!")

    # copyfile does not create missing parent directories.
    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    # Copy the designated emoji images.
    for filepath in list(image_path.glob("./**/64/**/*.png")):
        code = filepath.name.split(".")[0]
        # Skip empty stems so a blank line in the code list never matches.
        if code and code in code_vocabulary:
            shutil.copyfile(
                filepath, out_path / (str(code_vocabulary[code]) + ".png")
            )

In [3]:
# Copy the selected emoji PNGs from the raw dataset into dataset/edited/
# (the destination used by preprocess_emoji).
print("Started pre-processing")
preprocess_emoji("dataset/original")
print("Completed pre-processing")

Started pre-processing
Completed pre-processing


## Constants

In [4]:
# Shape of the images the GAN trains on and generates:
# 64 x 64 pixels x 3 channels.
IMAGE_SHAPE = (64, 64, 3)

# Length of a caption embedding vector (300 values).
EMBEDDING_DIM = 300

# Length of the noise (latent) vector fed to the generator (100 values).
LATENT_DIM = 100

# Directory containing the preprocessed images used for training.
IMAGE_DIR = "dataset/edited/"

# Directory containing the preprocessed caption .txt files.
TXT_DIR = "dataset/description/detailed/"

# Data Loading

In [5]:
from pathlib import Path
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import img_to_array, load_img
from nltk.tokenize import sent_tokenize
import numpy as np
import sys
import re

Using TensorFlow backend.


In [6]:
# function to load dataset
def load_dataset(img_dir, txt_dir, img_shape, split_rate=0.1, random_state=None):
    """Load (image, caption, label) samples and split them into train/test sets.

    Images (``*.png`` in ``img_dir``) and descriptions (``*.txt`` in
    ``txt_dir``) are paired by file-name stem. Each description is lower-cased
    and split into sentences; every sentence yields one sample that reuses the
    same image, so an image with three sentences contributes three samples.

    Args:
        img_dir: Directory with the preprocessed ``<number>.png`` images.
        txt_dir: Directory with the matching ``<number>.txt`` descriptions.
        img_shape: Target image shape; only the first two entries (height,
            width) are used when loading.
        split_rate: Fraction of the samples held out for the test set.
        random_state: Optional seed forwarded to ``train_test_split`` so the
            split is reproducible. ``None`` keeps the unseeded behaviour.

    Returns:
        Tuple ``(image_train, caption_train, image_test, caption_test,
        numbers_train, numbers_test)`` of numpy arrays. Images are float32
        scaled to [-1, 1].

    Raises:
        ValueError: If a paired file stem is not an integer.
    """
    t_path = Path(txt_dir)
    i_path = Path(img_dir)

    images = {
        filename.name.replace(".png", ""): filename.resolve()
        for filename in i_path.glob("*.png")
    }
    texts = {
        filename.name.replace(".txt", ""): filename.read_text(encoding="utf-8").lower()
        for filename in t_path.glob("*.txt")
    }

    # Hoisted out of the loops: the pattern and target size never change.
    regex_any_symbol = re.compile("[!-/:-@[-`{-~]")
    target_size = (img_shape[0], img_shape[1])

    image_list = []
    caption_list = []
    numbers = []

    # Sorted so the sample order does not depend on the filesystem's glob order.
    for name in sorted(images):
        if name not in texts:
            continue

        # ASCII has a single double-quote character, so the typographic
        # opening/closing quotes must be stripped explicitly.
        text = texts[name].replace("“", "").replace("”", "")
        tokenized = sent_tokenize(text)  # split the description into sentences
        label_number = int(name)
        if not tokenized:
            continue

        # Load and normalise the image once; every sentence of this
        # description shares it (np.array() below copies it per sample).
        image = img_to_array(load_img(images[name], target_size=target_size))
        image = (image.astype(np.float32) / 127.5) - 1.0

        for sentence in tokenized:
            # Remove every ASCII punctuation symbol from the sentence.
            filtered_sentence = regex_any_symbol.sub("", sentence)
            image_list.append(image)
            caption_list.append(filtered_sentence)
            numbers.append(label_number)

    image_list = np.array(image_list)
    caption_list = np.array(caption_list)
    numbers = np.array(numbers)

    print("Dataset Size: %s" % len(image_list))
    (
        image_train,
        image_test,
        caption_train,
        caption_test,
        numbers_train,
        numbers_test,
    ) = train_test_split(
        image_list,
        caption_list,
        numbers,
        test_size=split_rate,
        random_state=random_state,
    )

    return (
        image_train,
        caption_train,
        image_test,
        caption_test,
        numbers_train,
        numbers_test,
    )

In [7]:
# Load the dataset once at notebook level. The resulting arrays are module
# globals that train() reads directly. load_dataset prints the total number of
# (image, sentence) samples.
(
    image_train,
    caption_train,
    image_test,
    caption_test,
    numbers_train,
    numbers_test,
) = load_dataset(IMAGE_DIR, TXT_DIR, IMAGE_SHAPE)

Dataset Size: 260


# Word2Em

In [8]:
import urllib.request
import os
import zipfile

In [9]:
def load_glove(glove_file_path, embedding_dim=EMBEDDING_DIM):
    """Read a GloVe text file into a ``{word: vector}`` dictionary.

    Each non-empty line is expected to hold a token followed by its
    whitespace-separated float components.

    Args:
        glove_file_path: Path to the GloVe ``.txt`` file (UTF-8).
        embedding_dim: Unused by the loader; kept so existing callers that
            pass it keep working.

    Returns:
        dict mapping each token to a float32 numpy vector.
    """
    print("Loading glove file, please wait...")
    _word2em = {}
    # The context manager closes the file even if a line fails to parse.
    with open(glove_file_path, mode="rt", encoding="utf8") as glove_file:
        for line in glove_file:
            fields = line.strip().split()
            if not fields:
                # Blank line (e.g. a trailing newline): nothing to parse.
                continue
            _word2em[fields[0]] = np.array(fields[1:], dtype=np.float32)
    print("Finished.")
    return _word2em

In [10]:
# Location of the pre-trained GloVe vectors. Set the GLOVE_PATH environment
# variable to point at a local copy; the fallback is the path this notebook was
# originally run with.
GLOVE_PATH = os.environ.get(
    "GLOVE_PATH", "/home/kaustubh/emotigan/utils/glove.6B.300d.txt"
)
word2em = load_glove(GLOVE_PATH)

Loading glove file, please wait...
Finished.


# Sentence2Em

In [11]:
# returns embedding<output> for a sentence<input>
def vectorize_sentence(sentence, embedding_dim=EMBEDDING_DIM):
    """Embed a sentence as the sum of the GloVe vectors of its known words.

    The sentence is split on single spaces. Tokens missing from the global
    ``word2em`` table are skipped, so a sentence with no known word yields
    an all-zero vector of length ``embedding_dim``.
    """
    total = np.zeros(shape=(embedding_dim,))
    for token in sentence.split(" "):
        if token in word2em:
            total = np.add(total, word2em[token])
    return total

In [12]:
# vectorize list of sentences
def vectorize_sent_list(sent_list):
    """Return an array with one sentence embedding per entry of ``sent_list``."""
    return np.array([vectorize_sentence(sentence) for sentence in sent_list])

# Model

In [13]:
# GPU setting
import tensorflow as tf
from tensorflow.keras.backend import set_session

# Restrict TensorFlow to GPU "0" and let it grow its memory allocation on demand.
# NOTE(review): tf.ConfigProto / tf.GPUOptions / tf.Session look like the
# TensorFlow 1.x API; confirm the pinned TF version before re-running.
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", allow_growth=True  # specify GPU number
    )
)

print(config)

# NOTE(review): set_session is imported from tensorflow.keras.backend, while the
# models below are built with the standalone `keras` package. Confirm that this
# session is the one those models actually use.
set_session(tf.Session(config=config))

gpu_options {
  allow_growth: true
  visible_device_list: "0"
}



In [14]:
from __future__ import print_function, division

from keras.layers import Input, Dense, Reshape, Flatten, Dropout, concatenate
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
from keras import backend as kb
from keras.layers import Lambda

In [15]:
# return generator model (keras)
def build_generator(
    latent_dim=LATENT_DIM, embedding_dim=EMBEDDING_DIM, channels=IMAGE_SHAPE[2]
):
    """Build the conditional generator.

    Inputs are a noise vector (``latent_dim``) and a caption embedding
    (``embedding_dim``). The embedding is projected to 100 units, concatenated
    with the noise, expanded to an 8x8x256 map and upsampled three times
    (8 -> 16 -> 32 -> 64) before a tanh convolution produces ``channels``
    output channels.

    Returns:
        The Keras ``Model`` mapping ``[noise, embedding]`` to an image.
    """
    noise_in = Input(shape=(latent_dim,), name="g_input")
    caption_in = Input(shape=(embedding_dim,), name="cond_g_input")
    caption_proj = Dense(100)(caption_in)

    # Fuse noise and caption, then project onto the initial 8x8 feature map.
    net = concatenate([noise_in, caption_proj])
    net = Dense(256 * 8 * 8, activation="relu")(net)
    net = Reshape((8, 8, 256))(net)

    # Three identical upsample -> conv -> batch-norm -> relu stages.
    for n_filters in (256, 128, 64):
        net = UpSampling2D()(net)
        net = Conv2D(n_filters, kernel_size=3, padding="same")(net)
        net = BatchNormalization(momentum=0.8)(net)
        net = Activation("relu")(net)

    # Output image in [-1, 1].
    net = Conv2D(filters=channels, kernel_size=3, padding="same")(net)
    net = Activation("tanh")(net)

    generator = Model([noise_in, caption_in], net)
    generator.summary()

    return generator

In [16]:
def build_discriminator(embedding_dim=EMBEDDING_DIM, img_shape=IMAGE_SHAPE):
    """Build the conditional discriminator.

    The image goes through a strided convolutional stack. The caption
    embedding is projected to 100 units, tiled over the spatial grid of the
    resulting feature map and concatenated to it channel-wise, followed by one
    more convolution and a sigmoid real/fake output.

    Args:
        embedding_dim: Length of the caption embedding input.
        img_shape: ``(height, width, channels)`` of the image input.

    Returns:
        The Keras ``Model`` mapping ``[image, embedding]`` to a validity score.
    """
    discriminator_input = Input(shape=img_shape, name="d_input")
    cond_input = Input(shape=(embedding_dim,), name="cond_d_input")

    D = Conv2D(64, kernel_size=3, strides=2, padding="same")(discriminator_input)
    D = LeakyReLU(alpha=0.2)(D)
    D = Dropout(0.25)(D)
    D = Conv2D(128, kernel_size=3, strides=2, padding="same")(D)
    D = ZeroPadding2D(padding=((0, 1), (0, 1)))(D)
    D = BatchNormalization(momentum=0.8)(D)
    D = LeakyReLU(alpha=0.2)(D)
    D = Dropout(0.25)(D)
    D = Conv2D(256, kernel_size=3, strides=1, padding="same")(D)
    D = BatchNormalization(momentum=0.8)(D)
    D = LeakyReLU(alpha=0.2)(D)
    D = Dropout(0.25)(D)
    D = Conv2D(512, kernel_size=3, strides=2, padding="same")(D)
    D = BatchNormalization(momentum=0.8)(D)
    D = LeakyReLU(alpha=0.2)(D)

    # Spatial size of the image feature map (9x9 for a 64x64 input). Reading
    # it from the tensor keeps the tiling valid for other img_shape values.
    feat_rows, feat_cols = kb.int_shape(D)[1:3]

    cond_d_hidden = Dense(100)(cond_input)
    cond_d_hidden = Reshape((1, 1, 100))(cond_d_hidden)

    # Repeat the caption projection at every spatial location so it can be
    # concatenated with the image features along the channel axis.
    cond_d_output = Lambda(
        lambda x: kb.tile(x, [1, feat_rows, feat_cols, 1]),
        output_shape=[feat_rows, feat_cols, 100],
    )(cond_d_hidden)

    D = concatenate([D, cond_d_output], axis=-1)
    D = Conv2D(512, kernel_size=3, strides=1, padding="same")(D)
    D = BatchNormalization(momentum=0.8)(D)
    D = LeakyReLU(alpha=0.1)(D)
    D = Dropout(0.25)(D)
    D = Flatten()(D)
    discriminator_output = Dense(1, activation="sigmoid")(D)

    discriminator = Model([discriminator_input, cond_input], discriminator_output)
    discriminator.summary()

    return discriminator

In [17]:
# Separate Adam optimizers for the two networks. The generator's first
# argument is 10x the discriminator's.
# NOTE(review): the positional arguments are presumably (learning rate, beta_1);
# confirm against the installed Keras version.
optimizer_g = Adam(0.0005, 0.5)
optimizer_d = Adam(0.00005, 0.5)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


In [18]:
# Build the generator (build_generator also prints its summary).
generator = build_generator()

# Symbolic inputs for the combined model: a noise vector and a caption
# embedding. The generator maps both to a generated image tensor.
z = Input(shape=(LATENT_DIM,))
cond_input = Input(shape=(EMBEDDING_DIM,))
img = generator([z, cond_input])

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
cond_g_input (InputLayer)       (None, 300)          0                                            
__________________________________________________________________________________________________
g_input (InputLayer)            (None, 100)          0                                            
__________________________________________________________________________________________________
dense_1 (Dense)                 (None, 100)          30100       cond_g_input[0][0]               
__________________________________________________________________________________________________
concatenate_1 (Concatenate)     (None, 200)          0           g_input[0][0]                    
                                                                 dense_1[0][0]              

In [19]:
# Build and compile the discriminator for stand-alone training.
discriminator = build_discriminator()
discriminator.compile(
    loss="binary_crossentropy", optimizer=optimizer_d, metrics=["accuracy"]
)

# For the combined model we will only train the generator.
# NOTE(review): this relies on Keras applying `trainable` at compile time, so
# the discriminator compiled above presumably still trains on its own. Confirm
# for the installed Keras version.
discriminator.trainable = False

# The discriminator takes the generated image plus the same caption embedding
# and outputs its validity score.
valid = discriminator([img, cond_input])

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
d_input (InputLayer)            (None, 64, 64, 3)    0                                            
__________________________________________________________________________________________________
conv2d_5 (Conv2D)               (None, 32, 32, 64)   1792        d_input[0][0]                    
__________________________________________________________________________________________________
leaky_re_lu_1 (LeakyReLU)       (None, 32, 32, 64)   0           conv2d_5[0][0]                   
__________________________________________________________________________________________________
dropout_1 (Dropout)             (None, 32, 32, 64)   0           leaky_re_lu_1[0][0]              
____________________________________________________________________________________________

In [20]:
# The combined model (generator stacked on the discriminator):
# [noise, caption embedding] -> validity score.
# Training it updates the generator so that it fools the discriminator.
combined = Model([z, cond_input], valid)
combined.compile(loss="binary_crossentropy", optimizer=optimizer_g)

# Train

In [35]:
# Number of training epochs and the mini-batch size; these are the defaults of
# train() and save_imgs(). train() uses only whole batches, so samples beyond
# the last full batch are not visited.
EPOCHS = 5000
BATCH_SIZE = 26

In [36]:
import time
import pandas as pd

def train(epochs=EPOCHS, batch_size=BATCH_SIZE, save_interval=20, latent_dim=LATENT_DIM):
    """Adversarially train the conditional GAN and record the loss history.

    Reads the notebook-level ``image_train`` / ``caption_train`` /
    ``image_test`` / ``caption_test`` arrays and the ``generator``,
    ``discriminator`` and ``combined`` models. For every epoch it trains on
    each full batch, evaluates once on the whole test set, and every
    ``save_interval`` epochs writes an image snapshot and weight checkpoints.

    Args:
        epochs: Number of passes over the training set.
        batch_size: Samples per training batch; a trailing partial batch is
            skipped.
        save_interval: Write a snapshot and checkpoints every this many epochs.
        latent_dim: Length of the noise vector fed to the generator.

    Side effects:
        Writes captions, history CSVs and weight files under ``./saved_model/``
        and snapshots through ``save_imgs``.
    """
    X_train, Captions = image_train, caption_train
    X_test, Captions_test = image_test, caption_test

    # to_csv / save_weights do not create missing directories.
    os.makedirs("./saved_model", exist_ok=True)

    # Persist the captions of each split so saved outputs can be matched later.
    pd.DataFrame(
        [[str(caption)] for caption in Captions], columns=["caption"]
    ).to_csv("./saved_model/caption_train.csv")
    pd.DataFrame(
        [[str(caption)] for caption in Captions_test], columns=["caption"]
    ).to_csv("./saved_model/caption_test.csv")

    # Caption embeddings never change during training: compute them once
    # instead of once per batch / per epoch.
    texts_train = vectorize_sent_list(Captions)
    texts_test = vectorize_sent_list(Captions_test)

    # Adversarial ground truths for training batches.
    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    # Ground truths and noise for evaluation follow the test-set size, which
    # need not equal batch_size.
    test_size = X_test.shape[0]
    valid_test = np.ones((test_size, 1))
    fake_test = np.zeros((test_size, 1))

    batch_count = int(X_train.shape[0] / batch_size)
    history = []
    history_test = []

    for epoch in range(epochs):
        for batch_index in range(batch_count):
            # ---------------------
            #  Train Discriminator
            # ---------------------
            batch = slice(batch_index * batch_size, (batch_index + 1) * batch_size)
            imgs = X_train[batch]
            texts = texts_train[batch]

            # Sample noise and generate a batch of new images
            noise = np.random.normal(0, 1, (batch_size, latent_dim))
            gen_imgs = generator.predict([noise, texts])

            # Train the discriminator (real classified as ones and generated as zeros)
            start = time.time()
            d_loss_real = discriminator.train_on_batch([imgs, texts], valid)
            d_loss_fake = discriminator.train_on_batch([gen_imgs, texts], fake)
            batch_time_d = time.time() - start
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Train the generator (wants discriminator to mistake images as real)
            start = time.time()
            g_loss = combined.train_on_batch([noise, texts], valid)
            batch_time_g = time.time() - start

            # Report the progress
            batch_time = batch_time_d + batch_time_g
            print(
                "%d-%d [D loss: %f, acc.: %.2f%%] [G loss: %f] [Time: %f]"
                % (epoch, batch_index, d_loss[0], 100 * d_loss[1], g_loss, batch_time)
            )
            history.append(
                [epoch, batch_index, d_loss[0], 100 * d_loss[1], g_loss, batch_time]
            )

        # Evaluate on the held-out set (no weight updates).
        noise_test = np.random.normal(0, 1, (test_size, latent_dim))
        gen_imgs_test = generator.predict([noise_test, texts_test])
        start = time.time()
        d_loss_real_test = discriminator.test_on_batch([X_test, texts_test], valid_test)
        d_loss_fake_test = discriminator.test_on_batch(
            [gen_imgs_test, texts_test], fake_test
        )
        batch_time_d_test = time.time() - start
        d_loss_test = 0.5 * np.add(d_loss_real_test, d_loss_fake_test)
        start = time.time()
        g_loss_test = combined.test_on_batch([noise_test, texts_test], valid_test)
        batch_time_g_test = time.time() - start

        # Report the test progress
        batch_time_test = batch_time_d_test + batch_time_g_test
        print(
            "%d (test) [D loss: %f, acc.: %.2f%%] [G loss: %f] [Time: %f]"
            % (
                epoch,
                d_loss_test[0],
                100 * d_loss_test[1],
                g_loss_test,
                batch_time_test,
            )
        )
        history_test.append(
            [epoch, d_loss_test[0], 100 * d_loss_test[1], g_loss_test, batch_time_test]
        )

        # If at save interval => save generated image samples & training weights
        if epoch % save_interval == 0:
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            # Pass the batch size in use so the snapshot noise matches the
            # number of sampled captions.
            save_imgs(
                epoch, texts_train[idx], batch_size=batch_size, latent_dim=latent_dim
            )

            generator.save_weights(
                filepath="./saved_model/generator_weights_" + str(epoch) + ".h5"
            )
            discriminator.save_weights(
                filepath="./saved_model/discriminator_weights_" + str(epoch) + ".h5"
            )

    # save weights & history
    df_train = pd.DataFrame(
        history, columns=["epoch", "batch", "d_loss", "acc", "g_loss", "time[sec]"]
    )
    df_train.to_csv("./saved_model/history.csv")
    df_test = pd.DataFrame(
        history_test, columns=["epoch", "d_loss", "acc", "g_loss", "time[sec]"]
    )
    df_test.to_csv("./saved_model/history_test.csv")
    generator.save_weights(filepath="./saved_model/generator_weights.h5")
    discriminator.save_weights(filepath="./saved_model/discriminator_weights.h5")

In [37]:
def save_imgs(epoch, texts, batch_size=BATCH_SIZE, latent_dim = LATENT_DIM):
    """Generate ``batch_size`` images and save them as one grid snapshot.

    Args:
        epoch: Used as the snapshot file name (``images/snapshot/<epoch>.png``).
        texts: Either raw caption strings or already-computed caption
            embeddings; raw captions are embedded here. Must contain
            ``batch_size`` entries.
        batch_size: Number of noise vectors (and images) to generate.
        latent_dim: Length of each noise vector.
    """
    noise = np.random.normal(0, 1, (batch_size, latent_dim))

    # Decide by content rather than by a dataset-specific batch size: string
    # (or object) arrays are raw captions, numeric arrays are embeddings.
    texts = np.asarray(texts)
    if texts.dtype.kind in ("U", "O"):
        texts = vectorize_sent_list(texts)

    gen_imgs = generator.predict([noise, texts])
    gen_img = combine_normalized_images(gen_imgs)

    # PIL does not create missing directories.
    os.makedirs("images/snapshot", exist_ok=True)
    img_from_normalized_img(gen_img).save("images/snapshot/%d.png" % epoch)

In [38]:
def load_model(
    gen_path="./saved_model/generator_weights.h5",
    dis_path="./saved_model/discriminator_weights.h5",
):
    """Load pre-trained weights into the global generator and discriminator.

    Args:
        gen_path: Weights file for the generator.
        dis_path: Weights file for the discriminator.

    Returns:
        None. The global models are updated in place.
    """
    # Generator first, then discriminator.
    for network, weights_file in ((generator, gen_path), (discriminator, dis_path)):
        network.load_weights(weights_file)

In [39]:
def generate_image_from_text(text, flag=True):
    """Generate one image for a caption with the global generator.

    Args:
        text: Caption string; embedded with ``vectorize_sentence``.
        flag: When exactly ``True`` a PIL image (uint8) is returned; any other
            value returns the raw generator output array in [-1, 1].

    Returns:
        A PIL ``Image`` or a numpy array, depending on ``flag``.
    """
    encoded_text = np.zeros(shape=(1, EMBEDDING_DIM))
    encoded_text[0, :] = vectorize_sentence(text)

    # Sample the noise from N(0, 1), the distribution used during training in
    # train() and save_imgs(). Uniform noise would be off-distribution for
    # the generator.
    noise = np.random.normal(0, 1, (1, LATENT_DIM))

    generated_image = generator.predict([noise, encoded_text])[0]

    if flag is True:
        return img_from_normalized_img(generated_image)
    return generated_image

In [40]:
import math
from PIL import Image


def combine_normalized_images(generated_images):
    """Tile a batch of images into a single grid image.

    The grid has ``int(sqrt(n))`` columns and enough rows to hold all ``n``
    images; images are placed row by row and unused cells stay zero. The
    result has the same dtype as the input batch.
    """
    count = generated_images.shape[0]
    n_cols = int(math.sqrt(count))
    n_rows = int(math.ceil(float(count) / n_cols))

    tile_dims = generated_images.shape[1:]
    tile_h, tile_w, depth = tile_dims[0], tile_dims[1], tile_dims[2]

    canvas = np.zeros(
        (n_rows * tile_h, n_cols * tile_w, depth), dtype=generated_images.dtype
    )
    for position, tile in enumerate(generated_images):
        row, col = divmod(position, n_cols)
        top, left = row * tile_h, col * tile_w
        canvas[top : top + tile_h, left : left + tile_w, :] = tile
    return canvas


def img_from_normalized_img(normalized_img):
    """Convert an image array scaled to [-1, 1] into a uint8 PIL image."""
    pixels = (normalized_img * 127.5 + 127.5).astype(np.uint8)
    return Image.fromarray(pixels)


def generate_mode():
    """Load the trained weights and generate one image per training caption.

    For each training sample, the original image is written to
    ``./images/original/<i>.png`` and the generated one to
    ``./images/output/<i>.png``. The captions go to ``./images/caption.csv``,
    and a grid of images generated from all training captions is saved through
    ``save_imgs``.
    """
    load_model()

    # PIL does not create missing directories.
    os.makedirs("./images/original", exist_ok=True)
    os.makedirs("./images/output", exist_ok=True)

    caption_list = []
    print("Generating images...")
    # The training arrays are notebook-level globals; train()'s X_train and
    # Captions are local to that function and not visible here.
    for iteration, (image, caption) in enumerate(zip(image_train, caption_train)):
        img_from_normalized_img(image).save(
            "./images/original/" + str(iteration) + ".png"
        )
        generate_image_from_text(caption).save(
            "./images/output/" + str(iteration) + ".png"
        )
        caption_list.append([str(caption)])

    df = pd.DataFrame(caption_list, columns=["caption"])
    df.to_csv("./images/caption.csv")

    # Plot one generated emoji per training caption. The captions are embedded
    # here and the batch size follows their count, so the noise batch and the
    # caption batch always match.
    save_imgs(
        epoch=EPOCHS,
        texts=vectorize_sent_list(caption_train),
        batch_size=len(caption_train),
    )
    print("Done!")


def train_mode():
    """Run training with the notebook-level EPOCHS and BATCH_SIZE.

    Snapshots and weight checkpoints are written every 50 epochs.
    """
    train(epochs=EPOCHS, batch_size=BATCH_SIZE, save_interval=50)

In [None]:
# Start training; per-batch and per-epoch (test) losses are printed as it runs.
train_mode()

0-0 [D loss: 1.073027, acc.: 26.92%] [G loss: 1.047645] [Time: 4.654494]
0-1 [D loss: 1.045501, acc.: 32.69%] [G loss: 1.007440] [Time: 1.244125]
0-2 [D loss: 1.249445, acc.: 30.77%] [G loss: 0.939642] [Time: 1.242951]
0-3 [D loss: 1.013429, acc.: 51.92%] [G loss: 0.977450] [Time: 1.284915]
0-4 [D loss: 1.081573, acc.: 28.85%] [G loss: 1.172710] [Time: 1.240307]
0-5 [D loss: 1.069229, acc.: 30.77%] [G loss: 1.108414] [Time: 1.250738]
0-6 [D loss: 1.313168, acc.: 13.46%] [G loss: 1.097589] [Time: 1.239226]
0-7 [D loss: 0.933174, acc.: 40.38%] [G loss: 0.988986] [Time: 1.249938]
0-8 [D loss: 1.158534, acc.: 28.85%] [G loss: 0.926158] [Time: 1.307086]
0 (test) [D loss: 0.847239, acc.: 50.00%] [G loss: 1.295125] [Time: 1.036617]
1-0 [D loss: 1.054996, acc.: 28.85%] [G loss: 0.958817] [Time: 1.244555]
1-1 [D loss: 1.452193, acc.: 5.77%] [G loss: 1.044287] [Time: 1.252266]
1-2 [D loss: 1.320516, acc.: 17.31%] [G loss: 1.067235] [Time: 1.316810]
1-3 [D loss: 1.051701, acc.: 42.31%] [G loss: 0