# Import

In [1]:
# You'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
import tensorflow as tf
from tqdm import tqdm

# Configure GPU usage before any tensors are created: enable on-demand memory
# growth on every physical GPU, then expose only the first one to TensorFlow.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        # Restrict this process to the first physical GPU only.
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)



1 Physical GPUs, 1 Logical GPUs


# Path

In [89]:
# Directory holding the captcha images; image paths are built as IMAGE_DIR + name + '.png'.
IMAGE_DIR = './words_captcha/'
# Text file read line by line; each line is split on a space into image name and annotation.
annotation_file = './words_captcha/spec_train_val.txt'
# Number of lowercase letters ('a' onwards) added to the vocabulary.
CHAR_NUM=26
# Number of leading annotation entries used for training; the rest are used for validation.
TRAIN_NUM=100000

# Read annotation file

In [95]:
# Read the annotation file. The `with` block closes the file on exit, so no
# explicit close() is needed.
with open(annotation_file, 'r') as f:
    lines = f.readlines()

img_name = []
annotation_list = []

# Each non-empty line is "<image name> <annotation>".
for line in lines:
    if not line.strip():
        # Skip blank lines (e.g. a trailing newline at the end of the file)
        # instead of failing with an IndexError below.
        continue
    fields = line.split()
    img_name.append(IMAGE_DIR + fields[0] + '.png')
    annotation_list.append(fields[1])

# Unlabelled test images are named a120000.png ... a139999.png and live in the
# same directory as the training images.
test_name = [IMAGE_DIR + 'a' + str(i) + '.png' for i in range(120000, 140000)]

# Train Val split

In [97]:
# Split in file order (no shuffling): the first TRAIN_NUM entries are used for
# training and every entry after them for validation.
train_name, val_name = img_name[:TRAIN_NUM], img_name[TRAIN_NUM:]
train_annotation, val_annotation = annotation_list[:TRAIN_NUM], annotation_list[TRAIN_NUM:]

# Start code: + End code: -

In [99]:
# Vocabulary: index 0 is the padding blank, 1..CHAR_NUM are the lowercase
# letters starting at 'a', 27 is the start token '+' and 28 the end token '-'.
character_to_idx = {' ': 0}
idx_to_character = {0: ' '}

for index, code_point in enumerate(range(ord('a'), ord('a') + CHAR_NUM), start=1):
    letter = chr(code_point)
    character_to_idx[letter] = index
    idx_to_character[index] = letter

# Special tokens marking the beginning and the end of a word.
for token, token_idx in (('+', 27), ('-', 28)):
    character_to_idx[token] = token_idx
    idx_to_character[token_idx] = token

In [100]:
# Display the character -> index mapping.
character_to_idx

{' ': 0,
 'a': 1,
 'b': 2,
 'c': 3,
 'd': 4,
 'e': 5,
 'f': 6,
 'g': 7,
 'h': 8,
 'i': 9,
 'j': 10,
 'k': 11,
 'l': 12,
 'm': 13,
 'n': 14,
 'o': 15,
 'p': 16,
 'q': 17,
 'r': 18,
 's': 19,
 't': 20,
 'u': 21,
 'v': 22,
 'w': 23,
 'x': 24,
 'y': 25,
 'z': 26,
 '+': 27,
 '-': 28}

In [101]:
# Display the index -> character mapping.
idx_to_character

{0: ' ',
 1: 'a',
 2: 'b',
 3: 'c',
 4: 'd',
 5: 'e',
 6: 'f',
 7: 'g',
 8: 'h',
 9: 'i',
 10: 'j',
 11: 'k',
 12: 'l',
 13: 'm',
 14: 'n',
 15: 'o',
 16: 'p',
 17: 'q',
 18: 'r',
 19: 's',
 20: 't',
 21: 'u',
 22: 'v',
 23: 'w',
 24: 'x',
 25: 'y',
 26: 'z',
 27: '+',
 28: '-'}

In [126]:
def max_length(annotations):
    """Return the length of the longest annotation.

    Args:
        annotations: iterable of sized items (here: annotation strings).

    Returns:
        The largest ``len(item)`` found, or 0 when ``annotations`` is empty.
    """
    return max((len(item) for item in annotations), default=0)
# Longest annotation plus two extra positions: the encoding cell wraps every
# annotation with a start index (27) and an end index (28).
max_len = max_length(annotation_list) + 2
max_len

7

# Index word

In [103]:
def _encode_annotation(annotation):
    """Convert one annotation string into a fixed-length list of indices.

    The result is: start token ('+'), one index per character, end token
    ('-'), then padding zeros up to ``max_len`` entries. Sequences that are
    already ``max_len`` or longer are not padded.

    Raises:
        KeyError: if the annotation contains a character that is not in
            ``character_to_idx``.
    """
    encoded = [character_to_idx['+']]
    encoded.extend(character_to_idx[char] for char in annotation)
    encoded.append(character_to_idx['-'])
    # A non-positive repeat count yields an empty list, i.e. no padding.
    encoded.extend([0] * (max_len - len(encoded)))
    return encoded


# The same encoding is applied to both splits.
train_annotation_idx = [_encode_annotation(annotation) for annotation in train_annotation]
val_annotation_idx = [_encode_annotation(annotation) for annotation in val_annotation]

In [104]:
# Show the first five encoded training annotations.
train_annotation_idx[:5]

[[27, 20, 8, 21, 19, 28, 0],
 [27, 23, 23, 23, 28, 0, 0],
 [27, 20, 9, 5, 4, 28, 0],
 [27, 9, 4, 19, 28, 0, 0],
 [27, 10, 1, 13, 28, 0, 0]]

In [105]:
# Show the first five training image paths.
train_name[:5]

['./words_captcha/a0.png',
 './words_captcha/a1.png',
 './words_captcha/a2.png',
 './words_captcha/a3.png',
 './words_captcha/a4.png']

# Para

In [107]:
# Batch size used for the train, validation and test datasets.
BATCH_SIZE = 40
# Size of the shuffle buffer used by the input pipeline.
BUFFER_SIZE = 5000
# Width of the encoder projection and of the decoder's token embedding.
embedding_dim = 256
# Number of units in the decoder GRU and in the attention layers.
units = 512
# One output class per vocabulary entry (blank, letters, start and end tokens).
vocab_size = len(character_to_idx)
# Number of full batches per epoch, used for progress bars and loss averaging.
train_steps = len(train_name) // BATCH_SIZE
val_steps = len(val_name) // BATCH_SIZE
# NOTE(review): the two values below describe an InceptionV3 feature map of
# shape (64, 2048). They are not referenced anywhere else in the visible
# notebook, which uses its own CNN feature extractor instead; they are kept
# only so that existing names stay defined.
features_shape = 2048
attention_features_shape = 64

In [108]:
def load(image_path, annotation):
    """Read one image file and return it together with its annotation.

    The file is decoded to 3 channels, resized to (300, 160) and then mapped
    with ``pixels / 255 - 1``.

    NOTE(review): if the decoded values lie in [0, 255] this yields the range
    [-1, 0], not [-1, 1] -- confirm this is intended before changing it, since
    trained weights depend on it.
    NOTE(review): the image paths built in this notebook end in '.png' while
    the JPEG decoding op is used here -- confirm the op accepts these files.

    Args:
        image_path: path of the image file.
        annotation: label passed through unchanged.

    Returns:
        Tuple ``(image, annotation)``.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    pixels = tf.image.resize(pixels, (300, 160))
    return pixels / 255 - 1., annotation

# Dataset

In [109]:
# Training pipeline. Shuffling happens BEFORE `map(load)` so that the shuffle
# buffer holds (path, label) pairs rather than BUFFER_SIZE decoded float
# images, which would cost several gigabytes of memory.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_name, train_annotation_idx))
    .shuffle(BUFFER_SIZE)
    .map(load, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

# Validation pipeline. No shuffling: the validation loss and accuracy are
# accumulated over the whole split and do not depend on the order.
val_dataset = (
    tf.data.Dataset.from_tensor_slices((val_name, val_annotation_idx))
    .map(load, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

In [139]:
def load_test(image_path):
    """Read one unlabelled image file and return the preprocessed image.

    Applies the same steps as the labelled loader: decode to 3 channels,
    resize to (300, 160), then ``pixels / 255 - 1``.

    NOTE(review): if the decoded values lie in [0, 255] the output range is
    [-1, 0] -- confirm this is intended.

    Args:
        image_path: path of the image file.

    Returns:
        The preprocessed image tensor.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    pixels = tf.image.resize(pixels, (300, 160))
    return pixels / 255 - 1.

In [140]:
# Test pipeline: images only, kept in file order so that predictions can be
# matched back to the image ids.
test_dataset = (
    tf.data.Dataset.from_tensor_slices(test_name)
    .map(load_test, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

In [35]:
# Inspect the element spec of the batched training dataset.
train_dataset

<PrefetchDataset element_spec=(TensorSpec(shape=(None, 300, 160, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 7), dtype=tf.int32, name=None))>

# Model

In [110]:
class BahdanauAttention(tf.keras.Model):
    """Additive (Bahdanau-style) attention over a set of feature vectors.

    Args:
        units: width of the two projection layers ``W1`` and ``W2``.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Compute a context vector from ``features`` given ``hidden``.

        Args:
            features: encoder output, (batch_size, num_positions, feature_dim).
            hidden: decoder state, (batch_size, hidden_size).

        Returns:
            Tuple ``(context_vector, attention_weights)`` with shapes
            (batch_size, feature_dim) and (batch_size, num_positions, 1).
        """
        # (batch_size, 1, hidden_size) so it broadcasts over the positions.
        query = tf.expand_dims(hidden, 1)

        # (batch_size, num_positions, units)
        energy = tf.nn.tanh(self.W1(features) + self.W2(query))

        # V maps every position to one score; softmax normalises over axis 1,
        # i.e. over the positions.
        attention_weights = tf.nn.softmax(self.V(energy), axis=1)

        # Weighted sum of the feature vectors over the positions.
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)

        return context_vector, attention_weights

# Use a VGG16-like CNN as the feature extractor

Using InceptionV3 without fine-tuning gave poor results (accuracy = 0.0005).

Because the full VGG16 takes a long time to train, I removed some of its layers.

In [111]:
class conv_relu(tf.keras.layers.Layer):
    """Convolution followed by batch normalisation and LeakyReLU(0.1).

    Args:
        filters: number of convolution filters.
        size: convolution kernel size.
        stride: convolution stride.
    """

    def __init__(self, filters, size, stride):
        super().__init__()
        self.conv = tf.keras.layers.Conv2D(
            filters,
            size,
            stride,
            padding="same",
            kernel_initializer=tf.keras.initializers.TruncatedNormal(),
        )
        self.batchnorm = tf.keras.layers.BatchNormalization()
        self.lkrelu = tf.keras.layers.LeakyReLU(0.1)

    def call(self, inputs, training):
        """Apply conv -> batch norm -> leaky ReLU.

        Args:
            inputs: input feature map.
            training: forwarded to the batch-normalisation layer.
        """
        normalised = self.batchnorm(self.conv(inputs), training = training)
        return self.lkrelu(normalised)

In [112]:
class Feature_Extracter(tf.keras.Model):
    """Stack of conv_relu blocks with four 2x2, stride-2 max-pooling layers.

    Filter counts per stage are 64, 128, 256, 512 and 512; the last stage is
    not followed by pooling, so the output has 512 channels.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 64 filters.
        self.cr1 = conv_relu(64, 3, 1)
        self.cr2 = conv_relu(64, 3, 1)
        self.max_pooling1 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
        # Stage 2: 128 filters.
        self.cr3 = conv_relu(128, 3, 1)
        self.cr4 = conv_relu(128, 3, 1)
        self.max_pooling2 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
        # Stage 3: 256 filters.
        self.cr5 = conv_relu(256, 3, 1)
        self.cr6 = conv_relu(256, 3, 1)
        self.cr7 = conv_relu(256, 3, 1)
        self.max_pooling3 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
        # Stage 4: 512 filters.
        self.cr8 = conv_relu(512, 3, 1)
        self.cr9 = conv_relu(512, 3, 1)
        self.max_pooling4 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))
        # Stage 5: 512 filters, no pooling afterwards.
        self.cr11 = conv_relu(512, 3, 1)
        self.cr12 = conv_relu(512, 3, 1)

    def call(self, inputs, training):
        """Run the image batch through all stages.

        Args:
            inputs: image batch.
            training: forwarded to every conv_relu block.

        Returns:
            The feature map produced by the last conv_relu block.
        """
        x = inputs
        for block in (self.cr1, self.cr2):
            x = block(x, training)
        x = self.max_pooling1(x)

        for block in (self.cr3, self.cr4):
            x = block(x, training)
        x = self.max_pooling2(x)

        for block in (self.cr5, self.cr6, self.cr7):
            x = block(x, training)
        x = self.max_pooling3(x)

        for block in (self.cr8, self.cr9):
            x = block(x, training)
        x = self.max_pooling4(x)

        for block in (self.cr11, self.cr12):
            x = block(x, training)
        return x

In [113]:
class CNN_Encoder(tf.keras.Model):
    """Project feature vectors to ``embedding_dim`` with a dense layer + ReLU.

    Args:
        embedding_dim: output width of the dense projection.
    """

    def __init__(self, embedding_dim):
        super().__init__()
        # The dense layer acts on the last axis only, so the output keeps the
        # leading axes of the input and ends in embedding_dim.
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Return ``relu(fc(x))``."""
        return tf.nn.relu(self.fc(x))

In [114]:
# Instantiate the CNN feature extractor and build it for inputs of shape
# (None, 300, 160, 3) so that summary() can list its layers.
feature_extracter = Feature_Extracter()
feature_extracter.build((None,300,160,3))
feature_extracter.summary()

Model: "feature__extracter_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_relu_34 (conv_relu)    multiple                  2048      
                                                                 
 conv_relu_35 (conv_relu)    multiple                  37184     
                                                                 
 max_pooling2d_11 (MaxPoolin  multiple                 0         
 g2D)                                                            
                                                                 
 conv_relu_36 (conv_relu)    multiple                  74368     
                                                                 
 conv_relu_37 (conv_relu)    multiple                  148096    
                                                                 
 max_pooling2d_12 (MaxPoolin  multiple                 0         
 g2D)                                         

In [115]:
class RNN_Decoder(tf.keras.Model):
    """Attention-based decoder that predicts one token per call.

    Args:
        embedding_dim: width of the token embedding.
        units: number of GRU units; also used for ``fc1`` and the attention.
        vocab_size: number of output classes and embedding rows.
    """

    def __init__(self, embedding_dim, units, vocab_size):
        super().__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(units)

    def call(self, x, features, hidden):
        """Decode one step.

        Args:
            x: current input token ids, (batch_size, 1).
            features: encoded image features attended over.
            hidden: previous decoder state, (batch_size, units).

        Returns:
            Tuple ``(logits, state, attention_weights)`` where ``logits`` has
            shape (batch_size * steps, vocab_size) and ``state`` is the GRU
            state returned for this call.
        """
        # Attention is a separate sub-model driven by the previous state.
        context_vector, attention_weights = self.attention(features, hidden)

        # (batch_size, 1, embedding_dim)
        embedded = self.embedding(x)

        # Prepend the context vector on the feature axis:
        # (batch_size, 1, context_dim + embedding_dim)
        gru_input = tf.concat([tf.expand_dims(context_vector, 1), embedded], axis=-1)

        # NOTE(review): the GRU is called without `initial_state`, so `hidden`
        # only influences the attention above and is not fed into the GRU
        # itself. Confirm whether that is intended.
        output, state = self.gru(gru_input)

        # (batch_size, steps, units)
        projected = self.fc1(output)

        # Merge the batch and step axes: (batch_size * steps, units)
        flat = tf.reshape(projected, (-1, projected.shape[2]))

        # (batch_size * steps, vocab_size)
        logits = self.fc2(flat)

        return logits, state, attention_weights

    def reset_state(self, batch_size):
        """Return an all-zero initial state of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

In [116]:
# Instantiate the feature projection (encoder) and the attention decoder.
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

In [117]:
# from_logits=True: the loss is fed raw scores. reduction='none' keeps one
# loss value per example so that loss_function can mask entries before
# averaging.
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    """Mean cross-entropy with entries whose target is 0 zeroed out.

    The mean is taken over all entries, including the zeroed ones.

    Args:
        real: target indices; 0 marks padding.
        pred: predicted logits for the same entries.

    Returns:
        Scalar loss tensor.
    """
    per_example = loss_object(real, pred)
    # 1.0 where the target is a real token, 0.0 where it is padding.
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
    return tf.reduce_mean(per_example * keep)

In [118]:
checkpoint_path = "./checkpoints/train"
# Track every model that is trained. The CNN feature extractor is updated by
# the optimizer together with the encoder and decoder, so it must be saved
# too; otherwise a restored run would pair trained decoder weights with a
# freshly initialised feature extractor.
ckpt = tf.train.Checkpoint(feature_extracter=feature_extracter,
                           encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
# Keep only the three most recent checkpoints on disk.
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=3)

In [119]:
# Resume from the latest checkpoint if one exists. The checkpoint file name
# ends in "-<save counter>"; one checkpoint is saved per epoch, so the counter
# is used as the number of epochs already completed.
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    # Actually load the saved weights and optimizer state; without this the
    # epoch counter would advance while training restarted from scratch.
    ckpt.restore(ckpt_manager.latest_checkpoint)

# Train

In [120]:
# Kept in its own cell so that re-running the training cell does not reset
# the loss history accumulated so far.
loss_plot = []

In [121]:
@tf.function
def train_step(img_tensor, target):
    """Run one optimisation step with teacher forcing.

    Args:
        img_tensor: batch of preprocessed images.
        target: batch of encoded annotations, (batch_size, sequence_length),
            starting with the start token.

    Returns:
        Tuple ``(loss, total_loss)``: the loss summed over the decoding steps
        and the same value divided by the sequence length.
    """
    loss = 0

    # Use the actual batch size for both the hidden state and the start
    # tokens, so a final partial batch does not cause a shape mismatch.
    batch_size = target.shape[0]

    # The hidden state is reset for each batch because the annotations of
    # different images are unrelated.
    hidden = decoder.reset_state(batch_size=batch_size)

    # Every sequence starts with the start token '+'.
    dec_input = tf.expand_dims([character_to_idx['+']] * batch_size, 1)

    with tf.GradientTape() as tape:
        features = feature_extracter(img_tensor,True)
        # Flatten the two spatial axes into one axis of positions.
        features = tf.reshape(features,(features.shape[0], -1, features.shape[3]))
        features = encoder(features)

        for i in range(1, target.shape[1]):
            # passing the features through the decoder
            predictions, hidden, _ = decoder(dec_input, features, hidden)

            loss += loss_function(target[:, i], predictions)

            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(target[:, i], 1)

    total_loss = (loss / int(target.shape[1]))

    trainable_variables = feature_extracter.trainable_variables + encoder.trainable_variables + decoder.trainable_variables

    # Gradients are taken with respect to the summed (not averaged) loss.
    gradients = tape.gradient(loss, trainable_variables)

    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, total_loss

In [132]:
# `tqdm` and `time` are imported in the first cell of the notebook.
EPOCHS = 10

# Start at start_epoch so that a resumed run only trains the remaining epochs.
for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0

    for img_tensor, target in tqdm(train_dataset, total=train_steps):
        # train_step returns (summed loss, loss averaged over the sequence);
        # only the averaged value is reported.
        _, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

    epoch_loss = total_loss / train_steps
    print ('Epoch {} Train Loss {:.6f}'.format(epoch + 1, epoch_loss))
    loss_plot.append(epoch_loss)

    # One checkpoint per completed epoch.
    ckpt_manager.save()
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

100%|██████████| 2500/2500 [26:07<00:00,  1.59it/s]


Epoch 2 Train Loss 0.022931
Time taken for 1 epoch 1568.3571164608002 sec



100%|██████████| 2500/2500 [26:23<00:00,  1.58it/s]


Epoch 3 Train Loss 0.019200
Time taken for 1 epoch 1584.1744413375854 sec



100%|██████████| 2500/2500 [25:58<00:00,  1.60it/s]


Epoch 4 Train Loss 0.013007
Time taken for 1 epoch 1558.6700956821442 sec



100%|██████████| 2500/2500 [25:55<00:00,  1.61it/s]


Epoch 5 Train Loss 0.011771
Time taken for 1 epoch 1556.1789474487305 sec



100%|██████████| 2500/2500 [25:54<00:00,  1.61it/s]


Epoch 6 Train Loss 0.009851
Time taken for 1 epoch 1554.8824553489685 sec



  3%|▎         | 79/2500 [00:55<28:27,  1.42it/s] 


KeyboardInterrupt: 

# Val

In [133]:
def val_step(img_tensor, target):
    """Greedily decode one validation batch and compute its loss.

    Unlike training, the decoder is fed its own previous prediction (no
    teacher forcing) and the feature extractor runs with training=False.

    Args:
        img_tensor: batch of preprocessed images.
        target: batch of encoded annotations, (batch_size, sequence_length).

    Returns:
        Tuple ``(val_loss, result)``: the loss summed over the decoding steps
        and an integer array of shape (batch_size, sequence_length) whose
        first column is the start token and whose remaining columns are the
        predicted indices.
    """
    # Use the size of this batch rather than the global BATCH_SIZE so that a
    # final partial batch is handled correctly.
    batch_size = target.shape[0]
    start_token = character_to_idx['+']

    val_loss = 0
    hidden = decoder.reset_state(batch_size=batch_size)
    dec_input = tf.expand_dims([start_token] * batch_size, 1)
    features = feature_extracter(img_tensor,False)
    # Flatten the two spatial axes into one axis of positions.
    features = tf.reshape(features,(features.shape[0], -1, features.shape[3]))
    features = encoder(features)
    result = np.full((batch_size, 1), start_token)
    for i in range(1, target.shape[1]):
        # passing the features through the decoder
        predictions, hidden, _ = decoder(dec_input, features, hidden)
        predicted_id = tf.argmax(predictions,axis=1).numpy()
        val_loss += loss_function(target[:, i], predictions)
        result = np.concatenate((result, predicted_id.reshape((batch_size, 1))), axis=1)
        # Greedy decoding: the prediction becomes the next input.
        dec_input = tf.expand_dims(predicted_id, 1)

    return val_loss, result

In [134]:
# Whole-word accuracy and mean loss over the validation split.
end_token = character_to_idx['-']
equal_num = 0        # number of words predicted exactly right
num_examples = 0     # number of validation examples seen
total_val_loss = 0
for img_tensor, target in tqdm(val_dataset, total=val_steps):
    val_loss, result = val_step(img_tensor, target)
    target_array = target.numpy()
    total_val_loss += (val_loss / int(target.shape[1]))
    num_examples += target_array.shape[0]
    for i in range(target_array.shape[0]):
        for j in range(target_array.shape[1]):
            # A word counts as correct when prediction and target end at the
            # same position and agree on every character before it.
            if result[i][j] == end_token and target_array[i][j] == end_token:
                if (result[i][1:j] == target_array[i][1:j]).all():
                    equal_num+=1
                break
print ('Validation Accuracy {:.6f}, Validation Loss {:.6f}'.format(float(equal_num)/num_examples,total_val_loss/val_steps),end='\r')

100%|██████████| 500/500 [02:24<00:00,  3.45it/s]

Validation Accuracy 0.975700, Validation Loss 0.021711




Accuracy is higher than 0.9, job is done!

# Test

In [142]:
# Output file for the test predictions, and the numeric id of the first test
# image (written out as 'a' + str(START_INDEX + n)).
FILE_PATH='./Lab12-2_109065711.txt'
START_INDEX=120000

In [136]:
def test_txt(FILE_PATH, START_INDEX):
    """Greedily decode every test image and append the results to a file.

    One line per image is written as ``a<id> <predicted word>``, where ids
    start at ``START_INDEX`` and follow the order of ``test_dataset``.

    NOTE: the file is opened in append mode, so calling this twice with the
    same ``FILE_PATH`` duplicates the lines; delete the file first when
    regenerating predictions.

    Args:
        FILE_PATH: path of the output text file.
        START_INDEX: numeric id of the first test image.
    """
    start_token = character_to_idx['+']
    end_token = character_to_idx['-']
    num = 0
    # Open the output once instead of reopening it for every line.
    with open(FILE_PATH, 'a') as f:
        for img_tensor in tqdm(test_dataset, total=len(test_name) // BATCH_SIZE):
            # Use the size of this batch so a final partial batch also works.
            batch_size = img_tensor.shape[0]
            hidden = decoder.reset_state(batch_size=batch_size)
            dec_input = tf.expand_dims([start_token] * batch_size, 1)
            features = feature_extracter(img_tensor, False)
            # Flatten the two spatial axes into one axis of positions.
            features = tf.reshape(features, (features.shape[0], -1, features.shape[3]))
            features = encoder(features)
            result = np.full((batch_size, 1), start_token)
            for _ in range(1, max_len):
                # passing the features through the decoder
                predictions, hidden, _ = decoder(dec_input, features, hidden)
                predicted_id = tf.argmax(predictions, axis=1).numpy()
                result = np.concatenate((result, predicted_id.reshape((batch_size, 1))), axis=1)
                # Greedy decoding: the prediction becomes the next input.
                dec_input = tf.expand_dims(predicted_id, 1)
            for row in result:
                # Collect characters after the start token, up to the first
                # end token.
                chars = []
                for idx in row[1:]:
                    if idx == end_token:
                        break
                    chars.append(idx_to_character[int(idx)])
                output_str = ''.join(chars)
                f.write('a' + str(START_INDEX + num) + ' ' + output_str + '\n')
                num += 1
    print(num)

In [141]:
# Decode the whole test set and write the predictions to FILE_PATH.
test_txt(FILE_PATH, START_INDEX)

100%|██████████| 500/500 [02:15<00:00,  3.69it/s]

20000



