# 구글 드라이브 연결

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# [Step 0] 초기 설정

In [None]:
import os
import numpy as np
import shutil
from PIL import Image
from numpy import expand_dims
from keras.api._v2.keras import activations
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator, array_to_img
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from tensorflow.python.client import device_lib
from datetime import datetime
import json

data_path = os.getcwd() + '/drive/MyDrive/2023-2-Capstone-ML/dataset/'
model_path = os.getcwd() + '/drive/MyDrive/2023-2-Capstone-ML/model/'
ori_path = data_path + 'pet_img/'


data_dir = data_path + 'gen_img/'
train_dir = data_path + 'train_ds'
validation_dir = data_path + 'val_ds'
test_ratio = 0.2

# [Step 1] 이미지 증식 함수

## 이미지 증식 클래스

In [None]:
class ImageProcess:
    def __init__(self, path=None, name=None, num=None, dir_path=None):
        self.image = Image.open(path)
        self.size = (224, 224)
        self.path = path
        self.name = name
        self.num = num
        self.dir_path = dir_path

    def image_processing(self):
        self.image = self.image.resize(self.size)
        image_array = np.asarray(self.image)
        image_array = image_array / 255.0
        return image_array

    def image_generator(self):
        img = Image.open(self.path)
        img = img.resize(self.size)
        img_array = img_to_array(img)
        img_array = img_array.reshape((1,) + img_array.shape)
        datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            brightness_range=[0.7, 1.3],
            fill_mode='nearest')

        num_augmented_images = 30
        i = 0

        for batch in datagen.flow(img_array, batch_size=1):
          augmented_image = array_to_img(batch[0])
          augmented_image.save(self.dir_path + self.name + '_' + str(self.num) + '.jpg')
          i += 1
          self.num += 1
          if i >= num_augmented_images:
            break
        return self.num


In [None]:
def path_setting(img_name):
  gen_path = data_path + 'gen_img/' + img_name + '/'
  train_path = data_path + 'train_img/' + img_name + '/'
  test_path = data_path + 'test_img/' + img_name + '/'
  pet_path = ori_path + img_name
  return gen_path, train_path, test_path, pet_path

def generator_img(img_name):
  gen_path, train_path, test_path, pet_path = path_setting(img_name)

  # 디렉토리 생성
  os.makedirs(gen_path, exist_ok=True)
  os.makedirs(train_path, exist_ok=True)
  os.makedirs(test_path, exist_ok=True)

  # 증식 실시할 데이터 지정
  pet_img_list = []
  for img in os.listdir(pet_path):
    pet_img_list.append(img)

  # 학습용, 테스트용 데이터 분리
  img_count = len(pet_img_list)
  test_count = int(img_count * test_ratio)
  test_img_list = np.random.choice(pet_img_list, test_count, False)
  train_idx = np.where(np.isin(pet_img_list, test_img_list) == False)[0]
  train_img_list = np.array(pet_img_list)[train_idx]

  # 테스트용 데이터 저장
  test_idx = 0
  for img in test_img_list :
    pet_img_path = pet_path + '/' + str(img)
    test_img_path = test_path + str(img)[:-4] + '_' + str(test_idx) + '.jpg'
    test_idx += 1
    shutil.copyfile(pet_img_path, test_img_path)
    print(pet_img_path + "에 있는 이미지를 " + test_img_path + "에 복사")
  print(str(img_name) + "번 동물 테스트 사진 " + str(test_idx) + "개 생성")

  # 학습용 데이터 저장 및 증식
  train_idx = 0
  for img in train_img_list :
    pet_img_path = pet_path + '/' + str(img)
    train_img_path = train_path + str(img)[:-4] + '_' + str(train_idx) + '.jpg'
    train_idx += 1
    shutil.copyfile(pet_img_path, train_img_path)
    print(pet_img_path + "에 있는 이미지를 " + train_img_path + "에 복사")

    # 데이터 증식
    obj = ImageProcess(train_img_path, str(img)[:-4], train_idx, gen_path)
    train_idx = obj.image_generator()
  print(str(img_name) + "번 동물 학습 사진 " + str(train_idx) + "개 증식")


In [None]:
for pet in os.listdir(ori_path):
  generator_img(pet)

# [Step 2] train, valid 데이터 분리

In [None]:
def split_train_valid() :
  # 경로 생성
  os.makedirs(train_dir, exist_ok=True)
  os.makedirs(validation_dir, exist_ok=True)

  subdirs = [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))]

  # train, validation 분리
  for subdir in subdirs:
    subdir_path = os.path.join(data_dir, subdir)
    images = [f for f in os.listdir(subdir_path) if f.endswith('.jpg')]
    train_images, validation_images = train_test_split(images, test_size=0.2, random_state=42)

    # Move images to training directory
    for image in train_images:
        source = os.path.join(subdir_path, image)
        destination = os.path.join(train_dir, subdir, image)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        shutil.copy(source, destination)

    # Move images to validation directory
    for image in validation_images:
        source = os.path.join(subdir_path, image)
        destination = os.path.join(validation_dir, subdir, image)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        shutil.copy(source, destination)
    print(str(subdir) + "번 사진 " + str(len(train_images) + len(validation_images)) + "개를 추가하였습니다.")
split_train_valid()

# [Step 3] 학습

## 이미지 학습 클래스

In [None]:
class EfficientNet:
    def training_model(self):
      train_datagen = ImageDataGenerator(
            rescale=1./255,
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            brightness_range=[0.7, 1.3],
            fill_mode='nearest'
      )

      # Define data augmentation for validation images
      validation_datagen = ImageDataGenerator(
          rescale=1./255
      )

      # Create data generators
      batch_size = 16
      train_generator = train_datagen.flow_from_directory(
          train_dir,
          target_size=(224, 224),
          batch_size=batch_size,
          class_mode='categorical')

      validation_generator = validation_datagen.flow_from_directory(
          validation_dir,
          target_size=(224, 224),
          batch_size=batch_size,
          class_mode='categorical')

      # Load the EfficientNetB0 model with pre-trained weights (include_top=False)
      base_model = EfficientNetB0(weights='imagenet', include_top=False)

      # Add a global average pooling layer and a fully connected layer for classification
      x = base_model.output
      x = GlobalAveragePooling2D()(x)
      x = Dense(1024, activation='relu')(x)
      predictions = Dense(len(os.listdir(train_dir)), activation='softmax')(x)

      # Create the model
      model = keras.models.Model(inputs=base_model.input, outputs=predictions)
      custom_optimizer = Adam(learning_rate=0.00003)

      # Compile the model
      model.compile(optimizer=custom_optimizer,
                    loss='categorical_crossentropy',
                    metrics=['accuracy'])

      # Define callbacks
      model_checkpoint = ModelCheckpoint(model_path + 'save_model/model.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min')
      early_stopping = EarlyStopping(monitor='val_loss', patience=10, mode='min')

      # Train the model
      num_epochs = 50
      history = model.fit(train_generator,
                          epochs=num_epochs,
                          validation_data=validation_generator,
                          callbacks=[model_checkpoint, early_stopping])

      # Save the trained model
      model.save(model_path + 'save_model/pet_classification_efficientnet_b0.h5')

    @staticmethod
    def draw_graph(filename):
        with open(model_path + 'acc_history/'+ filename, 'r') as json_file:
            data = json.load(json_file)
            acc = data['acc']
            val_acc = data['val_acc']
            loss = data['loss']
            val_loss = data['val_loss']

            epochs = range(len(acc))
            plt.figure(figsize=(100, 100))
            plt.subplot(211)
            plt.plot(epochs, acc, 'r', label='Training accuracy')
            plt.plot(epochs, val_acc, 'b', label='Validation accuracy')
            plt.title('Training and validation accuracy')
            plt.legend(loc=0)

            plt.subplot(212)
            plt.plot(epochs, loss, 'r', label='Training loss')
            plt.plot(epochs, val_loss, 'b', label='Validation loss')
            plt.title('Training and validation loss')
            plt.legend(loc=0)

            plt.show()

    @staticmethod
    def predict_pet(img_path, real_model_path, label_list):
      print(img_path)
      model = keras.models.load_model(real_model_path)
      img = image.load_img(img_path, target_size=(224, 224))
      img_array = image.img_to_array(img)
      img_array = np.expand_dims(img_array, axis=0)  # Add batch dimension
      img_array = img_array / 255.0  # Rescale to the range [0, 1]

      # Make the prediction
      prediction = model.predict(img_array)
      print(prediction)

      arr = prediction[0].tolist()
      list = []
      for index, value in enumerate(arr):
        list.append((label_list[index], value))
      list.sort(key = lambda x : -x[1])

      for i in range(3):
        print("유사도 " + str(i+1) + "등 : " + str(list[i][0]) + ", 정확도 " + str(list[i][1]))
      print()
      return list[0][0]


In [None]:
def train():
  model = EfficientNet()
  model.training_model()
  print("학습 완료")
train()

# [Step 4] 예측

In [None]:
def predict() :
  test_val = os.listdir(data_path + 'test_img')
  test_val.sort()

  test_count = 0
  correct_count = 0
  for val in test_val :
    test_img = os.listdir(data_path + 'test_img/' + str(val))
    test_img.sort()
    for img_name in test_img :
      img_path = data_path + 'test_img/' + str(val) + '/' + str(img_name)
      real_model_path = model_path + 'save_model/pet_classification_efficientnet_b0.h5'
      result = EfficientNet.predict_pet(img_path, real_model_path, test_val)

      test_count += 1
      if str(result) == str(val) :
        correct_count += 1
      print("%0.2f%%" % (correct_count / test_count * 100))
predict()

In [None]:
def graph():
  filename = 'acc_history_v20231151726.txt'
  EfficientNet.draw_graph(filename)
graph()