# Captchas

**see:** https://keras.io/examples/vision/captcha_ocr/<br>
**original:** https://colab.research.google.com/drive/1Olw2KMHfPlnGaYuzffl2zb6D1etlBGZf?usp=sharing<br>
**View Github version in Colab:** <a href="https://colab.research.google.com/github/KnollFrank/2captcha-worker-assistant-server/blob/master/captcha_ocr_trainAndSaveModel_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a><br>
**paper:** Simple and Easy: Transfer Learning-Based Attacks to Text CAPTCHA<br>

## Setup

In [None]:
import os
import numpy as np

from pathlib import Path

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


In [None]:
class GoogleDriveManager:
  """Exchanges zipped folders between the working directory and Google Drive.

  Folders are stored on Drive as ``<folder>.zip`` inside ``_baseFolder``.
  ``folder`` is expected to be a plain folder name relative to the current
  working directory (e.g. a model name).
  """

  # Mount point of Google Drive inside the Colab VM.
  _googleDriveFolder = Path('/content/gdrive')
  # Drive folder that holds the zipped archives.
  _baseFolder = _googleDriveFolder / 'MyDrive/CAPTCHA/models/'

  @staticmethod
  def mount():
    """Mount Google Drive at ``_googleDriveFolder`` (only works inside Colab)."""
    # Imported lazily because the package only exists in the Colab runtime.
    from google.colab import drive
    drive.mount(str(GoogleDriveManager._googleDriveFolder))

  @staticmethod
  def uploadFolderToGoogleDrive(folder):
    """Zip ``folder`` to ``<folder>.zip`` and copy the archive into ``_baseFolder``.

    Raises:
      FileNotFoundError: if ``folder`` is not an existing directory.
    """
    import shutil

    folder = Path(folder)
    if not folder.is_dir():
      raise FileNotFoundError(f"Folder to upload does not exist: {folder}")
    # make_archive always writes a fresh archive, so entries of files that were
    # deleted since a previous upload do not survive in the zip.
    archive = shutil.make_archive(str(folder), 'zip', root_dir='.', base_dir=str(folder))
    # Make sure the target is a directory; otherwise the copy would create a
    # plain file named like the last path component.
    GoogleDriveManager._baseFolder.mkdir(parents=True, exist_ok=True)
    shutil.copy(archive, GoogleDriveManager._baseFolder)

  @staticmethod
  def downloadFolderFromGoogleDrive(folder):
    """Copy ``<folder>.zip`` from ``_baseFolder`` and unpack it, replacing ``folder``.

    Raises:
      FileNotFoundError: if the archive does not exist in ``_baseFolder``.
    """
    import shutil

    folder = Path(folder)
    archiveName = f"{folder}.zip"
    shutil.copy(GoogleDriveManager._baseFolder / archiveName, archiveName)
    # Remove a previous copy so that no stale files are mixed into the result.
    shutil.rmtree(folder, ignore_errors=True)
    shutil.unpack_archive(archiveName, extract_dir='.')


In [None]:
from PIL import Image, ImageDraw, ImageFont
import random
import string
from pathlib import Path


class CaptchaGenerator:
    """Generates synthetic CAPTCHA images and stores them as JPEG files.

    Every image is saved as ``<dataDir>/<captcha text>.jpeg``, i.e. the file
    name (without suffix) is the label of the image.
    """

    # Alphabet the captcha text is drawn from: ASCII letters and digits, sorted.
    characters = sorted(set(string.ascii_letters + string.digits))
    # Number of characters per captcha.
    captchaLength = 6
    # Drawing position (x, y) of each character; the y offset alternates so
    # that the characters appear staggered.
    _charPositions = ((30, 10), (80, 30), (135, 10), (190, 30), (250, 10), (295, 30))

    def __init__(self, numCaptchas, dataDir):
        """
        Args:
            numCaptchas: number of captcha images to generate.
            dataDir: directory the images are written to. It is deleted and
                recreated by ``createAndSaveCaptchas``.
        """
        self.numCaptchas = numCaptchas
        self.dataDir = Path(dataDir)

    def createAndSaveCaptchas(self):
        """Empty ``dataDir`` and fill it with ``numCaptchas`` random captchas.

        Two captchas with the same random text share one file name, so the
        directory may end up with fewer than ``numCaptchas`` files.
        """
        self._prepareDataDir()
        for _ in range(self.numCaptchas):
            self._createAndSaveCaptcha()

    def _prepareDataDir(self):
        """Remove ``dataDir`` with all its content and recreate it empty."""
        import shutil

        dataDir = Path(self.dataDir)
        if dataDir.is_symlink() or dataDir.is_file():
            dataDir.unlink()
        else:
            # A missing directory is fine, hence ignore_errors.
            shutil.rmtree(dataDir, ignore_errors=True)
        dataDir.mkdir(parents=True, exist_ok=True)

    def _createAndSaveCaptcha(self):
        """Create one random captcha and save it under its text as file name."""
        captchaString = self._createCaptchaString()
        captcha = self._createCaptcha(captchaString)
        captcha.save(str(Path(self.dataDir) / f"{captchaString}.jpeg"))

    def _createCaptchaString(self):
        """Return a random string of ``captchaLength`` characters."""
        return ''.join(
            random.choice(CaptchaGenerator.characters)
            for _ in range(CaptchaGenerator.captchaLength))

    def _getFont(self):
        """Return the drawing font, loading it from disk only once per instance."""
        font = getattr(self, "_font", None)
        if font is None:
            font = self._font = ImageFont.truetype("ariali.ttf", size=40)
        return font

    def _createCaptcha(self, word):
        """Render ``word`` onto a 360x96 dark grey image and return the image.

        Only as many characters as there are entries in ``_charPositions`` are
        drawn.
        """
        image = Image.new("RGB", (360, 96), "#373737")
        draw = ImageDraw.Draw(image)
        font = self._getFont()
        for position, character in zip(CaptchaGenerator._charPositions, word):
            draw.text(position, character, font=font)
        return image


In [None]:
def getImagesAndLabels(dataDir):
    """Collect the ``.jpeg`` files of ``dataDir`` together with their labels.

    Args:
        dataDir: ``pathlib.Path`` of the directory to scan (not recursive).

    Returns:
        A tuple ``(images, labels)``: ``images`` is the sorted list of file
        paths as strings, ``labels`` holds for each image its file name
        without the ``.jpeg`` suffix.
    """
    fileSuffix = ".jpeg"
    images = sorted(str(path) for path in dataDir.glob("*" + fileSuffix))
    # Strip the suffix only at the end of the name; splitting on the suffix
    # would truncate names that contain ".jpeg" somewhere in the middle.
    labels = [os.path.basename(image)[:-len(fileSuffix)] for image in images]
    return images, labels


In [None]:
class CharNumConverter:
    """Holds two ``StringLookup`` layers: characters -> numbers and back."""

    def __init__(self, characters):
        """Build both lookup layers from the given iterable of characters."""
        forward = layers.StringLookup(
            vocabulary=list(characters),
            mask_token=None)
        # The inverse layer reuses the vocabulary of the forward layer so that
        # both directions agree on the numbering.
        backward = layers.StringLookup(
            vocabulary=forward.get_vocabulary(),
            mask_token=None,
            invert=True)

        self.char_to_num = forward
        self.num_to_char = backward

In [None]:
class DataSplitter:
    """Splits samples and labels into a train, a validation and a test part.

    70% of the data goes to training; the remainder is halved into validation
    and test data.
    """

    def __init__(self, x, y):
        """Split ``x`` (samples) and ``y`` (labels), which must be equally long."""
        train, remainder = DataSplitter._splitData(np.array(x), np.array(y), train_size=0.7)
        valid, test = DataSplitter._splitData(remainder[0], remainder[1], train_size=0.5)
        self.x_train, self.y_train = train
        self.x_valid, self.y_valid = valid
        self.x_test, self.y_test = test

    def getTrain(self):
        """Return the training part as ``(x, y)``."""
        return (self.x_train, self.y_train)

    def getValid(self):
        """Return the validation part as ``(x, y)``."""
        return (self.x_valid, self.y_valid)

    def getTest(self):
        """Return the test part as ``(x, y)``."""
        return (self.x_test, self.y_test)

    @staticmethod
    def _splitData(x, y, train_size=0.9, shuffle=True):
        """Split the arrays ``x`` and ``y`` into two parts.

        The first part receives ``int(len(x) * train_size)`` elements, the
        second part the rest. With ``shuffle`` the elements are assigned in
        random order (global NumPy random state); ``x`` and ``y`` stay aligned.

        Returns:
            ``((x_first, y_first), (x_second, y_second))``
        """
        order = np.arange(len(x))
        if shuffle:
            np.random.shuffle(order)
        cut = int(len(x) * train_size)
        head, tail = order[:cut], order[cut:]
        return (x[head], y[head]), (x[tail], y[tail])


In [None]:
class DatasetFactory:
    """Creates batched ``tf.data`` datasets from image paths and label strings."""

    def __init__(self, img_height, img_width, char_to_num, batch_size):
        """
        Args:
            img_height: height every image is resized to.
            img_width: width every image is resized to.
            char_to_num: callable mapping the characters of a label to numbers.
            batch_size: number of samples per batch.
        """
        self.img_height, self.img_width = img_height, img_width
        self.char_to_num = char_to_num
        self.batch_size = batch_size

    def createDataset(self, x, y):
        """Return a batched, prefetching dataset for image paths ``x`` and labels ``y``."""
        samples = tf.data.Dataset.from_tensor_slices((x, y))
        encoded = samples.map(self._encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
        batched = encoded.batch(self.batch_size)
        return batched.prefetch(buffer_size=tf.data.AUTOTUNE)

    def _encode_single_sample(self, img_path, label):
        """Load and resize one image and encode its label as numbers."""
        raw_bytes = tf.io.read_file(img_path)
        decoded = tf.io.decode_jpeg(raw_bytes, channels=3)
        resized = tf.image.resize(decoded, [self.img_height, self.img_width])
        # Split the label into single characters and map them to numbers.
        label_chars = tf.strings.unicode_split(label, input_encoding="UTF-8")
        encoded_label = self.char_to_num(label_chars)
        # The model expects two named inputs, hence the dict.
        return {"image": resized, "label": encoded_label}


In [None]:
def getTrainValidationTestDatasets(dataDir, datasetFactory):
    """Build the train, validation and test datasets from the images in ``dataDir``.

    Prints the number of images found and the captcha alphabet, splits the
    images with ``DataSplitter`` and turns every part into a dataset.

    Returns:
        The tuple ``(train_dataset, validation_dataset, test_dataset)``.
    """
    imagePaths, imageLabels = getImagesAndLabels(dataDir)
    print("Number of images found:", len(imagePaths))
    print("Characters:", CaptchaGenerator.characters)

    splits = DataSplitter(imagePaths, imageLabels)
    trainDataset = datasetFactory.createDataset(*splits.getTrain())
    validationDataset = datasetFactory.createDataset(*splits.getValid())
    testDataset = datasetFactory.createDataset(*splits.getTest())
    return (trainDataset, validationDataset, testDataset)

In [None]:
import matplotlib.pyplot as plt
import math

def displayImagesInGrid(numGridCols, images, titles, titleColors):
    """Show images in a grid with ``numGridCols`` columns, filled row by row.

    Args:
        numGridCols: number of columns of the grid.
        images: sequence of image tensors (each must offer ``.numpy()``).
        titles: one title per image.
        titleColors: one title color per image.

    Nothing is displayed when ``images`` is empty.
    """
    assert len(images) == len(titles) == len(titleColors), \
        "images, titles and titleColors must have the same length"
    # len() instead of truthiness: `images` may be a tensor.
    if len(images) == 0:
        return

    images = [image.numpy().astype(np.uint8) for image in images]
    numGridRows = math.ceil(len(images) / numGridCols)
    # squeeze=False keeps `axs` two-dimensional; otherwise a grid with a single
    # row or column comes back as a 1-D array (or a bare Axes) and breaks here.
    _, axs = plt.subplots(numGridRows, numGridCols, figsize=(15, 5), squeeze=False)
    cells = list(axs.flat)
    # Hide the axes of all cells, including the unused ones of the last row.
    for ax in cells:
        ax.axis("off")
    for ax, image, title, titleColor in zip(cells, images, titles, titleColors):
        ax.imshow(image)
        ax.set_title(title, color=titleColor)
    plt.show()


In [None]:
def display16Predictions(model, dataset, predictionsDecoder):
    """Show up to 16 images of the first batch with predicted and true text.

    The title of an image is green when the prediction equals the truth and
    red otherwise.
    """
    numPredictions2Display = 16
    for batch in dataset.take(1):
        shownImages = batch["image"][:numPredictions2Display]
        shownLabels = batch["label"][:numPredictions2Display]

        rawPredictions = model.predict(shownImages)
        predictedTexts = predictionsDecoder.decode_batch_predictions(rawPredictions)
        trueTexts = predictionsDecoder.asStrings(shownLabels)

        titles = []
        colors = []
        for predicted, truth in zip(predictedTexts, trueTexts):
            titles.append(f"Prediction/Truth: {predicted}/{truth}")
            colors.append('green' if predicted == truth else 'red')

        displayImagesInGrid(4, shownImages, titles, colors)

In [None]:
# see https://keras.io/guides/making_new_layers_and_models_via_subclassing/
class CTCLayer(layers.Layer):
    """Layer that adds the CTC loss to the model and passes predictions through."""

    def __init__(self, name=None, **kwargs):
        """
        Args:
            name: optional layer name.
            **kwargs: further standard ``Layer`` arguments (e.g. ``trainable``,
                ``dtype``). Accepting them allows the layer to be rebuilt from
                its config, which passes these keys to the constructor.
        """
        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Register the CTC loss of ``y_pred`` against ``y_true`` and return ``y_pred``."""
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # Every sample of the batch gets the same input and label length.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred


class ModelFactory:
    """Creates CAPTCHA OCR models on top of a pre-trained image backbone.

    Every model consists of the backbone, a dense layer, two bidirectional
    LSTMs, a softmax output layer and a ``CTCLayer`` computing the loss.
    """

    # Names of the layers delimiting the prediction-only part of a model.
    predictionModelInputLayerName = "image"
    predictionModelOutputLayerName = "dense2"

    def __init__(self, img_height, img_width, char_to_num):
        """
        Args:
            img_height: height of the input images.
            img_width: width of the input images.
            char_to_num: lookup layer whose vocabulary size determines the
                number of output classes.
        """
        self.img_height = img_height
        self.img_width = img_width
        self.char_to_num = char_to_num

    # see https://www.tensorflow.org/api_docs/python/tf/keras/applications/resnet/ResNet101
    def createResNet101(self):
        """Return a compiled model with an ImageNet-initialised ResNet101 backbone."""
        def buildBackbone(input_tensor):
            return tf.keras.applications.resnet.ResNet101(
                input_tensor=input_tensor,
                weights='imagenet',
                include_top=False)

        return self._createModel(
            baseModelFactory=buildBackbone,
            preprocess_input=tf.keras.applications.resnet.preprocess_input,
            name='ResNet101')

    def createMobileNetV2(self):
        """Return a compiled model with an ImageNet-initialised MobileNetV2 backbone."""
        def buildBackbone(input_tensor):
            return tf.keras.applications.MobileNetV2(
                input_tensor=input_tensor,
                weights='imagenet',
                include_top=False)

        return self._createModel(
            baseModelFactory=buildBackbone,
            preprocess_input=tf.keras.applications.mobilenet_v2.preprocess_input,
            name='MobileNetV2')

    def createMobileNetV3Small(self):
        """Return a compiled model with a minimalistic, ImageNet-initialised MobileNetV3Small backbone."""
        def buildBackbone(input_tensor):
            return tf.keras.applications.MobileNetV3Small(
                input_tensor=input_tensor,
                minimalistic=True,
                weights='imagenet',
                include_top=False)

        return self._createModel(
            baseModelFactory=buildBackbone,
            preprocess_input=tf.keras.applications.mobilenet_v3.preprocess_input,
            name='MobileNetV3Small')

    @staticmethod
    def createPredictionModel(model):
        """Return the sub-model of ``model`` from the image input to the softmax output."""
        imageInput = model.get_layer(name=ModelFactory.predictionModelInputLayerName).input
        softmaxOutput = model.get_layer(name=ModelFactory.predictionModelOutputLayerName).output
        return keras.models.Model(imageInput, softmaxOutput)

    def _createModel(self, baseModelFactory, preprocess_input, name):
        """Assemble and compile the training model.

        Args:
            baseModelFactory: callable taking the input tensor and returning the backbone model.
            preprocess_input: preprocessing function belonging to the backbone.
            name: name of the resulting model.
        """
        # The two inputs of the training model: the image and its encoded label.
        image_input = layers.Input(
            shape=(self.img_height, self.img_width, 3),
            name=ModelFactory.predictionModelInputLayerName,
            dtype="float32")
        label_input = layers.Input(name="label", shape=(None,), dtype="float32")

        preprocessed = preprocess_input(image_input)
        # Swap height and width so that the time dimension corresponds to the width of the image.
        transposed = tf.keras.layers.Permute(dims=[2, 1, 3])(preprocessed)
        backbone = baseModelFactory(transposed)

        # Collapse the last two backbone output axes into one feature axis.
        _, num_steps, dim_a, dim_b = backbone.output_shape
        features = layers.Reshape(
            target_shape=(num_steps, dim_a * dim_b),
            name="reshape")(backbone.output)
        features = layers.Dense(64, activation="relu", name="dense1")(features)
        features = layers.Dropout(0.2)(features)

        # Two stacked bidirectional LSTMs.
        for units, lstm_name in ((128, "LSTM1"), (64, "LSTM2")):
            features = layers.Bidirectional(
                layers.LSTM(
                    units,
                    return_sequences=True,
                    dropout=0.25,
                    unroll=False,
                    name=lstm_name))(features)

        # One class per vocabulary entry plus one extra class
        # (presumably the CTC blank -- TODO confirm).
        num_classes = len(self.char_to_num.get_vocabulary()) + 1
        probabilities = layers.Dense(
            num_classes,
            activation="softmax",
            name=ModelFactory.predictionModelOutputLayerName)(features)

        # The CTC layer computes the loss at each step and passes the predictions through.
        ctc_output = CTCLayer(name="ctc_loss")(label_input, probabilities)

        model = keras.models.Model(
            inputs=[image_input, label_input],
            outputs=ctc_output,
            name=name)
        # Paper: "The model is optimized by a stochastic gradient descent (SGD) strategy with an
        # initial learning rate of 0.004, weight decay of 0.00004 and momentum of 0.9."
        # This notebook uses Adam with its default settings instead.
        model.compile(optimizer=keras.optimizers.Adam())
        return model


In [None]:
def printLayers(model):
    """Print the index and the name of every layer of ``model``, one per line."""
    for position, layer in enumerate(model.layers):
        print(f"{position} {layer.name}")


In [None]:
class PredictionsDecoder:
    """Turns model outputs and encoded labels back into captcha strings."""

    def __init__(self, captchaLength, num_to_char):
        """
        Args:
            captchaLength: number of characters kept per decoded prediction.
            num_to_char: callable mapping numbers back to characters.
        """
        self.captchaLength = captchaLength
        self.num_to_char = num_to_char

    def decode_batch_predictions(self, pred):
        """Decode a batch of model outputs into a list of strings."""
        decoded = self.ctc_decode(pred)
        return self.asStrings(decoded)

    def ctc_decode(self, pred):
        """CTC-decode ``pred`` and keep the first ``captchaLength`` entries per sample."""
        batch_size, num_steps = pred.shape[0], pred.shape[1]
        # Every sample uses the full number of time steps.
        input_len = np.ones(batch_size) * num_steps
        # Greedy search is used here; beam search is an option for harder tasks.
        results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)
        best_paths = results[0][0]
        return best_paths[:, :self.captchaLength]

    def asStrings(self, labels):
        """Convert every encoded label in ``labels`` into a string."""
        texts = []
        for label in labels:
            texts.append(self.asString(label))
        return texts

    def asString(self, label):
        """Convert one encoded label into a string."""
        characters = self.num_to_char(label)
        joined = tf.strings.reduce_join(characters)
        return joined.numpy().decode("utf-8")


In [None]:
class ModelDAO:
    """Saves and loads models locally and, inside Colab, via Google Drive."""

    def __init__(self, inColab):
        """
        Args:
            inColab: whether the code runs in Google Colab; only then models
                are additionally exchanged with Google Drive.
        """
        self.inColab = inColab

    def saveModel(self, model):
        """Save ``model`` under its name, replacing a previous save.

        Inside Colab the saved folder is additionally uploaded to Google Drive.
        """
        import shutil

        # Remove the previous save first so that no stale files remain.
        target = Path(model.name)
        if target.is_symlink() or target.is_file():
            target.unlink()
        else:
            # A missing folder is fine, hence ignore_errors.
            shutil.rmtree(target, ignore_errors=True)

        model.save(model.name)
        if self.inColab:
            GoogleDriveManager.uploadFolderToGoogleDrive(model.name)

    def loadModel(self, modelName):
        """Load the model saved as ``modelName``.

        Inside Colab the model folder is first downloaded from Google Drive.
        """
        if self.inColab:
            GoogleDriveManager.downloadFolderFromGoogleDrive(modelName)
        return keras.models.load_model(modelName)


In [None]:
# FK-TODO: remove the getAccuracy() function. Implement a custom Keras metric instead, see https://stackoverflow.com/questions/37657260/how-to-implement-custom-metric-in-keras or https://keras.io/api/metrics/#custom-metrics
def getAccuracy(dataset, prediction_model, ctc_decode):
    """Return the accuracy of ``prediction_model`` over all batches of ``dataset``.

    Args:
        dataset: iterable of batches with the keys "image" and "label".
        prediction_model: model mapping images to raw predictions.
        ctc_decode: callable decoding raw predictions into encoded labels.

    NOTE(review): ``tf.keras.metrics.Accuracy`` presumably compares the labels
    element by element, i.e. this looks like a per-character rather than a
    per-captcha accuracy -- confirm.
    """
    metric = tf.keras.metrics.Accuracy()

    for batch in dataset:
        raw_predictions = prediction_model.predict(batch["image"], verbose=0)
        metric.update_state(batch["label"], ctc_decode(raw_predictions))

    return metric.result().numpy()

## Preparation

In [None]:
# True when the description of the running IPython shell mentions 'google.colab',
# i.e. the notebook is presumably executed inside Google Colab.
inColab = 'google.colab' in str(get_ipython())

In [None]:
# Inside Colab, mount Google Drive so that captchas and models can be exchanged with it.
if inColab:
    GoogleDriveManager.mount()

In [None]:
# Inside Colab, copy the captcha archive from Google Drive and unpack it into the working directory.
if inColab:
  !cp {GoogleDriveManager._baseFolder}/captchas.zip .
  !unzip captchas.zip

In [None]:
# Persists models locally and, inside Colab, on Google Drive.
modelDAO = ModelDAO(inColab)

In [None]:
# Lookup layers translating between captcha characters and numbers.
charNumConverter = CharNumConverter(CaptchaGenerator.characters)

In [None]:
# Decodes model outputs and encoded labels back into strings.
predictionsDecoder = PredictionsDecoder(CaptchaGenerator.captchaLength, charNumConverter.num_to_char)

In [None]:
# Size every image is resized to before it is fed into the model.
(img_width, img_height) = (241, 62)

In [None]:
# Builds the datasets: resizes images, encodes labels, batches 64 samples.
datasetFactory = DatasetFactory(img_height, img_width, charNumConverter.char_to_num, batch_size = 64)

## Create And Train Base Model

In [None]:
# Inside Colab, install the Microsoft core fonts and refresh the font cache;
# CaptchaGenerator renders its text with the font file "ariali.ttf"
# (presumably provided by this package -- TODO confirm).
if inColab:
    !sudo apt install ttf-mscorefonts-installer
    !sudo fc-cache -f
    !fc-match Arial

In [None]:
# "We generate 200,000 images for base model pre-training"
captchaGenerator = CaptchaGenerator(
    numCaptchas = 200000, # a small value such as 50 can be used for a quick trial run
    dataDir = Path("captchas/generated/VAERS/"))

In [None]:
# Deletes the data directory and regenerates all captchas.
captchaGenerator.createAndSaveCaptchas()

In [None]:
# Split the generated captchas into train, validation and test datasets.
train_dataset, validation_dataset, test_dataset = getTrainValidationTestDatasets(captchaGenerator.dataDir, datasetFactory)

In [None]:
# Visual sanity check: show the first 16 training images with their labels.
for batch in train_dataset.take(1):
    numImages2Display = 16
    images = batch["image"][:numImages2Display]
    labels = batch["label"][:numImages2Display]
    displayImagesInGrid(4, images, predictionsDecoder.asStrings(labels), ['black'] * len(labels))

In [None]:
modelFactory = ModelFactory(img_height, img_width, charNumConverter.char_to_num)

In [None]:
# The base model uses the MobileNetV3Small backbone.
model = modelFactory.createMobileNetV3Small()
model.summary()

In [None]:
# "the success rates became stable after the base-model training epochs exceeded 20"
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=20)


In [None]:
# Persist the trained base model; the transfer-learning section loads it again by name.
modelDAO.saveModel(model)

In [None]:
# Sub-model without label input and CTC loss layer, used for inference.
prediction_model = ModelFactory.createPredictionModel(model)
prediction_model.summary()


In [None]:
display16Predictions(prediction_model, test_dataset, predictionsDecoder)

In [None]:
# Accuracy of the base model on the held-out generated captchas.
getAccuracy(test_dataset, prediction_model, predictionsDecoder.ctc_decode)

## Transfer learning

In [None]:
# "we collected 1,500 real CAPTCHAs from the websites. Note that only 500 of them are used for fine-tuning, and another 1,000 are applied to calculate the test accuracy"
# FK-TODO: load the pre-trained model and fine-tune it with 500 real-world samples from the folder captchas/VAERS/; the remaining 540 (according to the quote above there should be 1,000) then serve as the test data.
# see https://keras.io/guides/transfer_learning/
# see https://www.tensorflow.org/tutorials/images/transfer_learning


In [None]:
# Name of the saved base model and the number of leading layers to freeze.
# NOTE(review): despite its name, numTrainableLayers is used below as the count
# of layers that are made NON-trainable.
modelName, numTrainableLayers = 'MobileNetV3Small', 104
# modelName, numTrainableLayers = 'ResNet101', 348

In [None]:
# Load the pre-trained base model (inside Colab it is fetched from Google Drive first).
model = modelDAO.loadModel(modelName)
model.summary(show_trainable=True)

In [None]:
# printLayers(model)

In [None]:
# Make the whole model trainable, then freeze its first numTrainableLayers layers
# so that only the remaining layers are fine-tuned.
model.trainable = True
for layer in model.layers[:numTrainableLayers]:
    layer.trainable = False

In [None]:
# Verify which layers are trainable after freezing.
model.summary(show_trainable=True)

In [None]:
# Datasets built from the captchas stored in captchas/VAERS/.
train_dataset, validation_dataset, test_dataset = getTrainValidationTestDatasets(Path("captchas/VAERS/"), datasetFactory)

In [None]:
# "The model is optimized by a stochastic gradient descent (SGD) strategy with an initial learning rate of 0.004, weight decay of 0.00004 and momentum of 0.9."
# NOTE(review): SGD is imported but only referenced by the commented-out line below; Adam is used instead.
from tensorflow.keras.optimizers import SGD
# model.compile(optimizer=SGD(learning_rate=0.0001, momentum=0.9))
# Recompile so that the changed `trainable` flags take effect.
model.compile(optimizer='adam')

# "Therefore, in our experiments, we chose 1 epoch for the fine-tuning stage."
# NOTE(review): the code trains for 20 epochs, not the 1 epoch quoted above -- confirm this is intended.
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=20)


In [None]:
# Inference sub-model of the fine-tuned model.
prediction_model = ModelFactory.createPredictionModel(model)
prediction_model.summary()

In [None]:
# Accuracy of the fine-tuned model on the held-out test split.
getAccuracy(test_dataset, prediction_model, predictionsDecoder.ctc_decode)

In [None]:
display16Predictions(prediction_model, test_dataset, predictionsDecoder)