# 경북대학교 수강신청 자동입력방지문자 인식

**제작:** [hw0603/Altius](http://altius.iptime.org)<br>
**생성일:** 2021/08/26<br>
**최근 수정일:** 2022/08/12<br>
**설명:** CNN, RNN, CTC loss를 사용하여 경북대학교 수강신청 페이지 자동입력방지문자 인식을 위한 OCR 모델을 구현합니다.

## 개요

Keras의 함수형 API를 활용하여 간단한 OCR 모델을 생성합니다.  
CNN과 RNN을 활용하는 것 외에도 새로운 레이어 클래스를 만들고, 이것을 CTC Loss 구현을 위한 "Endpoint Layer"로 사용하는 방법도 보여 줍니다.  
새로운 레이어를 만드는 방법에 관한 자세한 설명은 [이곳](https://keras.io/guides/making_new_layers_and_models_via_subclassing/)을 확인하세요.

## 런타임 설정
TensorFlow 학습을 위해 GPU가 필요합니다.  
런타임->런타임 유형 변경 메뉴에서 하드웨어 가속기를 GPU로 설정해 주세요  
(Tesla K80 << Tesla P4 <= Tesla T4 << Tesla P100 << Tesla V100)

In [None]:
# 할당된 GPU 확인
!nvidia-smi --query-gpu=gpu_name,driver_version,memory.total --format=csv

####[TIP] Colab 런타임 할당 해제 방지
장시간 대기 시 Colab 런타임 할당 해제를 방지하기 위해 다음 코드를 브라우저 콘솔에서 실행해 주세요

```
function ClickConnect(){
    console.log("코랩 연결 끊김 방지");
    document.querySelector("colab-toolbar-button#connect").click()
}
setInterval(ClickConnect, 60 * 1000)
```



## 구글 드라이브 마운트

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## 모듈 임포트

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
 
from pathlib import Path
from collections import Counter
 
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

## 학습 데이터 압축 해제 & 로드

In [None]:
!rm -rf /content/captcha_images
%cd "/content/drive/MyDrive/Colab Notebooks"
!unzip -qq captcha_images.zip -d /content/captcha_images

이 데이터 셋은 약 4000개의 라벨링된 `PNG` 이미지로 구성되어 있습니다.  
각 샘플에 대한 label은 문자로 구성되어 있으며, 파일명에서 확인할 수 있습니다.  
모델을 훈련시키기 위해 label의 문자를 숫자로 매핑할 것입니다. 이후, 예측을 위해 다시 숫자를 문자로 매핑합니다.  
이를 위해서 문자를 정수로, 정수를 문자로 매핑하는 두 개의 딕셔너리를 만들것입니다.

In [None]:
# 데이터 경로
data_dir = Path("/content/captcha_images")
 
# 모든 이미지들의 리스트 생성
images = sorted(list(map(str, list(data_dir.glob("*.png")))))
labels = [img.split(os.path.sep)[-1].split(".png")[0] for img in images]
characters = list(set(char for label in labels for char in label))
characters.sort()
 
print("Number of images found: ", len(images))
print("Number of labels found: ", len(labels))
print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)
print("Characters: ", "".join(characters)) # 추론 시에 같은 characters 데이터가 있어야 함
 
# 학습과 검증에 사용될 batch size
batch_size = 16
 
# 이미지 크기 설정
img_width = 140
img_height = 35
 
# 이미지가 convolutional blocks에 의해 downsample되는 비율을 2로 설정할 것입니다.
# 우리는 2번의 convolutional blocks를 사용할 것이기 때문에
# 이미지는 한 변을 기준으로 4배 줄어듭니다.
downsample_factor = 4
 
# 라벨 중 가장 긴 것의 길이 구함
max_length = max([len(label) for label in labels])

## 데이터 전처리

In [None]:
# 문자를 숫자로 매핑
char_to_num = layers.StringLookup(
    vocabulary=list(characters), mask_token=None
)
 
# 숫자를 문자로 매핑
num_to_char = layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True
)
 
 
def split_data(images, labels, train_size=0.9, shuffle=True):
    # 1. Dataset의 전체 크기 구함
    size = len(images)
    # 2. Dataset의 인덱스를 담은 np.array 생성 (필요 시 셔플)
    indices = np.arange(size)
    if shuffle:
        np.random.shuffle(indices)
    # 3. 비율에 맞게 train set 사이즈 설정
    train_samples = int(size * train_size)
    # 4. train set과 validation set 분리
    x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]]
    x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]]
    return x_train, x_valid, y_train, y_valid
 
 
# 반환된 데이터 저장
x_train, x_valid, y_train, y_valid = split_data(np.array(images), np.array(labels))
 
 
def encode_single_sample(img_path, label):
    # 1. 이미지 로드
    img = tf.io.read_file(img_path)

    # 2. PNG 이미지 디코드 (원본 파일의 채널(RGBA 4채널) 그대로 사용)
    img = tf.io.decode_png(img, channels=0)

    # 3. R, G, B, A 채널을 분리한 후 A 채널만 추출하여 그레이스캐일로 변환
    r, g, b, a = img[:, :, 0], img[:, :, 1], img[:, :, 2], img[:, :, 3]
    rgb = tf.stack([a], axis=-1)
    img = rgb

    # 4. 8bit([0, 255]) 데이터를 float32([0, 1]) 범위로 변환
    img = tf.image.convert_image_dtype(img, tf.float32)
    
    # 5. 인식률 향상을 위해 이미지를 이진화(Binarization) 함
    img = tf.where(img > 0, 1, 0)
    
    # 6. 이미지 크기에 맞게 리사이징
    img = tf.image.resize(img, [img_height, img_width])
    
    # 7. 이미지 가로세로 바꿈 -> 이미지의 가로와 시간 차원을 대응하기 위함
    img = tf.transpose(img, perm=[1, 0, 2])
    
    # 8. label의 문자들을 숫자로 매핑
    label = char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
    
    # 9. model의 형식에 맞게 데이터 반환
    return {"image": img, "label": label}

## `Dataset` 객체 생성

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = (
    train_dataset.map(
        encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid, y_valid))
validation_dataset = (
    validation_dataset.map(
        encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE
    )
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

## 샘플 데이터 시각화

In [None]:
_, ax = plt.subplots(4, 4, figsize=(10, 5))
for batch in train_dataset.take(1):
    images = batch["image"]
    labels = batch["label"]
    for i in range(16):
        img = (images[i] * 255).numpy().astype("uint8")
        label = tf.strings.reduce_join(num_to_char(labels[i])).numpy().decode("utf-8")
        ax[i // 4, i % 4].imshow(img[:, :, 0].T, cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        ax[i // 4, i % 4].axis("off")
plt.show()

## 모델 정의

In [None]:
class CTCLayer(layers.Layer):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.backend.ctc_batch_cost
 
    def call(self, y_true, y_pred):
        # 모델이 training하는 경우, `self.add_loss()`를 사용하여 loss를 계산하고 더해줌
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")
 
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
 
        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)
 
        # 테스트 시에는 예측 결과값만 반환
        return y_pred
 
 
def build_model():
    # model Input 정의
    input_img = layers.Input(
        shape=(img_width, img_height, 1), name="image", dtype="float32"
    )
    labels = layers.Input(name="label", shape=(None,), dtype="float32")
 
    # 첫 번째 conv block
    x = layers.Conv2D(
        32,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv1",
    )(input_img)
    x = layers.MaxPooling2D((2, 2), name="pool1")(x)
 
    # 두 번째 conv block
    x = layers.Conv2D(
        64,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv2",
    )(x)
    x = layers.MaxPooling2D((2, 2), name="pool2")(x)
 
    # 두 번의 max pool(stride 2, pool size 2)을 사용하므로 feature maps는 4배 downsample 됨
    # 마지막 레이어의 필터의 갯수는 64개
    # 모델의 RNN 파트에 넣기 전에 Reshape를 해 줌
    new_shape = ((img_width // 4), (img_height // 4) * 64)
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = layers.Dense(64, activation="relu", name="dense1")(x)
    x = layers.Dropout(0.2)(x)
 
    # RNNs
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=0.25))(x)
    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.25))(x)
 
    # Output layer
    x = layers.Dense(
        len(char_to_num.get_vocabulary()) + 1, activation="softmax", name="dense2"
    )(x)
 
    # 모델에 CTC loss를 계산하는 CTC Layer 추가
    output = CTCLayer(name="ctc_loss")(labels, x)
 
    # 모델 정의
    model = keras.models.Model(
        inputs=[input_img, labels], outputs=output, name="ocr_model_v1"
    )
    # Optimizer 정의
    opt = keras.optimizers.Adam()
    # 모델 컴파일 후 반환
    model.compile(optimizer=opt)
    return model
 
 
# 모델 구함
model = build_model()
model.summary()

## 모델 학습

In [None]:
epochs = 500
early_stopping_patience = 10
# Early stopping 콜백 함수 선언
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=early_stopping_patience, restore_best_weights=True
)
 
# 모델 학습
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[early_stopping],
)

## 모델 평가

In [None]:
results = model.evaluate(validation_dataset, batch_size=batch_size)
print("Test loss:", results)

## 이미지에서 텍스트 추론

In [None]:
# 출력 레이어까지 레이어를 추출하여 예측 모델을 가져옵니다.
prediction_model = keras.models.Model(
    model.get_layer(name="image").input, model.get_layer(name="dense2").output
)
prediction_model.summary()

# 추론 결과 후처리
def decode_batch_predictions(pred):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Greedy 알고리즘 사용. 복잡한 작업은 Beam Search도 사용 가능
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][
        :, :max_length
    ]
    # 매핑된 데이터 복구
    output_text = []
    for res in results:
        res = tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8")
        output_text.append(res)
    return output_text


# 랜덤 샘플 추론하여 시각화
for batch in validation_dataset.take(1):
    batch_images = batch["image"]
    batch_labels = batch["label"]
 
    preds = prediction_model.predict(batch_images)
    pred_texts = decode_batch_predictions(preds)
 
    orig_texts = []
    for label in batch_labels:
        label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        orig_texts.append(label)

    _, ax = plt.subplots(4, 4, figsize=(15, 5))
    for i in range(len(pred_texts)):
        img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8)
        img = img.T
        title = f"Predict: {pred_texts[i]}"
        ax[i // 4, i % 4].imshow(img, cmap="gray")
        ax[i // 4, i % 4].set_title(title)
        ax[i // 4, i % 4].axis("off")
plt.show()

## 모델 파일 저장

In [None]:
save_path = "/content/drive/MyDrive/Colab Notebooks/models"
prediction_model.save(f"{save_path}/data.h5")
prediction_model.save("/content/model/data.h5")

## Keras `.h5` 모델을  `.tflite` 모델로 변환

TensorFlow Keras 모델을 TensorFlow Lite에서도 사용할 수 있도록 변환합니다.

In [None]:
saved_path = "/content/drive/MyDrive/Colab Notebooks/models"
h5_model_path = f"{saved_path}/data.h5"
 
# H5 모델 변환
h5_model = tf.keras.models.load_model(h5_model_path)
converter = tf.lite.TFLiteConverter.from_keras_model(h5_model)
# 220810 추가) tflite 플래그 설정 안 하면 LSTM 변환 시 오류 발생
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS] 
converter._experimental_lower_tensor_list_ops = False


tflite_model = converter.convert()
 
# TFLite 모델 저장
with open(f"{saved_path}/data.tflite", 'wb') as f:
    f.write(tflite_model)
with open(f"/content/model/data.tflite", 'wb') as f:
    f.write(tflite_model)