In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd "/content/drive/MyDrive"

/content/drive/MyDrive


In [None]:
import os
import glob
import pandas as pd
import tensorflow as tf
import numpy as np
import json
import matplotlib.pyplot as plt
import tensorflow as tf

from PIL import Image, ImageDraw
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
base_folder = 'data/test/dog/'
class_folders = os.listdir(base_folder)

FileNotFoundError: ignored

In [None]:
image_paths = []
json_paths = []

for class_folder in class_folders:
    image_files = glob.glob(os.path.join(base_folder, class_folder, '*.jpg'))
    json_files = [file.replace('.jpg', '.json') for file in image_files]

    image_paths.extend(image_files)
    json_paths.extend(json_files)

image_paths = [path.replace('\\', '/') for path in image_paths]
json_paths = [path.replace('\\', '/') for path in json_paths]

df = pd.DataFrame({
    'image_path': image_paths,
    'json_path': json_paths,
    'label': [path.split('/')[-2] for path in image_paths]
})

In [None]:
def create_mask(image_path, json_path):
    try:
        image = Image.open(image_path)
        image = image.resize((256, 256))

        with open(json_path, 'r', encoding='UTF-8') as file:
            data = json.load(file)

        mask = Image.new('L', (256, 256), 0)
        draw = ImageDraw.Draw(mask)

        for annotation in data['labelingInfo']:
            if 'polygon' in annotation:
                polygon = []
                for i in range(1, len(annotation['polygon']['location'][0]) // 2 + 1):
                    x_key = f'x{i}'
                    y_key = f'y{i}'
                    x = annotation['polygon']['location'][0].get(x_key)
                    y = annotation['polygon']['location'][0].get(y_key)
                    if x is not None and y is not None:
                        polygon.append((x, y))

                if polygon:
                    draw.polygon(polygon, outline=1, fill=1)
        image = np.array(image, dtype=np.float32)
        mask = np.array(mask, dtype=np.float32)
        mask = np.expand_dims(mask, axis=-1)

        return image, mask
    except Exception as e:
        empty_image = np.zeros((256, 256, 3), dtype=np.float32)
        empty_mask = np.zeros((256, 256, 1), dtype=np.float32)
        return empty_image, empty_mask

In [None]:
def tf_create_mask(image_path, json_path):
    [image, mask] = tf.numpy_function(create_mask, [image_path, json_path], [tf.float32, tf.float32])
    image.set_shape([256, 256, 3])
    mask.set_shape([256, 256, 1])
    return image, mask

image_paths = df['image_path'].values
json_paths = df['json_path'].values

dataset = tf.data.Dataset.from_tensor_slices((image_paths, json_paths))
dataset = dataset.map(tf_create_mask, num_parallel_calls=tf.data.AUTOTUNE)

dataset_size = len(image_paths)
train_size = int(0.8 * dataset_size)
val_size = dataset_size - train_size

dataset = dataset.shuffle(buffer_size=dataset_size)

train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)

train_dataset = train_dataset.batch(32)
val_dataset = val_dataset.batch(32)

In [None]:
def unet_model(input_size=(256, 256, 3)):
    inputs = Input(input_size)

    # 인코더
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(512, 3, activation='relu', padding='same')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)

    # 디코더
    up5 = UpSampling2D(size=(2, 2))(conv4)
    merge5 = concatenate([conv3, up5], axis=3)
    conv5 = Conv2D(256, 3, activation='relu', padding='same')(merge5)
    conv5 = Conv2D(256, 3, activation='relu', padding='same')(conv5)

    up6 = UpSampling2D(size=(2, 2))(conv5)
    merge6 = concatenate([conv2, up6], axis=3)
    conv6 = Conv2D(128, 3, activation='relu', padding='same')(merge6)
    conv6 = Conv2D(128, 3, activation='relu', padding='same')(conv6)

    up7 = UpSampling2D(size=(2, 2))(conv6)
    merge7 = concatenate([conv1, up7], axis=3)
    conv7 = Conv2D(64, 3, activation='relu', padding='same')(merge7)
    conv7 = Conv2D(64, 3, activation='relu', padding='same')(conv7)

    # 출력
    conv8 = Conv2D(1, (1, 1), activation='sigmoid')(conv7)

    model = Model(inputs=inputs, outputs=conv8)

    return model

unet = unet_model()
unet.compile(optimizer='adam',
             loss=BinaryCrossentropy(),
             metrics=['accuracy', Precision(), Recall()])

In [None]:
early_stopping = EarlyStopping(monitor='val_loss',
                               patience=10,
                               verbose=1,
                               mode='min',
                               restore_best_weights=True)

checkpoint = ModelCheckpoint('model/dog_unet_model_{epoch:02d}.h5',
                             monitor='val_loss',
                             verbose=1,
                             save_best_only=False,
                             mode='min',
                             save_freq='epoch')

In [None]:
from tensorflow.keras.models import load_model

saved_model = load_model("model/dog_unet_model_01.h5")

history = saved_model.fit(train_dataset,
                          validation_data=val_dataset,
                          epochs=20,
                          initial_epoch=2,
                          validation_steps=val_size // 32,
                          steps_per_epoch=train_size // 32,
                          callbacks=[checkpoint, early_stopping])

In [None]:
# 훈련 및 검증 손실 그래프
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# 훈련 및 검증 정확도 그래프
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.show()