In [1]:
import glob
import os
import random
from collections import Counter
from pathlib import Path
from sys import getsizeof

import matplotlib.pyplot as plt
import numpy as np
import PIL
import PIL.Image
import tensorflow as tf
import tqdm
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers

In [3]:
data_dir = "/kaggle/input/captchanumberdataset/captcha_number_dataset"

# Collect every .png file directly inside data_dir, ordered by path.
image_paths = sorted(glob.glob(data_dir + "/*.png"))
print(len(image_paths))
image_paths[0]

In [4]:
def label_from_img_path(img_path):
    """Return the file name of ``img_path`` without its directory or last extension.

    The captcha text is encoded in the file name, so this stem is the label.
    """
    file_name = os.path.basename(img_path)
    stem, _extension = os.path.splitext(file_name)
    return stem

# Spot-check: show one image path next to the label parsed from its file name.
image_paths[230], label_from_img_path(image_paths[230])

In [5]:
# One label per image, parsed from the file name.
labels = list(map(label_from_img_path, image_paths))
# Alphabet that actually occurs in the labels, in sorted order; the position
# of a character in CHARACTERS is its class index in the one-hot encoding.
characters = {char for label in labels for char in label}
CHARACTERS = sorted(characters)
NUM_CHAR = len(CHARACTERS)

print("Number of images found: ", len(image_paths))
print("Random image: ", random.choice(image_paths))
print("Number of labels found: ", len(labels))
print("Random label: ", random.choice(labels))
print("sorted charecters set: ", CHARACTERS)
print("number of charecters: ", NUM_CHAR)

In [6]:
# Length of every label, computed once and reused for both extremes.
label_lengths = [len(label) for label in labels]

# Maximum length of any captcha in the dataset
max_length = max(label_lengths)

print ("the maximum length of any captcha", max_length)

# Minimum length of any captcha in the dataset
min_length = min(label_lengths)

print ("the minimum length of any captcha", min_length)

# Report the length that was actually measured instead of assuming it is 5.
if max_length == min_length:
    print (f"all the captchas are {max_length} letters long")
else:
    print ("the captchas are of different lengths")

In [7]:
# Number of characters in every captcha label; the one-hot helpers, the loss
# and the metrics all slice by this value.
CHAR_PER_LABEL = 5
label_char_count = np.array(list(map(len, labels)))
# Displays True only when every label has exactly CHAR_PER_LABEL characters.
np.all(label_char_count == CHAR_PER_LABEL)

In [8]:
def char_to_one_hot(char: str):
    """Encode a single character as a one-hot vector over ``CHARACTERS``.

    Returns a float array of length ``len(CHARACTERS)`` with a 1 at the
    character's position. Raises ``ValueError`` (from ``list.index``) when the
    character is not in ``CHARACTERS``.
    """
    position = CHARACTERS.index(char)
    encoded = np.zeros(len(CHARACTERS))
    encoded[position] = 1
    return encoded


def one_hot(characters: str):
    """Encode a string as the concatenation of its per-character one-hot vectors.

    The result is a flat ``uint8`` array; each character contributes one
    consecutive segment produced by ``char_to_one_hot``.
    """
    segments = [char_to_one_hot(symbol) for symbol in characters]
    flat = np.concatenate(segments)
    return flat.astype('uint8')

In [11]:
# A single character encodes to a vector of NUM_CHAR entries that sum to 1.
assert len(char_to_one_hot('0')) == NUM_CHAR
assert char_to_one_hot('1').sum() == 1

# A string encodes to NUM_CHAR entries per character.
test_string = '01234'
assert len(one_hot(test_string)) == NUM_CHAR * len(test_string)

In [12]:
def one_hot_to_char(x: np.array):
    """Decode one character from a score vector of length ``NUM_CHAR``.

    Singleton dimensions are squeezed away first; the character whose position
    holds the largest value is returned, so this works for exact one-hot
    vectors as well as for raw model scores.
    """
    vector = np.squeeze(np.array(x))
    assert len(vector) == NUM_CHAR
    best = np.argmax(vector)
    return CHARACTERS[best]


def one_hot_to_label(x):
    """Decode a flat vector of ``CHAR_PER_LABEL`` one-hot segments into a string.

    The input is squeezed and must contain exactly
    ``len(CHARACTERS) * CHAR_PER_LABEL`` entries; each consecutive segment of
    ``NUM_CHAR`` entries is decoded with ``one_hot_to_char``.
    """
    flat = np.squeeze(np.array(x))
    assert len(flat) == len(CHARACTERS) * CHAR_PER_LABEL
    segments = (
        flat[position * NUM_CHAR:(position + 1) * NUM_CHAR]
        for position in range(CHAR_PER_LABEL)
    )
    return "".join(one_hot_to_char(segment) for segment in segments)

In [13]:
# Round trip: encoding a label and decoding it again must return the same string.
test_string = "34532"
assert one_hot_to_label(one_hot(test_string)) == test_string

## Data handling

In [14]:
def _load_grayscale(img_path):
    """Read one image file and return it as a 2-D uint8 grayscale array.

    The file is opened in a ``with`` block so its handle is released as soon
    as the pixel data has been copied into the array.
    """
    with PIL.Image.open(img_path) as img:
        return np.array(img.convert('L')).astype('uint8')


images = [_load_grayscale(img_path) for img_path in tqdm.tqdm(image_paths)]
# Stack the per-image arrays into a single array with the image index first.
images_arr = np.array(images)
images_arr.shape

In [15]:
# One flat one-hot row per label, in the same order as the images.
labels_one_hot = np.array(list(map(one_hot, tqdm.tqdm(labels))))
# Size of the label array as reported by sys.getsizeof, in units of 1e6 bytes.
getsizeof(labels_one_hot) / 1e6

In [16]:
# Sizes of the image and label arrays as reported by sys.getsizeof, in units of 1e6 bytes.
getsizeof(images_arr)/1e6, getsizeof(labels_one_hot)/1e6

In [17]:
from sklearn.model_selection import train_test_split

# Optional cap on the number of samples used; None keeps the whole dataset.
N = None

# Fixed random_state makes the 80/20 train/validation split repeatable.
train_X, val_X, train_y, val_y = train_test_split(
    images_arr[:N],
    labels_one_hot[:N],
    test_size=0.2,
    random_state=12345,
)
tuple(split.shape for split in (train_X, val_X, train_y, val_y))

In [18]:
# Drop the names of the unsplit arrays now that the train/validation splits exist.
# NOTE: after this cell the split cell above cannot be re-run without reloading the data.
del images_arr, labels_one_hot

In [19]:
# Show a few training images, each titled with its decoded label.
for sample_idx in (576, 1023, 536):
    plt.figure()
    plt.imshow(train_X[sample_idx].squeeze())
    plt.title(one_hot_to_label(train_y[sample_idx].squeeze()))
plt.show()

In [20]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# Two independent generators that only rescale pixel values by 1/255;
# no augmentation is configured for either split.
train_datagen, val_datagen = (ImageDataGenerator(rescale=1./255) for _ in range(2))

In [22]:
def my_loss(y_true, y_pred):
    """Sum of softmax cross-entropies, one per character position.

    Both inputs are sliced column-wise into ``CHAR_PER_LABEL`` segments of
    ``NUM_CHAR`` entries. ``y_pred`` is passed as logits, so the model's last
    layer must not apply its own softmax.
    """
    tot = 0.0
    for position in range(CHAR_PER_LABEL):
        lo = position * NUM_CHAR
        hi = lo + NUM_CHAR
        tot += tf.nn.softmax_cross_entropy_with_logits(
            labels=y_true[:, lo:hi],
            logits=y_pred[:, lo:hi],
            axis=-1,
        )
    return tot

def score_2(y_true, y_pred):
    """Fraction of character positions predicted correctly, per sample.

    Reference implementation that indexes element by element. It calls
    ``.numpy()`` on a tensor, so it only works with eager tensors and cannot
    be used as a metric inside a compiled (graph-mode) Keras model.
    """
    hits = []
    for i in range(0, CHAR_PER_LABEL):
        start = i * NUM_CHAR
        end = start + NUM_CHAR
        # Predicted class index for this character position, one per sample.
        idx = tf.math.argmax(y_pred[:, start:end], axis=1)
        # Look up the true one-hot value at the predicted index: 1 on a hit, 0 otherwise.
        # (The comprehension's own `i` is the sample index and does not leak out.)
        hits.append([y_true[:, start:end][i, index] for i, index in enumerate(idx.numpy())])
    # Sum the hits over character positions and normalise by the label length.
    return tf.math.reduce_sum(tf.convert_to_tensor(hits), axis=0)/CHAR_PER_LABEL

def score_tf(y_true, y_pred):
    """Per-sample fraction of correctly predicted characters, using TensorFlow ops only.

    Both inputs are reshaped to ``(-1, CHAR_PER_LABEL, NUM_CHAR)``. The argmax
    of each predicted segment is turned back into a one-hot vector and
    multiplied with the true segment, giving 1 for a hit and 0 for a miss when
    ``y_true`` is one-hot.
    """
    per_char_shape = (-1, CHAR_PER_LABEL, NUM_CHAR)
    predicted_idx = tf.math.argmax(tf.reshape(y_pred, per_char_shape), axis=-1)
    predicted_one_hot = tf.one_hot(predicted_idx, NUM_CHAR)

    truth = tf.cast(tf.reshape(y_true, per_char_shape), 'float32')
    char_hits = tf.math.reduce_sum(predicted_one_hot * truth, axis=-1)
    return tf.math.reduce_mean(char_hits, axis=-1)

def score_np(y_true, y_pred):
    """NumPy counterpart of ``score_tf``: per-sample fraction of correct characters.

    Inputs are reshaped to ``(-1, CHAR_PER_LABEL, NUM_CHAR)``; the predicted
    class of each segment is one-hot encoded (via ``tf.one_hot``) and compared
    with the true segment by element-wise multiplication.
    """
    per_char_shape = (-1, CHAR_PER_LABEL, NUM_CHAR)
    predicted_idx = np.reshape(y_pred, per_char_shape).argmax(axis=-1)
    predicted_one_hot = tf.one_hot(predicted_idx, NUM_CHAR).numpy()

    truth = np.reshape(y_true, per_char_shape)
    return (predicted_one_hot * truth).sum(axis=-1).mean(axis=-1)

# Sanity check of the loss and the three score implementations on a tiny batch
# whose expected scores are known by construction.

s1 = "12345"
s2 = "23456"

sp1 = "12367"  # first 3 of 5 positions match s1 -> expected score 3/5
sp2 = "23456"  # identical to s2                 -> expected score 5/5
expected_scores = np.array([3 / 5, 5 / 5])

y_true = np.vstack([one_hot(s1), one_hot(s2)])

# Exact one-hot "predictions" as floats, so the argmax of every segment is unambiguous.
y_pred = np.vstack([one_hot(sp1), one_hot(sp2)]).astype('float64')
y_true_tf = tf.convert_to_tensor(y_true)
y_pred_tf = tf.convert_to_tensor(y_pred)

# Smoke test: the loss must evaluate on this batch without raising.
my_loss(y_true_tf, y_pred_tf)

eager_scores = score_2(y_true_tf, y_pred_tf)
tf_scores = score_tf(y_true_tf, y_pred_tf)
np_scores = score_np(y_true, y_pred)
print("tensorflow", eager_scores, tf_scores)
print("numpy", np_scores)

# All three implementations must agree with the hand-computed scores.
assert np.allclose(eager_scores.numpy(), expected_scores)
assert np.allclose(tf_scores.numpy(), expected_scores)
assert np.allclose(np_scores, expected_scores)

In [23]:
from keras.callbacks import ReduceLROnPlateau

In [24]:
# Directory (relative to the current working directory) where model
# checkpoints are written.
checkout_dir = 'model_checkout'
# exist_ok=True makes the cell safe to re-run, and the directory name is
# taken from the variable instead of being repeated as a literal.
os.makedirs(checkout_dir, exist_ok=True)

In [27]:
# CNN over single-channel 40x150 inputs: four Conv -> BatchNorm -> MaxPool
# stages followed by a dense head. The last layer has no activation because
# my_loss applies the softmax itself, once per character segment.
model = tf.keras.models.Sequential([
    keras.Input(shape=(40, 150, 1)),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    # Last stage pools along the width only.
    tf.keras.layers.MaxPooling2D(pool_size=(1, 2)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(0.5),
#     tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(0.75),
    # One block of len(CHARACTERS) scores per character position. The width is
    # tied to CHAR_PER_LABEL because the loss and metrics slice by it.
    tf.keras.layers.Dense(len(CHARACTERS) * CHAR_PER_LABEL)
])

model.summary()

In [28]:
# Mini-batch size used by the data generators; it also appears in the checkpoint file name.
batch_size = 16

In [29]:
# Run name built from the parameter count; it becomes part of the checkpoint file name.
model_name = f"cnn_{model.count_params()}_params"

# Initial learning rate, plus a dot-free rendering of it for use in file names.
lr = 1e-3
lr_str = str(lr).replace(".", "")

# Multiply the learning rate by 0.1 when val_loss has not improved for 5 epochs.
learning_rate_schedule = ReduceLROnPlateau(monitor='val_loss', patience=5, factor=0.1)

# Write weights only, and only when the callback's monitored value improves.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    f'{checkout_dir}/{model_name}_batch_size_{batch_size}_init_lr_{lr_str}.weights.h5',
    save_weights_only=True,
    save_best_only=True,
)

# score_tf is the metric used here; score_2 calls .numpy() and would only
# work if the model were compiled with run_eagerly=True.
model.compile(
    loss=my_loss,
    optimizer=tf.keras.optimizers.Adam(lr),
    metrics=['accuracy', score_tf],
)

In [30]:
# Train the model with the learning-rate schedule and checkpoint callbacks.
# The generators add the channel axis and already yield batches of
# `batch_size`, so `batch_size` is not passed to fit() as well: Keras
# documents that it must not be specified for generator-style inputs.
train_flow = train_datagen.flow(train_X[..., np.newaxis], train_y, batch_size=batch_size)
val_flow = val_datagen.flow(val_X[..., np.newaxis], val_y, batch_size=batch_size)

history = model.fit(
    train_flow,
    epochs=60,
    validation_data=val_flow,
    callbacks=[learning_rate_schedule, checkpoint],
)

In [41]:
# List the per-epoch series recorded during training (used by the plots below).
history.history.keys()

In [31]:
# Plot the training curves: mean score on the left, loss on the right,
# each with the training and validation series.
_, ax = plt.subplots(1, 2, figsize=(12, 4))

panels = [
    (ax[0], 'score_tf', 'model accuracy', 'mean score'),
    (ax[1], 'loss', 'training loss', 'loss'),
]
for axis, key, title, ylabel in panels:
    axis.plot(history.history[key])
    axis.plot(history.history['val_' + key])
    axis.set_title(title)
    axis.set_ylabel(ylabel)
    axis.set_xlabel('epoch')
    axis.legend(['train', 'val'], loc='upper left')
plt.show()

In [32]:
# Index of the validation sample to inspect.
i = 0
# Predict on the same sample that is displayed below. The slice keeps the
# batch axis, the channel axis is added explicitly as in training, and the
# pixels are scaled by 1/255 like the training generator does.
sample = val_X[i:i + 1, ..., np.newaxis] / 255
y_pred = model.predict(sample)

plt.figure()
plt.imshow(val_X[i, ...].squeeze())
# Title shows the true label; the predicted label is printed.
plt.title(one_hot_to_label(val_y[i, ...].squeeze()))
print(one_hot_to_label(y_pred.squeeze()))

In [33]:
# Predict on the whole validation set. The channel axis is added explicitly
# and the pixels are scaled by 1/255, matching what the model saw in training.
val_pred = model.predict(val_X[..., np.newaxis] / 255)
print(val_pred.shape)

In [34]:
# Per-sample fraction of correctly predicted characters on the validation set.
scores = score_np(val_y, val_pred)

In [35]:
# Scores are multiples of 1/CHAR_PER_LABEL (0, 0.2, ..., 1.0 for 5 characters).
# Bin edges are placed halfway between those values so every attainable score
# gets its own bin. Edges placed on the values themselves merge the two
# highest scores, because np.histogram closes the last bin on the right.
bin_edges = (np.arange(CHAR_PER_LABEL + 2) - 0.5) / CHAR_PER_LABEL
counts, bins = np.histogram(scores, bins=bin_edges)

In [36]:
# Draw each bar at the centre of its bin rather than at the right edge, so
# the x position matches the scores the bar actually counts.
bin_centers = (bins[:-1] + bins[1:]) / 2
plt.bar(x=bin_centers, height=counts / counts.sum(), width=0.15)
plt.title("score frequency")
plt.ylabel("frequency")
plt.xlabel("score")
plt.show()

In [38]:
# Rebuild the same architecture as `model` with fresh weights, then restore
# the best checkpoint written during training.
loaded_model = tf.keras.models.Sequential([
    keras.Input(shape=(40, 150, 1)),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.MaxPooling2D(pool_size=(1, 2)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(0.5),
#     tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(0.75),
    # Output width tied to CHAR_PER_LABEL, which the loss and metrics slice by.
    tf.keras.layers.Dense(len(CHARACTERS) * CHAR_PER_LABEL)
])

# Build the path from the same pieces the ModelCheckpoint callback used, so it
# always points at the file that was written. A hard-coded absolute path with
# a baked-in parameter count goes stale when the working directory or the
# alphabet size changes.
weights_path = os.path.join(
    checkout_dir,
    f"{model_name}_batch_size_{batch_size}_init_lr_{lr_str}.weights.h5",
)
loaded_model.load_weights(weights_path)

In [39]:
# Index of the validation sample to inspect with the reloaded model.
i = 0
# Predict on the same sample that is displayed below. The slice keeps the
# batch axis, the channel axis is added explicitly as in training, and the
# pixels are scaled by 1/255 like the training generator does.
sample = val_X[i:i + 1, ..., np.newaxis] / 255
y_pred = loaded_model.predict(sample)

plt.figure()
plt.imshow(val_X[i, ...].squeeze())
# Title shows the true label; the predicted label is printed.
plt.title(one_hot_to_label(val_y[i, ...].squeeze()))
print(one_hot_to_label(y_pred.squeeze()))