In [1]:
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, AveragePooling2D, Dropout, Flatten, Concatenate, Activation, GlobalAveragePooling2D, BatchNormalization
from keras.models import Sequential, Model
from keras.applications.vgg16 import VGG16
from keras.applications.vgg19 import VGG19
from keras.applications.resnet_v2 import ResNet50V2
from keras.applications.resnet import ResNet101
from keras.applications.inception_resnet_v2 import InceptionResNetV2
import cv2
import os
import math
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [2]:

# 이미지 전처리 함수

# HSV로 변환
# 학습 모델에서는 이미지를 rgb를 읽어옴. 서버 프로그램에서는 bgr로 이미지를 읽어와서 서버에서는 bgr을 rgb로 convert 해줘야함.
def image_preprocessing(image):
    image = np.array(image)
    image_resize = cv2.resize(image, dsize = (400, 400))
    image_hsv = cv2.cvtColor(image_resize, cv2.COLOR_RGB2HSV)
    image_scaled = image_hsv.copy()
    image_scaled[:,:,0] = image_hsv[:,:,0] / 180.
    image_scaled[:,:,1] = image_hsv[:,:,1] / 255.
    image_scaled[:,:,2] = image_hsv[:,:,2] / 255.
    return image_scaled

# # GrayScale로 변환
# def image_preprocessing(image):
#     image_gray_reshape = image.reshape(image.shape[0],image.shape[1], 1)
#     image_scaled = image_gray_reshape / 255.
#     return image_scaled

# GoogleNet Inception
def inception(x_in, x1_f,x3r_f,x3_f,x5r_f,x5_f,po):

    x1 = MaxPooling2D(pool_size=(3,3),strides=(1,1),padding = 'SAME')(x_in)
    x1 = Conv2D(filters= po,kernel_size=(1,1),padding="SAME")(x1)
    x1 = Activation('relu')(x1)

    x2 = Conv2D(filters=x5r_f,kernel_size=(1,1),padding="SAME")(x_in)
    x2 = Conv2D(filters=x5_f,kernel_size=(5,5),padding="SAME")(x2)
    x2 = Activation('relu')(x2)

    x3 = Conv2D(filters=x3r_f,kernel_size=(1,1),padding="SAME")(x_in)
    x3 = Conv2D(filters=x3_f,kernel_size=(3,3),padding="SAME")(x3)
    x3 = Activation('relu')(x3)

    x4 = Conv2D(filters=x1_f,kernel_size=(1,1),padding="SAME")(x_in)
    x4 = Activation('relu')(x4)

    out = Concatenate()([x1,x2,x3,x4])
    return out


def create_model(model, item_code, img_shape, grayscale=False, batch_size=32, test_or_val='test', epochs=10, model_save=True):
    
    # flow_from_directory를 통해 이미지 불러오기
    train_gen = ImageDataGenerator(preprocessing_function=image_preprocessing)
    test_gen = ImageDataGenerator(preprocessing_function=image_preprocessing)
    val_gen = ImageDataGenerator(preprocessing_function=image_preprocessing)

    if grayscale:
        color_mode = 'grayscale'
    else:
        color_mode = 'rgb'

    train_flow_gen = train_gen.flow_from_directory(
        directory=f'./model_images/{item_code}/train', color_mode = color_mode,
        target_size=img_shape[:2], class_mode='categorical', batch_size=batch_size, shuffle=False)
    test_flow_gen = test_gen.flow_from_directory(
        directory=f'./model_images/{item_code}/test', color_mode = color_mode,
        target_size=img_shape[:2], class_mode='categorical', batch_size=batch_size, shuffle=False)
    if test_or_val == 'val':
        val_flow_gen = val_gen.flow_from_directory(
            directory=f'./model_images/{item_code}/validation', color_mode = color_mode,
            target_size=img_shape[:2], class_mode='categorical', batch_size=batch_size, shuffle=False)


    print(f'>> directory read : {train_flow_gen.directory}')
    print(f'>>{train_flow_gen.class_indices}')
    print(f'>> image_shape: {train_flow_gen.image_shape}')
    print(f'>> train_count: {train_flow_gen.n}')
    print(f'>> test_count: {test_flow_gen.n}')


    # 학습 모델


    ##------------------------------------------------------------------------------------------------------------------------------
    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG    # PDPROG
    if model == 'pdprog':
        model_name = 'PdProg'
        model = tf.keras.models.Sequential([
            tf.keras.layers.Conv2D(32, (2,2), activation='relu', input_shape=img_shape),
            tf.keras.layers.MaxPooling2D(2,2),
            tf.keras.layers.Conv2D(64, (2,2), activation='relu'),
            tf.keras.layers.MaxPooling2D(2,2),
            tf.keras.layers.Conv2D(128, (2,2), activation='relu'),
            tf.keras.layers.MaxPooling2D(2,2),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(256,activation='relu'),
            tf.keras.layers.Dense(train_flow_gen.num_classes, activation='softmax')
        ])
        model.summary()
        model.compile(loss = 'categorical_crossentropy',
                      optimizer = 'adam',
                      metrics = ['accuracy'])
    ##------------------------------------------------------------------------------------------------------------------------------


    #------------------------------------------------------------------------------------------------------------------------------
    # GoogleNet    # GoogleNet    # GoogleNet    # GoogleNet    # GoogleNet    # GoogleNet    # GoogleNet    # GoogleNet    # GoogleNet
    elif model == 'googlenet':
        model_name = 'GoogleNet'
        input_data = Input(shape=(400,400,3))
        x = Conv2D(filters=64,kernel_size=(7,7),strides=(2,2),padding="SAME")(input_data)
        x = MaxPooling2D(pool_size=(3,3),strides=(2,2),padding="SAME")(x)
        x = tf.keras.layers.LayerNormalization()(x)

        x = Conv2D(filters=64,kernel_size=(1,1),strides=(1,1),padding="SAME")(x)
        x = Conv2D(filters=192,kernel_size=(3,3),strides=(1,1),padding="SAME")(x)

        x = tf.keras.layers.LayerNormalization()(x)
        x = MaxPooling2D(pool_size=(3,3),strides=(2,2),padding="SAME")(x)


        x = inception(x,64,96,128,16,32,32)
        x = inception(x,128,128,192,32,96,64)

        x = MaxPooling2D(pool_size=(3,3),strides=(2,2),padding="SAME")(x)
        x = inception(x,192,96,208,16,48,64)

        ax1 = AveragePooling2D(pool_size=(5,5),strides=(3,3))(x)
        ax1 = Conv2D(filters=128,kernel_size=(1,1),padding="SAME")(ax1)
        ax1 = Flatten()(ax1)
        ax1 = Dense(1024,activation="relu")(ax1)
        ax1 = Dropout(0.7)(ax1)
        ax1 = Dense(6,activation="softmax")(ax1)

        x = inception(x,160,112,224,24,64,64)
        x = inception(x,128,128,256,24,64,64)

        x = inception(x,112,114,288,32,64,64)

        ax2 = AveragePooling2D(pool_size=(5,5),strides=(3,3))(x)
        ax2 = Conv2D(filters=128,kernel_size=(1,1),padding="SAME")(ax2)
        ax2 = Flatten()(ax2)
        ax2 = Dense(1024,activation="relu")(ax2)
        ax2 = Dropout(0.7)(ax2)
        ax2 = Dense(6,activation="softmax")(ax2)

        x = inception(x,256,160,320,32,128,128)
        x = MaxPooling2D(pool_size=(3,3),strides=(2,2),padding="SAME")(x)

        x = inception(x,256,160,320,32,128,128)
        x = inception(x,384,192,384,48,128,128)

        x = GlobalAveragePooling2D()(x)
        x = Dropout(0.4)(x)

        outputs = Dense(6,activation="softmax")(x)
        model = tf.keras.models.Model(input_data,[outputs,ax1,ax2],name = 'googlenet')
    #------------------------------------------------------------------------------------------------------------------------------


    # #------------------------------------------------------------------------------------------------------------------------------
    elif model == 'vgg16':
    # #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16    #VGG16
        model_name = 'VGG16'
        model = VGG16(weights= None, input_shape = (400, 400, 3), classes=6)
    # #------------------------------------------------------------------------------------------------------------------------------
    #
    #
    # #------------------------------------------------------------------------------------------------------------------------------
    elif model == 'vgg19':
    # #VGG19     #VGG19    #VGG19     #VGG19    #VGG19     #VGG19    #VGG19     #VGG19    #VGG19     #VGG19    #VGG19     #VGG19
        model_name = 'VGG19'
        model = VGG19(weights=None, input_shape = (400, 400, 3), classes=6)
    # #------------------------------------------------------------------------------------------------------------------------------
    #
    #
    # #------------------------------------------------------------------------------------------------------------------------------
    # #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet    #AlexNet
    elif model == 'alexnet':
        model_name = 'AlexNet'
        input_shape = (400, 400, 3)
        x = Input(shape = input_shape, name='INPUT')

        # CONV
        conv1 = Conv2D(filters=96, kernel_size=11, activation='relu', strides=4, name='CONV_1')(x)
        pool1 = MaxPooling2D((3,3), strides=2, name='POOL_1')(conv1)  # overlapped pooling
        # lrn1 = local_response_normalization(conv1,depth_radius=5, bias=2, alpha=0.0001, beta=0.75)
        lrn1 = BatchNormalization(name='LRN_1')(pool1)

        conv2 = Conv2D(filters=256, kernel_size=5, activation='relu', strides=1, padding='same', name='CONV_2')(lrn1)
        pool2 = MaxPooling2D((3,3), strides=2, name='POOL_2')(conv2)
        # lrn2 = local_response_normalization(conv2,depth_radius=5, bias=2,  alpha=0.0001, beta=0.75)
        lrn2 = BatchNormalization(name='LRN_2')(pool2)

        conv3 = Conv2D(filters=384, kernel_size=3, activation='relu', strides=1, padding='same', name='CONV_3')(lrn2)
        conv4 = Conv2D(filters=384, kernel_size=3, activation='relu', strides=1, padding='same', name='CONV_4')(conv3)
        conv5 = Conv2D(filters=256, kernel_size=3, activation='relu', strides=1, padding='same', name='CONV_5')(conv4)
        pool3 = MaxPooling2D((3,3), strides=2, name='POOL_3')(conv5)

        # FC
        f = Flatten()(pool3)
        f = Dense(512, activation='relu', name='FC_1')(f)
        # f = Dropout(0.5)(f)  # 논문 parameter 0.5 이용
        # f = Dense(4096, activation='relu', name='FC_2')(f)
        # f = Dropout(0.5)(f)
        out = Dense(6, activation='softmax', name='OUTPUT')(f)

        model = tf.keras.models.Model(inputs=x, outputs=[out])
    # #------------------------------------------------------------------------------------------------------------------------------


    # #------------------------------------------------------------------------------------------------------------------------------
    # #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2    #ResNetV2
    elif model == 'resnet_v2':
        model_name = 'ResNet_v2'
        model = ResNet50V2(weights=None, input_shape=(400, 400, 3), classes=6)
    # #------------------------------------------------------------------------------------------------------------------------------
    #
    #
    # #------------------------------------------------------------------------------------------------------------------------------
    # #Inception ResNet    #Inception ResNet    #Inception ResNet    #Inception ResNet    #Inception ResNet    #Inception ResNet
    elif model == 'inception_resnet':
        model_name = 'Inception Resnet'
        model = InceptionResNetV2(weights = None, input_shape=(400, 400, 3), classes=6)
    # #------------------------------------------------------------------------------------------------------------------------------
    #
    #
    # #------------------------------------------------------------------------------------------------------------------------------
    #ResNet101    #ResNet101    #ResNet101    #ResNet101    #ResNet101    #ResNet101    #ResNet101    #ResNet101    #ResNet101
    elif model == 'resnet101':
        model_name = 'ResNet101'
        model = ResNet101(weights=None, input_shape=(400, 400, 3), classes=6)
    # #------------------------------------------------------------------------------------------------------------------------------


    print(model_name)
    model.summary()
    model.compile(loss = 'categorical_crossentropy',
                  optimizer = 'adam',
                  metrics = ['accuracy'])


    # 체크포인트
    cb_checkpoint = ModelCheckpoint(filepath=f'model_hdf5/{item_code}_defect.h5', monitor='val_loss', verbose=1, save_best_only=True,
                                    save_weight_only=True)
    
    if test_or_val == 'test':
        validation_data = test_flow_gen
    elif test_or_val == 'val':
        validation_data = val_flow_gen


    if model_save:
        history = model.fit(train_flow_gen,
                            batch_size = batch_size,
                            epochs=epochs,
                            callbacks =[cb_checkpoint],
                            validation_data=validation_data,
                            steps_per_epoch= int(math.ceil(1. * train_flow_gen.n / batch_size,)),
                            validation_steps = int(math.ceil(1. * test_flow_gen.n / batch_size))
                            )
    else:
        history = model.fit(train_flow_gen,
                            batch_size = batch_size,
                            epochs=epochs,
                            validation_data=validation_data,
                            steps_per_epoch= int(math.ceil(1. * train_flow_gen.n / batch_size,)),
                            validation_steps = int(math.ceil(1. * val_flow_gen.n / batch_size))
                            )
    if test_or_val == 'test':
        model.evaluate(validation_data, batch_size = int(math.ceil(1. * test_flow_gen.n / batch_size)))
    elif test_or_val == 'val':
        model.evaluate(validation_data, batch_size = int(math.ceil(1. * val_flow_gen.n / batch_size)))


    # confusion matrix 출력
    if test_or_val == 'test':
        real_target = test_flow_gen.labels
        predict_target = []
        for i in model.predict(test_flow_gen):
            predict_target.append(np.argmax(i))
            
    if test_or_val == 'val':
        real_target = val_flow_gen.labels
        predict_target = []
        for i in model.predict(val_flow_gen):
            predict_target.append(np.argmax(i))
            
    predict_target = np.asarray(predict_target)
    con = tf.math.confusion_matrix(labels=real_target, predictions=predict_target)
    print('\n\n\n\n>> confusion_matrix')
    print(list(train_flow_gen.class_indices.keys()))
    print(con)

    # 학습 그래프 출력
    plt.figure(1)
    plt.subplot(211)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model Accucarcy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')

    plt.subplot(212)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')

    plt.show()
    
    return history

In [None]:
#model = [pdprog, googlenet, vgg16, vgg19, alexnet, resnet_v2, inception_resnet]

model = create_model(
    model = 'alexnet',
    item_code='DA4649_rotate', # 품목코드. ./model_images 쪽에서 동일한 폴더명을 읽어오고, 추후 저장시 해당 품목코드를 기반으로 저장
    img_shape=(400, 400, 3),
    grayscale = False, # False일 경우 rgb, True일 경우 grayscale
    batch_size=32, # 모델 훈련 batch_size
    test_or_val='test', # 검증세트를 'test' or 'val'로 설정
    epochs=10,
    model_save=True # True일 경우 학습모델(.h5) 저장
)