In [None]:
#tensorflow랑 keras 충돌 방지 위해서 기존의 keras 삭제
!pip uninstall -y keras

In [None]:
#텐서플로우 설치
!pip install tensorflow

In [None]:
import tensorflow as tf

# Show which TensorFlow version the kernel imported.
print(f"Tensorflow version {tf.__version__}")

In [None]:
# Mount Google Drive at /content/drive; the dataset folder read later in this
# notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

### 4가지 카테고리에 해당하는 클래스 이름을 폴더에서 읽어와 레이블로 사용하기 위한 준비

In [None]:
# Each sub-folder of the dataset directory is one class; the folder name is the class name:
# angry : angry
# happy : happy
# relaxed : relaxed
# sad : sad

import os
driver_image_path = '/content/drive/MyDrive/Dog Emotion'
# Keep directories only. os.listdir also returns plain files sitting in the dataset root;
# counting those as classes would inflate len(class_names) (used as the size of the model's
# output layer) and could leave gaps in the integer labels derived from this list.
class_names = sorted(
    entry
    for entry in os.listdir(driver_image_path)
    if os.path.isdir(os.path.join(driver_image_path, entry))
)
print(class_names)

In [None]:
#openCV 설치
!pip install opencv-python

In [None]:
# Check the size of one sample image
import cv2
sample_image_path = '/content/drive/MyDrive/Dog Emotion/angry/09dUVMcjCDfOtbeYDQg5Fvu3GPHWJg811.jpg'
image = cv2.imread(sample_image_path)
# cv2.imread returns None (it does not raise) when the file is missing or cannot be decoded;
# fail with an explicit message instead of an AttributeError on `.shape`.
if image is None:
    raise FileNotFoundError(f"Could not read sample image: {sample_image_path}")
# Last expression of the cell: displayed as the cell output.
image.shape

### OpenCV로 각 이미지를 읽어 200×200으로 리사이즈하고 RGB로 변환한 뒤 images 리스트에 넣고, 클래스 인덱스로 인코딩한 레이블을 labels 리스트에 추가

In [None]:
import cv2
import os

# Target size handed to cv2.resize.
resize_size = (200, 200)
# Image shape used later in the notebook by the tf.data preprocessing and the model input.
img_size = (200, 200, 3)
# Accepted JPEG extensions; compared case-insensitively so files such as "x.JPG" are not skipped.
jpeg_extensions = ('.jpg', '.jpeg')

images = []
labels = []

# enumerate() provides the integer label directly, replacing a class_names.index() scan per image.
for label_index, class_name in enumerate(class_names):
    images_files_folder_name = os.path.join(driver_image_path, class_name)
    # Ignore entries of class_names that are not folders.
    if not os.path.isdir(images_files_folder_name):
        continue

    # Sorted so the order of `images` / `labels` does not depend on the order os.listdir returns.
    for image_name in sorted(os.listdir(images_files_folder_name)):
        if not image_name.lower().endswith(jpeg_extensions):
            continue

        image_full_path = os.path.join(images_files_folder_name, image_name)
        img = cv2.imread(image_full_path)

        # cv2.imread returns None when the file cannot be read; report it and move on.
        if img is None:
            print(f"읽기 실패: {image_full_path}")
            continue

        img_resized = cv2.resize(img, resize_size)
        # Convert from OpenCV's BGR channel order to RGB.
        img_cvt = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)

        images.append(img_cvt)
        labels.append(label_index)

print(f"Total images : {len(images)} , Total labels : {len(labels)}")

### 성능 최적화를 위해 이미지와 레이블 리스트를 넘파이 배열로 변환

In [None]:
import numpy as np

# Convert both Python lists to NumPy arrays in a single step.
images, labels = np.asarray(images), np.asarray(labels)

### images, labels 배열을 학습용(x_train, y_train)과 테스트용(x_test, y_test)으로 분할 (Train에 80%, Test에 20% 할당)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the samples for evaluation.
# random_state makes the split reproducible across kernel restarts; stratify=labels keeps
# the class proportions approximately the same in both partitions.
x_train, x_test, y_train, y_test = train_test_split(
    images,
    labels,
    test_size=0.2,
    random_state=42,
    stratify=labels,
)

### 랜덤회전 , 랜덤 수평플립, 랜덤 대비 조정 3가지 데이터 증강 적용

In [None]:
import tensorflow as tf

# Augmentation pipeline: three random transforms applied one after another.
data_augmentation = tf.keras.Sequential(layers=[
    tf.keras.layers.RandomRotation(0.05),      # random rotation
    tf.keras.layers.RandomFlip('horizontal'),  # random horizontal flip
    tf.keras.layers.RandomContrast(0.2),       # random contrast adjustment
])

### 차례대로 이미지 크기를 지정된 크기로 조정해주는 전처리/이미지 증강 함수, 이미지와 레이블을 TensorFlow 데이터셋으로 변환하고, 전처리 및 증강을 적용하여 배치와 프리패칭을 설정하는 Tensorflow 데이터셋 생성 함수 정의

In [None]:
# Resize an image to the configured size
def preprocessing_image(image , label):
  """Resize `image` to the first two entries of `img_size`; `label` is passed through unchanged.

  Pixel values are not rescaled here.
  """
  target_size = img_size[:2]
  resized = tf.image.resize(image , target_size)
  return resized , label

# Apply image augmentation
def augment_image(image , label):
  """Run `image` through `data_augmentation` and return it with the untouched `label`."""
  return data_augmentation(image) , label

# Turn images and labels into a tf.data pipeline with preprocessing, optional augmentation,
# shuffling, batching and prefetching
def create_tensorflow_dataset(images ,labels , batch_size = 32 , buffer_size = 1000 , augment = False):
  """Build a shuffled, batched and prefetched `tf.data.Dataset` from in-memory data.

  Args:
    images: image data sliced along its first axis, one element per image.
    labels: labels sliced the same way, paired element-wise with `images`.
    batch_size: number of elements per batch.
    buffer_size: size of the shuffle buffer.
    augment: when true, `augment_image` is mapped over the elements after preprocessing.

  Returns:
    The resulting `tf.data.Dataset` of `(image, label)` batches.
  """
  dataset = tf.data.Dataset.from_tensor_slices((images , labels))
  # Shuffle before the map steps: the shuffle buffer then holds the source elements rather than
  # the output of tf.image.resize, which is smaller whenever the inputs use a narrower dtype
  # than the resized tensors. The maps act on one element at a time, so the order swap does not
  # change what each element looks like.
  dataset = dataset.shuffle(buffer_size)
  dataset = dataset.map(preprocessing_image , num_parallel_calls = tf.data.AUTOTUNE)
  if augment:
    dataset = dataset.map(augment_image , num_parallel_calls = tf.data.AUTOTUNE)
  dataset = dataset.batch(batch_size)
  # Overlap input preparation with consumption of the previous batch.
  dataset = dataset.prefetch(buffer_size = tf.data.AUTOTUNE)
  return dataset

In [None]:
# Number of samples per batch; passed as batch_size when the training and validation
# datasets are created.
global_batch_size = 32

### 앞서 정의한 TensorFlow Dataset 함수를 이용하여 train_dataset 및 val_dataset 데이터셋 생성(train_dataset에만 데이터 증강 적용)

In [None]:
import tensorflow as tf

# Arguments are passed positionally (images, labels, batch_size, buffer_size, augment) on purpose:
# a later cell rebinds create_tensorflow_dataset with the last parameter named `augment_flag`
# instead of `augment`, so a keyword call here raises TypeError when this cell is re-run after
# that one. The positional form works with either definition.
# buffer_size is the split length, so the shuffle buffer spans the whole split.
train_dataset = create_tensorflow_dataset(
    x_train,
    y_train,
    global_batch_size,
    len(x_train),
    True,   # augment: augmentation is applied to the training data only
)
val_dataset = create_tensorflow_dataset(
    x_test,
    y_test,
    global_batch_size,
    len(x_test),
    False,  # augment: validation data is not augmented
)

In [None]:
import tensorflow as tf

# Use a TPU when one can be initialised; otherwise fall back to the default strategy (CPU/GPU).
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(resolver)
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("TPU 사용:", resolver.master())
except Exception as tpu_error:
    # The best-effort fallback is kept, but the reason is printed so that a genuine TPU
    # initialisation failure is not silently mistaken for "no TPU attached".
    strategy = tf.distribute.get_strategy()
    print("TPU 없음, 기본 전략 사용:", type(strategy))
    print(f"  ({type(tpu_error).__name__}: {tpu_error})")

# Augmentation pipeline: random horizontal flip followed by random rotation.
# This assignment rebinds the `data_augmentation` name defined earlier in the notebook.
data_augmentation = tf.keras.Sequential(layers=[
    tf.keras.layers.RandomFlip(mode="horizontal"),
    tf.keras.layers.RandomRotation(factor=0.1),
])

def preprocess(image, label):
    """Resize `image` to the height/width in `img_size`, cast to float32 and divide by 255.

    `label` is returned unchanged.
    """
    target_hw = img_size[:2]
    resized = tf.image.resize(image, target_hw)
    scaled = tf.cast(resized, tf.float32) / 255.0
    return scaled, label

def augment(image, label):
    """Apply `data_augmentation` to `image`; `label` is returned unchanged."""
    return data_augmentation(image), label

def create_tensorflow_dataset(images, labels, batch_size, buffer_size, augment_flag=False):
    """Build a shuffled, batched and prefetched `tf.data.Dataset` of `(image, label)` pairs.

    Elements are shuffled with a buffer of `buffer_size`, passed through `preprocess`,
    optionally through `augment` (when `augment_flag` is true), then grouped into batches
    of `batch_size` and prefetched.
    """
    pipeline = (
        tf.data.Dataset.from_tensor_slices((images, labels))
        .shuffle(buffer_size)
        .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    )
    if augment_flag:
        pipeline = pipeline.map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    return pipeline.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Batch size shared by the training and validation pipelines.
global_batch_size = 32

# Training pipeline: augmentation enabled; the shuffle buffer spans the whole training split.
train_dataset = create_tensorflow_dataset(
    images=x_train,
    labels=y_train,
    batch_size=global_batch_size,
    buffer_size=len(x_train),
    augment_flag=True,
)
# Validation pipeline: same preprocessing, no augmentation.
val_dataset = create_tensorflow_dataset(
    images=x_test,
    labels=y_test,
    batch_size=global_batch_size,
    buffer_size=len(x_test),
    augment_flag=False,
)

def _conv_stage(filters, kernel_size):
    """Return the layers of one convolutional stage.

    A stage is two Conv2D -> BatchNormalization -> ReLU units (padding='same')
    followed by one 2x2 max-pooling layer.
    """
    stage_layers = []
    for _ in range(2):
        stage_layers += [
            tf.keras.layers.Conv2D(filters, kernel_size=kernel_size, padding='same'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation('relu'),
        ]
    stage_layers.append(tf.keras.layers.MaxPooling2D(pool_size=2))
    return stage_layers


# Build and compile the model inside the distribution strategy's scope.
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=img_size),

        # Four convolutional stages with 64, 128, 256 and 512 filters.
        *_conv_stage(64, kernel_size=3),
        *_conv_stage(128, kernel_size=2),
        *_conv_stage(256, kernel_size=2),
        *_conv_stage(512, kernel_size=2),

        # Classifier head: three Dense+Dropout pairs, then one softmax unit per class name.
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(len(class_names), activation='softmax'),
    ])
    # Labels are integer class indices, hence the sparse categorical loss.
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

# Stop when val_loss has not improved for 20 epochs and restore the best weights seen.
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=20,
    restore_best_weights=True,
)

# Training: up to 150 epochs, validated on val_dataset.
history = model.fit(
    x=train_dataset,
    validation_data=val_dataset,
    epochs=150,
    callbacks=[early_stopping_cb],
)