<a href="https://colab.research.google.com/github/aloml2543/KB_OCR/blob/main/src/model3_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive under /content/drive; later cells read the dataset
# archive from it and write checkpoints / TensorBoard logs to it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm
import datetime
import random
plt.style.use('dark_background')

from tensorflow.keras import backend as K
 
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2,preprocess_input
from tensorflow.keras import layers
from tensorflow import keras
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.layers import Input,GlobalMaxPooling2D,Dense, Conv2D, BatchNormalization, Activation, MaxPooling2D, Reshape, LSTM, Lambda, add, concatenate, Bidirectional
from tensorflow.keras.models import Model, save_model, load_model
from tensorflow.keras.preprocessing.image import img_to_array,load_img
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard

In [None]:
def split_data(imgs, labels, split = 0.9, get = 1):
  """Take a leading fraction of the data and cut it into train / validation parts.

  Args:
    imgs: sliceable sequence of image identifiers.
    labels: sliceable sequence of labels, index-aligned with ``imgs``.
    split: fraction of the kept items that goes to the training part.
    get: fraction of ``imgs`` (counted from the start) that is kept at all.

  Returns:
    A 4-tuple ``(train_imgs, train_labels, val_imgs, val_labels)``.
    No shuffling is performed; the original order is preserved.
  """
  # Number of leading items retained from the full dataset.
  kept_count = int(len(imgs) * get)
  kept_imgs, kept_labels = imgs[:kept_count], labels[:kept_count]

  # Boundary between the training head and the validation tail.
  boundary = int(len(kept_imgs) * split)
  train_part = (kept_imgs[:boundary], kept_labels[:boundary])
  val_part = (kept_imgs[boundary:], kept_labels[boundary:])
  return (*train_part, *val_part)
#데이터셋 압축 해제
!unzip -n "/content/drive/MyDrive/KB_OCR/DATA/snukb_dataset.zip"  -d "/content/"

#dataset 가져오기
labels = pd.read_csv('/content/train/train.csv')
image_dir = [str(index) + '.jpg' for index in list(labels.index.to_numpy())]
labels = list(labels.label.to_numpy())

train_img, train_label, val_img, val_label = split_data(image_dir, labels)


#파라미터들
model_path = '/content/drive/MyDrive/KB_OCR/model1-1/'
img_width = 128
img_height = 64
 
max_length = max([len(label) for label in labels])
characters = sorted(list(set(char for label in labels for char in label)))
num_classes = len(characters) + 1

print("최대 길이", max_length)
print("문자수", len(characters))
print("앞 20글자", characters[0:20])
print("뒤 20글자", characters[-20:])

print("\ntrain_img 수:", len(train_img))
print("train_label 수:", len(train_label))
print("val_img 수:", len(val_img))
print("val_label 수:", len(val_label))

Archive:  /content/drive/MyDrive/KB_OCR/DATA/snukb_dataset.zip
최대 길이 38
문자수 1211
앞 20글자 [' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3']
뒤 20글자 ['훈', '훔', '훨', '훼', '휴', '흉', '흐', '흑', '흔', '흘', '흙', '흡', '흥', '흩', '희', '흰', '히', '힌', '힐', '힘']

train_img 수: 27054
train_label 수: 27054
val_img 수: 3006
val_label 수: 3006


In [None]:
def labels_to_text(labels):
    """Convert a sequence of class indices into the string they spell.

    Each element is cast with ``int`` and looked up in the module-level
    ``characters`` list; the resulting characters are concatenated.
    """
    return ''.join(characters[int(class_index)] for class_index in labels)
 
def text_to_labels(text):
    """Convert a string into the list of class indices of its characters.

    Every character is looked up with ``characters.index`` in the module-level
    ``characters`` list, so a character missing from it raises ``ValueError``.
    """
    return [characters.index(symbol) for symbol in text]

#입력값 제너레이터
class TextImageGenerator:
    """In-memory store of preprocessed images and labels that yields CTC batches.

    Usage: construct, call ``build_data()`` once to load and preprocess every
    image, then pass ``next_batch()`` (an endless generator) to ``model.fit``.

    Args:
        img_dirpath: directory prefix that is string-concatenated with each file name.
        img_dir: sized iterable of image file names.
        labels: label strings, index-aligned with ``img_dir``.
        img_w: width the images are resized to.
        img_h: height the images are resized to.
        batch_size: number of samples per yielded batch.
        downsample_factor: divisor applied to ``img_w`` when computing the
            sequence length reported to the CTC loss.
        max_text_len: width of the zero-padded label matrix.
    """

    def __init__(self, img_dirpath,img_dir,labels, img_w, img_h,
                 batch_size, downsample_factor, max_text_len=100):
        self.img_h = img_h
        self.img_w = img_w
        self.labels = labels
        self.batch_size = batch_size
        self.max_text_len = max_text_len
        self.downsample_factor = downsample_factor
        self.img_dirpath = img_dirpath                  # image dir path
        self.img_dir = img_dir                          # image file names
        self.n = len(self.img_dir)                      # number of images
        self.indexes = list(range(self.n))              # visiting order, reshuffled per pass
        self.cur_index = 0                              # position of the next sample in indexes
        # float32 is what build_data produces, so a float64 cache would only
        # double the memory footprint without adding precision.
        self.imgs = np.zeros((self.n, self.img_h, self.img_w), dtype=np.float32)
        self.texts = []

    def build_data(self):
        """Read, binarise, resize and normalise every image; store its label.

        Fills ``self.imgs[i]`` with values in [0, 1] and ``self.texts[i]`` with
        the label belonging to this generator's i-th image.

        Raises:
            FileNotFoundError: if an image cannot be read by OpenCV.
        """
        # Start from a clean list so that calling build_data twice does not
        # duplicate the labels.
        self.texts = []
        for i, img_file in enumerate(self.img_dir):
            img_path = self.img_dirpath + img_file
            # IMREAD_COLOR is a real imread flag; the previous COLOR_BGR2GRAY is
            # a cvtColor conversion code that only worked by numeric accident.
            img = cv2.imread(img_path, cv2.IMREAD_COLOR)
            if img is None:
                raise FileNotFoundError("could not read image: " + img_path)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img = cv2.GaussianBlur(img, ksize=(5,5), sigmaX=10)  # suppress noise
            img = cv2.adaptiveThreshold(
                img,
                maxValue=255.0,
                adaptiveMethod=cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                thresholdType=cv2.THRESH_BINARY_INV,
                blockSize=19,
                C=2
            )
            img = cv2.resize(img, (self.img_w, self.img_h))
            img = img.astype(np.float32)
            img = (img / 255.0)

            self.imgs[i, :, :] = img
            # Use this generator's own labels; the module-level `labels` list
            # is index-aligned with the full dataset, not with this subset.
            self.texts.append(self.labels[i])

    def next_sample(self):
        """Return the next ``(image, text)`` pair, reshuffling after each full pass."""
        if self.cur_index >= self.n:
            # Every sample has been served once: start a new, shuffled pass.
            self.cur_index = 0
            random.shuffle(self.indexes)
        sample_index = self.indexes[self.cur_index]
        # Advance only after reading so that position 0 is not skipped.
        self.cur_index += 1
        return self.imgs[sample_index], self.texts[sample_index]

    def next_batch(self):
        """Endlessly yield ``(inputs, outputs)`` batches for the CTC training model.

        ``inputs`` is a dict with keys ``the_input``, ``the_labels``,
        ``input_length`` and ``label_length``; ``outputs`` holds a dummy zero
        target under ``ctc``.

        Raises:
            ValueError: if a label is longer than ``max_text_len``.
        """
        while True:
            X_data = np.ones([self.batch_size, self.img_w, self.img_h, 1])     # (bs, img_w, img_h, 1)
            Y_data = np.zeros([self.batch_size, self.max_text_len])            # (bs, max_text_len)
            # The "- 2" mirrors ctc_lambda_func, which drops the first two timesteps.
            input_length = np.ones((self.batch_size, 1)) * (self.img_w // self.downsample_factor - 2)  # (bs, 1)
            label_length = np.zeros((self.batch_size, 1))                      # (bs, 1)

            for i in range(self.batch_size):
                img, text = self.next_sample()
                # Stored images are (img_h, img_w); transpose so width comes first.
                img = img.T
                img = np.expand_dims(img, -1)
                X_data[i] = img
                word = text_to_labels(text)
                if len(word) > self.max_text_len:
                    raise ValueError(
                        "label of length %d exceeds max_text_len=%d"
                        % (len(word), self.max_text_len)
                    )
                # Left-aligned class indices; the remainder of the row stays 0.
                Y_data[i, :len(word)] = word
                label_length[i] = len(text)

            # Keys must match the names of the model's Input layers.
            inputs = {
                'the_input': X_data,            # (bs, img_w, img_h, 1)
                'the_labels': Y_data,           # (bs, max_text_len)
                'input_length': input_length,   # (bs, 1), same value in every row
                'label_length': label_length    # (bs, 1), true length of each label
            }
            outputs = {'ctc': np.zeros([self.batch_size])}   # (bs,) dummy target, all zeros
            yield (inputs, outputs)

train_file_path = '/content/train/images/'

# downsample_factor must describe how much the network shrinks the width axis.
# get_Model uses padding='same' with no pooling and reshapes to img_width
# timesteps, and ctc_lambda_func drops the first two, so the CTC loss sees
# img_width - 2 frames. A factor of 4 advertised only 128 // 4 - 2 = 30 frames,
# fewer than the longest label (max_length), which CTC cannot align; a factor
# of 1 gives the matching 128 - 2 = 126.
train_dataset_generator = TextImageGenerator(train_file_path, tqdm(train_img), train_label, img_width, img_height,
                                             batch_size=1, downsample_factor=1, max_text_len=max_length)
train_dataset_generator.build_data()
val_dataset_generator = TextImageGenerator(train_file_path, tqdm(val_img), val_label, img_width, img_height,
                                           batch_size=1, downsample_factor=1, max_text_len=max_length)
val_dataset_generator.build_data()

100%|██████████| 27054/27054 [00:34<00:00, 783.52it/s]
100%|██████████| 3006/3006 [00:03<00:00, 761.34it/s]


In [None]:
def ctc_lambda_func(args):
    """Compute the CTC batch cost inside a Keras ``Lambda`` layer.

    Args:
        args: sequence ``(y_pred, labels, input_length, label_length)``.

    Returns:
        The result of ``K.ctc_batch_cost`` evaluated on the predictions with
        their first two timesteps removed.
    """
    predictions, target_labels, prediction_lengths, target_lengths = args
    # Discard timesteps 0 and 1 along axis 1 before computing the loss.
    trimmed_predictions = predictions[:, 2:, :]
    return K.ctc_batch_cost(
        target_labels, trimmed_predictions, prediction_lengths, target_lengths
    )
 
def get_Model(training=True):
    """Build the convolution + recurrent text recogniser.

    Reads the module-level ``img_width``, ``img_height``, ``num_classes`` and
    ``max_length``.

    Args:
        training: when True, return the four-input model whose single output
            is the CTC loss produced by the ``ctc`` Lambda layer; when False,
            return the single-input model that outputs the per-timestep softmax.

    Returns:
        A ``keras.Model``.
    """
    # Number of convolution filters; also determines the per-timestep feature size.
    conv_filters = 8

    inputs = Input(name='the_input', shape=(img_width, img_height, 1), dtype='float32')

    inner = Conv2D(conv_filters, (64, 64), padding='same', kernel_initializer='he_normal', name='con7')(inputs)
    inner = BatchNormalization()(inner)
    inner = Activation('relu')(inner)

    # CNN to RNN: padding='same' and no pooling keep the spatial size, so each
    # of the img_width columns becomes one timestep with img_height * filters
    # features. Derived from the parameters rather than hard-coded as
    # (128, 512) so that changing the image size does not break the reshape.
    inner = Reshape(target_shape=(img_width, img_height * conv_filters), name='reshape')(inner)
    inner = Dense(64, activation='relu', kernel_initializer='he_normal', name='dense1')(inner)

    # RNN layer: two Bidirectional LSTM branches over the same input. The
    # second wraps an LSTM created with go_backwards=True and its output is
    # reversed along the time axis before the two branches are summed.
    lstm_1 = Bidirectional(LSTM(64, return_sequences=True, kernel_initializer='he_normal', name='lstm1'))(inner)
    lstm_1b = Bidirectional(LSTM(64, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', name='lstm1_b'))(inner)
    reversed_lstm_1b = Lambda(lambda inputTensor: K.reverse(inputTensor, axes=1)) (lstm_1b)

    lstm1_merged = add([lstm_1, reversed_lstm_1b])
    lstm1_merged = BatchNormalization()(lstm1_merged)

    # Per-timestep class distribution over num_classes.
    inner = Dense(num_classes, kernel_initializer='he_normal',name='dense2')(lstm1_merged)
    y_pred = Activation('softmax', name='softmax')(inner)

    # Auxiliary inputs that only the CTC loss consumes.
    labels = Input(name='the_labels', shape=[max_length], dtype='float32')
    input_length = Input(name='input_length', shape=[1], dtype='int64')
    label_length = Input(name='label_length', shape=[1], dtype='int64')

    loss_out = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')([y_pred, labels, input_length, label_length])

    if training:
        return Model(inputs=[inputs, labels, input_length, label_length], outputs=loss_out)
    else:
        return Model(inputs=[inputs], outputs=y_pred)
 
# Get the model
 
# Build the training variant of the network (get_Model defaults to training=True).
model = get_Model()
model.summary()
# The 'ctc' output of the training model already is the CTC loss, so the
# Keras loss function simply passes y_pred through and ignores y_true.
model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=Adam(learning_rate=0.0005))

Model: "model_15"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
the_input (InputLayer)          [(None, 128, 64, 1)] 0                                            
__________________________________________________________________________________________________
con7 (Conv2D)                   (None, 128, 64, 8)   32776       the_input[0][0]                  
__________________________________________________________________________________________________
batch_normalization_364 (BatchN (None, 128, 64, 8)   32          con7[0][0]                       
__________________________________________________________________________________________________
activation_338 (Activation)     (None, 128, 64, 8)   0           batch_normalization_364[0][0]    
___________________________________________________________________________________________

In [None]:
# Output directory for this experiment's checkpoint and TensorBoard logs.
model_path = '/content/drive/MyDrive/KB_OCR/model3-2/'
# Keep only the weights with the best validation loss seen so far.
checkpoint = ModelCheckpoint(filepath=model_path + 'model3_2.hdf5', monitor='val_loss', verbose=1, mode='auto', save_best_only=True)

# One timestamped log directory per run. NOTE(review): the directory name is
# spelled "tensorbloard"; left unchanged so existing runs stay side by side.
log_dir = model_path + "tensorbloard/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# One epoch should cover each sample once. Each generator step yields
# batch_size samples, so the step count is n // batch_size (at least 1)
# rather than n, which was only correct for batch_size == 1.
train_steps = max(1, train_dataset_generator.n // train_dataset_generator.batch_size)
val_steps = max(1, val_dataset_generator.n // val_dataset_generator.batch_size)

now = datetime.datetime.now()
print("시작 시간:", now)
history = model.fit(train_dataset_generator.next_batch(), steps_per_epoch=train_steps
                    ,validation_data=val_dataset_generator.next_batch(), validation_steps=val_steps
                    ,callbacks=[checkpoint, tensorboard_callback]
                    ,epochs=500)
now = datetime.datetime.now()
print("종료 시간:", now)

시작 시간: 2021-09-08 15:48:36.071112
Epoch 1/500
    1/27054 [..............................] - ETA: 17:43 - loss: 17.3532





InvalidArgumentError: ignored