In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import random
import string 
import tensorflow as tf
import cv2

In [None]:
!mkdir images

In [None]:
# Parameters for generating CAPTCHAs and for the data pipeline.
letters     = string.ascii_letters
digits      = string.digits
# "/" is excluded because the CAPTCHA text is used as the image file name and
# "/" would be interpreted as a directory separator. Strings are immutable and
# `list(punctuation).remove("/")` only changed a temporary copy, so the
# filtered string has to be rebound to the name.
punctuation = string.punctuation.replace("/", "")
# Vocabulary of every character a label may contain (order: letters, digits, symbols).
vocab = list(letters + digits + punctuation)


In [None]:
# Show the character vocabulary used for the labels.
print(vocab)

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~']


# Generate CAPTCHA images

## Install Python's captcha library

In [None]:
!pip install captcha

Collecting captcha
  Downloading captcha-0.4-py3-none-any.whl (102 kB)
[?25l[K     |███▏                            | 10 kB 18.0 MB/s eta 0:00:01[K     |██████▍                         | 20 kB 15.0 MB/s eta 0:00:01[K     |█████████▋                      | 30 kB 9.6 MB/s eta 0:00:01[K     |████████████▉                   | 40 kB 8.1 MB/s eta 0:00:01[K     |████████████████                | 51 kB 4.0 MB/s eta 0:00:01[K     |███████████████████▏            | 61 kB 4.7 MB/s eta 0:00:01[K     |██████████████████████▍         | 71 kB 5.3 MB/s eta 0:00:01[K     |█████████████████████████▋      | 81 kB 5.1 MB/s eta 0:00:01[K     |████████████████████████████▊   | 92 kB 5.6 MB/s eta 0:00:01[K     |████████████████████████████████| 102 kB 6.2 MB/s eta 0:00:01[K     |████████████████████████████████| 102 kB 6.2 MB/s 
Installing collected packages: captcha
Successfully installed captcha-0.4


In [None]:
from captcha.image import ImageCaptcha

## CAPTCHA images 

The function generates CAPTCHA images, each containing 5 characters:
 - letters and numbers
 - symbols and punctuation (optional)

input:

- ratio  

    - letters : numbers : symbols
- split percentage 

output:

- Dictionary { keys -> 'tr' (train), 'val' (validation), 'ts' (test) ; values -> list of [ image path , tag ] }


In [None]:
def CAPTCHAsgenerating (datasize: int , imgsize : tuple , ratio :tuple ,tr_s :float ):
  '''Generate CAPTCHA images on disk and split them into train/validation/test.

  Args:
    datasize: number of CAPTCHA images to generate.
    imgsize:  (width, height) of every generated image.
    ratio:    (letters, digits, symbols) character counts drawn per CAPTCHA.
    tr_s:     fraction of the samples assigned to the training split; the
              remainder is divided equally between validation and test.

  Returns:
    dict with the keys 'tr', 'val' and 'ts'. Each value is a list of
    [image path, captcha text] pairs (numpy string arrays of length 2).
    Every generated sample lands in exactly one split, so the split sizes
    add up to `datasize`.
  '''
  v_s = float((1-tr_s)/2)
  image = ImageCaptcha(width = imgsize[0] , height = imgsize[1] )
  captcha = []
  for _ in range(datasize):
    letter = [random.choice(letters) for _ in range(ratio[0])]
    number = [random.choice(digits) for _ in range(ratio[1])]
    # NOTE(review): `if not '/'` is a constant False (a non-empty string is
    # truthy), so this list is always empty and no symbol is ever added.
    # Left as is on purpose: the rest of the notebook assumes 5-character
    # labels (label array width, max_length=5). Enabling symbols has to be
    # done together with those places.
    symbol = [random.choice(punctuation) for _ in range(ratio[2]) if not '/']
    captcha_text = letter + number + symbol
    random.shuffle(captcha_text)
    captcha_text = ''.join(captcha_text)
    path = '/content/images/'+captcha_text+'.png'
    # write() renders the image itself, so no separate generate() call is needed.
    image.write(captcha_text, path)
    captcha.append([path, captcha_text])

  # Split ONCE, after all samples exist. Splitting inside the loop re-added the
  # growing list on every iteration, which duplicated samples and let the same
  # image end up in several splits (train/test leakage).
  n_samples = len(captcha)
  tr, val, ts = np.split(np.array(captcha),
                         [int(n_samples*tr_s), int(n_samples*(1-v_s))])
  return {'tr': list(tr), 'val': list(val), 'ts': list(ts)}

In [None]:
# Generate the dataset: 50 CAPTCHAs of 280x90 px, a (3, 2, 1)
# letters/digits/symbols ratio, and 80 % of the samples for training.
data_dict = CAPTCHAsgenerating(
    datasize=50,
    imgsize=(280, 90),
    ratio=(3, 2, 1),
    tr_s=0.8,
)

In [None]:
# Preview only the first few validation samples instead of dumping the whole split.
print(data_dict['val'][:5])

[array(['/content/images/G6M8i.png', 'G6M8i'], dtype='<U25'), array(['/content/images/bCp23.png', 'bCp23'], dtype='<U25'), array(['/content/images/09ehp.png', '09ehp'], dtype='<U25'), array(['/content/images/Jb07t.png', 'Jb07t'], dtype='<U25'), array(['/content/images/j0au0.png', 'j0au0'], dtype='<U25'), array(['/content/images/j0au0.png', 'j0au0'], dtype='<U25'), array(['/content/images/u3FD0.png', 'u3FD0'], dtype='<U25'), array(['/content/images/y3gz2.png', 'y3gz2'], dtype='<U25'), array(['/content/images/x1fJ1.png', 'x1fJ1'], dtype='<U25'), array(['/content/images/U98IE.png', 'U98IE'], dtype='<U25'), array(['/content/images/U98IE.png', 'U98IE'], dtype='<U25'), array(['/content/images/B40fZ.png', 'B40fZ'], dtype='<U25'), array(['/content/images/B40fZ.png', 'B40fZ'], dtype='<U25'), array(['/content/images/33Bhi.png', '33Bhi'], dtype='<U25'), array(['/content/images/33Bhi.png', '33Bhi'], dtype='<U25'), array(['/content/images/L15YT.png', 'L15YT'], dtype='<U25'), array(['/content/images

## Data Pipeline

In [None]:
class pipeline(tf.keras.utils.Sequence):
  '''Keras Sequence that serves batches of CAPTCHA images with encoded labels.

  Every batch is a dict {'image': X, 'label': y}:
    X -- float32 array of shape (n, resize_shape[0], resize_shape[1])
    y -- array of shape (n, label_length) holding the character indices
  n equals batch_size except for a possibly shorter final batch.
  '''
  def __init__(self,input_x,labels,vocab,resize_shape,batch_size,shuffle=True):
    '''
    Args:
      input_x:      indexable collection of image file paths.
      labels:       indexable collection of label strings aligned with input_x.
      vocab:        list of characters used to encode the labels.
      resize_shape: target shape; only the first two entries are used.
      batch_size:   number of samples per batch (e.g. 8, 16, 32, 48).
      shuffle:      reshuffle the sample order at the end of every epoch.
    '''
    super().__init__()
    self.x = input_x  # pipeline input
    self.y = labels   # pipeline output
    self.resize_shape = resize_shape
    self.vocab      = vocab
    self.batch_size = batch_size
    self.shuffle    = shuffle

    # Lookup layer with the known vocabulary, built once instead of once per label.
    self.char_to_num = tf.keras.layers.StringLookup(vocabulary=self.vocab)

    self.on_epoch_end()

  def __len__(self):
    # Number of batches per epoch; rounding up keeps the trailing partial
    # batch so that no sample is silently dropped.
    return int(np.ceil(len(self.y) / self.batch_size))

  def __getitem__(self, index):
    # Sample indices that belong to the requested batch (the last batch may be shorter).
    indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
    X, y = self.__get_batch(indexes)
    return {'image':X,'label':y}

  def on_epoch_end(self):
    # Reset the sample order and optionally shuffle it for the next epoch.
    self.indexes = np.arange(len(self.x))
    if self.shuffle:
      np.random.shuffle(self.indexes)

  def __read_image(self,path):
    '''Read an image, convert it to grayscale float32 and resize it.

    Returns an array of shape (resize_shape[1], resize_shape[0]) because
    cv2.resize takes its target size as (width, height).
    Raises FileNotFoundError when the file cannot be read.
    '''
    img = cv2.imread(path)
    if img is None:
      # cv2.imread reports failure by returning None instead of raising.
      raise FileNotFoundError('Could not read image: {}'.format(path))
    dim = self.resize_shape[0:2]
    # cv2.imread returns the channels in BGR order.
    img = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
    img = img.astype('float32')

    img = cv2.resize(img,dim)
    return img

  def __get_label(self,label_string):
    '''Convert a label string into the class index of each character.'''
    return self.char_to_num(tf.strings.unicode_split(label_string, input_encoding="UTF-8"))

  def __get_batch(self, list_IDs_temp):
    '''Build the image and label arrays for the given sample indices.

    Returns:
      X: float32 array (n, resize_shape[0], resize_shape[1])
      y: array (n, label_length); label_length is taken from the first label
         of the batch, so all labels of a batch must have the same length.
    '''
    n_samples = len(list_IDs_temp)
    label_len = len(self.y[list_IDs_temp[0]]) if n_samples else 0
    X = np.empty((n_samples, self.resize_shape[0],self.resize_shape[1]),dtype=np.float32)
    y = np.empty((n_samples, label_len))

    # Generate data
    for i, ID in enumerate(list_IDs_temp):
      # Transpose so that the first axis matches resize_shape[0].
      X[i,:,:] = self.__read_image(self.x[ID]).T

      # store label
      y[i,]    = self.__get_label(self.y[ID])

    return X, y

### Build pipeline

In [None]:
# Turn every split into a 2-column array: column 0 = image path, column 1 = label text.
train_pairs = np.array(data_dict['tr'])
test_pairs  = np.array(data_dict['ts'])
val_pairs   = np.array(data_dict['val'])

# Separate the columns into the inputs (x) and targets (y) of each split.
train_x, train_y = train_pairs[:, 0].copy(), train_pairs[:, 1].copy()
test_x,  test_y  = test_pairs[:, 0].copy(),  test_pairs[:, 1].copy()
val_x,   val_y   = val_pairs[:, 0].copy(),   val_pairs[:, 1].copy()

In [None]:
# Shape the images are resized to, and the training hyper-parameters.
resize_shape= (180,45,1)
batch_size = 32
epochs     = 60

# Training data is reshuffled every epoch (pipeline default shuffle=True).
train_generator = pipeline(input_x=train_x,labels = train_y,vocab=vocab
                           ,resize_shape = resize_shape ,batch_size=batch_size)

# Validation and test predictions are compared with val_y / test_y by position,
# so these generators must keep the original sample order.
validation_generator = pipeline(input_x = val_x , labels = val_y,vocab=vocab
                           ,resize_shape = resize_shape ,batch_size=batch_size
                           ,shuffle=False)

test_generator = pipeline(input_x = test_x , labels = test_y,vocab=vocab
                           ,resize_shape = resize_shape ,batch_size=batch_size
                           ,shuffle=False)

## Model Design

Convolutional Base Block 

  - Input image shape: (180,45,1)
  - 2 CNN layers with the following set of filters [ 32,64 ]
  - Each cnn is followed by a maxpooling layer 
  - Flatten layer at the end 
  - 2 Bi-directional LSTM layers [128,64] 
  - Output Layer 

In [None]:
class Evaluation(tf.keras.callbacks.Callback):
  '''Callback that prints the exact-match recall on validation and test data
  at the end of every epoch.

  Predictions are compared with the labels by position, so the data
  generators must yield the samples in the same order as the label
  collections passed in here.
  '''
  def __init__(self, val_data_gen, val_labels, test_data_gen, test_labels,multi=True):
    '''
    Args:
      val_data_gen:  data passed to model.predict for validation.
      val_labels:    label strings of the validation samples.
      test_data_gen: data passed to model.predict for testing.
      test_labels:   label strings of the test samples.
      multi:         selects self.param ('ovr' if True, else 'raise').
    '''
    # Plain super() so that Callback.__init__ itself runs;
    # super(Callback, self) skipped it and called object.__init__ instead.
    super().__init__()
    self.test_data   = test_data_gen
    self.val_labels  = val_labels
    self.val_data    = val_data_gen
    self.test_labels = test_labels

    # Longest label (in characters); decoded predictions are cut to this length.
    self.max_length  =  max([len(label) for label in val_labels])

    # Mapping integers back to original characters
    self.num_to_char = tf.keras.layers.StringLookup(
    vocabulary = vocab, mask_token=None, invert=True
    )

    if multi == True:
      self.param = 'ovr'
    else:
      self.param = 'raise'

  def decode_batch_predictions(self,pred):
    '''Greedy CTC decoding of a batch of predictions into a list of strings.'''
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search. For complex tasks, you can use beam search
    results = tf.keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][
        :, :self.max_length
    ]
    # Iterate over the results and get back the text
    output_text = []
    for res in results:
        res = tf.strings.reduce_join(self.num_to_char(res)).numpy().decode("utf-8")
        output_text.append(res)
    return output_text
  
  def text_recall(self,preds,labels):
    '''Fraction (0..1) of predictions equal to the label at the same position.

    Pairs are formed with zip, so surplus items of the longer sequence are
    ignored. Returns 0.0 when there is nothing to compare.
    '''
    compare = [int(i==j) for i,j in zip(preds,labels)]
    if not compare:
      return 0.0

    return sum(compare)/len(compare)

  def on_epoch_end(self, epoch, logs=None):
    y_preds  = self.model.predict(self.val_data)
    pred_text= self.decode_batch_predictions(y_preds)

    # text_recall returns a fraction; scale by 100 to match the '%' in the message.
    print('\n')
    print(' | val_Recall : {:.02f} %'.format(100*self.text_recall(pred_text,self.val_labels)))

    y_preds  = self.model.predict(self.test_data)
    pred_text= self.decode_batch_predictions(y_preds)

    print(' | test_Recall: {:.02f} %'.format(100*self.text_recall(pred_text,self.test_labels)))

class CTCLayer(tf.keras.layers.Layer):
    """Layer that attaches the CTC loss to the model and passes predictions through."""

    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = tf.keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Register the CTC loss for (y_true, y_pred) and return y_pred unchanged."""
        # Dynamic sizes: samples in the batch, prediction time steps, label length.
        n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
        n_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        n_chars = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # The loss function expects one length entry per sample, shaped (batch, 1).
        per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")
        input_length = n_steps * per_sample
        label_length = n_chars * per_sample

        # The loss is attached to the layer via add_loss().
        self.add_loss(self.loss_fn(y_true, y_pred, input_length, label_length))

        # The predictions themselves are forwarded untouched.
        return y_pred

def create_model(shape,n_fltr=32,n_nds_dns=16,n_nds_lstm=128):
  '''Build and compile the CNN + bidirectional-LSTM CAPTCHA model with a CTC loss layer.

  Args:
    shape:      image shape; shape[0] and shape[1] are used as the two
                spatial dimensions of the single-channel input.
    n_fltr:     filters of the first convolution; the second uses 2*n_fltr.
    n_nds_dns:  currently unused -- the dense layer has a fixed 64 units.
                NOTE(review): wiring it in would change the default model
                (16 instead of 64 units), so it is left untouched.
    n_nds_lstm: units of the first LSTM; the second uses n_nds_lstm//2.

  Returns:
    Compiled tf.keras Model with the inputs 'image' and 'label'. The loss is
    added by the CTC layer, so compile() gets no loss argument.
  '''
  image_width,image_height = shape[0],shape[1]
  n_fltr_2 = n_fltr * 2

  ## Encoding part using a convolutional neural network base
  # Two inputs: the image, and the label (only consumed by the CTC loss layer)
  input_image = tf.keras.layers.Input(shape=(image_width,image_height,1),name='image',dtype="float32")
  input_label = tf.keras.layers.Input(name="label",shape=(None,))

  cnn_layer_1 = tf.keras.layers.Conv2D(
        n_fltr,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv1",
    )(input_image)

  cnn_layer_1 = tf.keras.layers.MaxPooling2D((2, 2), name="pool1")(cnn_layer_1)

  cnn_layer_2 = tf.keras.layers.Conv2D(
        n_fltr_2,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv2",
    )(cnn_layer_1)

  cnn_layer_2 = tf.keras.layers.MaxPooling2D((2, 2), name="pool2")(cnn_layer_2)

  # Two 2x2 poolings shrink both spatial dimensions by 4. The feature maps are
  # reshaped into a sequence along the first image axis; the second axis and
  # the channels are merged into the feature vector of each step.
  new_shape = ((image_width // 4), (image_height // 4) * n_fltr_2)
  x = tf.keras.layers.Reshape(target_shape=new_shape, name="reshape")(cnn_layer_2)
  x = tf.keras.layers.Dense(64, activation="relu", name="dense1")(x)
  x = tf.keras.layers.Dropout(0.2)(x)

  ## Decoding part
  # LSTM [ upgraded RNN layer ]
  bi_lstm1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(n_nds_lstm, return_sequences=True))(x)
  bi_lstm2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(n_nds_lstm//2, return_sequences=True))(bi_lstm1)

  ## Output layer
  # StringLookup maps the vocabulary to the indices 1..len(vocab) (0 is the
  # out-of-vocabulary index) and CTC uses the LAST class as its blank symbol.
  # len(vocab) + 2 classes are therefore needed; with len(vocab) + 1 the last
  # vocabulary character would share its index with the blank.
  out_layer = tf.keras.layers.Dense(len(vocab) + 2,activation="softmax",name="output")(bi_lstm2)

  ## CTC layer that adds the loss and returns the predictions
  decode_output = CTCLayer(name="Decodinglayer")(input_label,out_layer)

  model = tf.keras.models.Model(
      inputs=[input_image, input_label], outputs=decode_output, name="Captcha_model"
  )

  # Optimizer
  opt = tf.keras.optimizers.Adam()
  # Compile the model and return
  model.compile(optimizer=opt)
  return model


In [None]:
# Training budget: up to 500 epochs, stopped early after 50 epochs without progress.
epochs = 500
early_stopping_patience = 50

# Build the network for the configured image shape.
model = create_model(shape=resize_shape)

# Stop on a stalled validation loss and roll back to the best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=early_stopping_patience,
    restore_best_weights=True,
)

# Prints validation / test recall after every epoch.
evaluator = Evaluation(
    val_data_gen=validation_generator,
    val_labels=val_y,
    test_data_gen=test_generator,
    test_labels=test_y,
    multi=True,
)

# Train the model
history = model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=epochs,
    callbacks=[early_stopping, evaluator],
)

Epoch 1/500


# Test Model

In [None]:
# Prediction model: the trained network from the "image" input up to the
# "output" layer.
image_input = model.get_layer(name="image").input
char_probs = model.get_layer(name="output").output
model = tf.keras.models.Model(inputs=image_input, outputs=char_probs)

# Inverse lookup: class indices back to characters.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    mask_token=None,
    invert=True,
)


# A utility function to decode the output of the network
def decode_batch_predictions(pred,vocab,max_length):
    """Greedy CTC decoding of a batch of network outputs into strings.

    Args:
        pred:       network output; axis 0 is the batch, axis 1 the time steps.
        vocab:      character list used to map indices back to characters.
        max_length: decoded sequences are cut to this many entries.

    Returns:
        list with one decoded string per sample.
    """
    # Index -> character lookup built from the given vocabulary.
    index_to_char = tf.keras.layers.StringLookup(
        vocabulary=vocab, mask_token=None, invert=True
    )

    # Every sample uses the full number of time steps.
    n_samples, n_steps = pred.shape[0], pred.shape[1]
    seq_lengths = np.ones(n_samples) * n_steps

    # Use greedy search. For complex tasks, you can use beam search
    decoded = tf.keras.backend.ctc_decode(pred, input_length=seq_lengths, greedy=True)[0]
    best_paths = decoded[0][:, :max_length]

    # Join the characters of every decoded path into one Python string.
    return [
        tf.strings.reduce_join(index_to_char(path)).numpy().decode("utf-8")
        for path in best_paths
    ]


# Predict on the test generator and decode the outputs into text.
preds = model.predict(test_generator)
pred_texts = decode_batch_predictions(pred=preds, vocab=vocab, max_length=5)

def text_recall(preds,labels):
  '''Fraction (0..1) of predictions equal to the label at the same position.

  Pairs are formed with zip, so surplus items of the longer sequence are
  ignored. Returns 0.0 when there is nothing to compare instead of raising
  ZeroDivisionError.
  '''
  compare = [int(i==j) for i,j in zip(preds,labels)]
  if not compare:
    return 0.0

  return sum(compare)/len(compare)


# Share of test labels whose decoded prediction matches exactly.
n_correct = sum(int(predicted == expected) for predicted, expected in zip(pred_texts, test_y))
print("Test Recall: ", n_correct / len(test_y))