# 필요한 라이브러리 install 및 import

In [None]:
!pip install kagglehub

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import zipfile
import os
from PIL import Image

# 압축 해제 및 데이터셋 분할

In [None]:
# Location of the dataset archive and the directory it is unpacked into.
zip_path = "/content/coral_dataset.zip"
extract_folder = "coral_dataset"

# Unpack the whole archive; the context manager closes it afterwards.
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_folder)

# One sub-directory per class inside the extracted dataset.
bleached_folder, normal_folder = (
    os.path.join(extract_folder, class_dir)
    for class_dir in ("bleached", "normal")
)

# Accumulators that load_images_from_folder appends to in place.
images, labels = [], []

def load_images_from_folder(folder, label):
    """Load every readable image in ``folder`` into the module-level lists.

    Each entry is opened with PIL, converted to RGB, resized to 64x64 and
    appended to the global ``images`` list; ``label`` is appended to the
    global ``labels`` list at the same position. Entries that cannot be
    loaded are reported on stdout and skipped.

    Args:
        folder: Directory whose entries are loaded.
        label: Class label recorded for every image loaded from ``folder``.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # sample order (and therefore any seeded split of it) reproducible.
    for filename in sorted(os.listdir(folder)):
        file_path = os.path.join(folder, filename)
        try:
            # The context manager closes the underlying file immediately
            # instead of leaving one open handle per image until GC.
            with Image.open(file_path) as source:
                img = source.convert("RGB").resize((64, 64))
        except Exception as e:
            # Best-effort loading: report the offending file and keep going.
            print(f"Error loading {file_path}: {e}")
        else:
            images.append(img)
            labels.append(label)

# Fill the shared lists: label 1 for the bleached folder, 0 for the normal one.
load_images_from_folder(bleached_folder, label=1)
load_images_from_folder(normal_folder, label=0)

# First split: 80% of the samples for training, 20% held out.
train_images, temp_images, train_labels, temp_labels = train_test_split(
    images, labels, test_size=0.2, random_state=42
)
# Second split of the held-out part: 80% validation, 20% test.
# NOTE(review): this yields roughly 16% validation / 4% test of the whole
# dataset; if an even validation/test split was intended, test_size should
# be 0.5 here - confirm.
val_images, test_images, val_labels, test_labels = train_test_split(
    temp_images, temp_labels, test_size=0.2, random_state=42
)

# Convert every split to a numpy array exactly once (the original converted
# train_images a second time, which only produced a redundant copy).
train_images = np.array(train_images)
train_labels = np.array(train_labels)
val_images = np.array(val_images)
val_labels = np.array(val_labels)
test_images = np.array(test_images)
test_labels = np.array(test_labels)

# Sanity check of the resulting training array shape.
print(train_images.shape)

print("Train Images:", len(train_images))
print("Validation Images:", len(val_images))
print("Test Images:", len(test_images))

In [None]:
# Random augmentation settings for the training generator: rotation,
# horizontal/vertical shifts, shear, zoom and horizontal flips.
_augmentation_options = {
    "rotation_range": 20,
    "width_shift_range": 0.2,
    "height_shift_range": 0.2,
    "shear_range": 0.2,
    "zoom_range": 0.2,
    "horizontal_flip": True,
}
train_datagen = ImageDataGenerator(**_augmentation_options)

# Generator with no augmentation options configured.
val_test_datagen = ImageDataGenerator()

# 본격 모델

직접 제작하고 추가한 함수/메커니즘:
1. Activation Function (i): **OutOut**
2. Activation Function (ii): **YooSeong Function** `(in final classification)`
3. **IOU Loss Function**
4. **Self-Attention Layer**

In [None]:
def outout(inputs, num_units, dropout_rate=0.3, training=True):
    """Apply optional dropout, then keep the larger value of each pair.

    The (possibly dropped-out) input is reshaped to
    ``[-1, num_units, 2]`` and reduced with a max over the last axis, so
    the result has ``num_units`` values per row.

    Args:
        inputs: Tensor whose elements can be regrouped into
            ``num_units`` pairs per row.
        num_units: Number of output units (pairs) per row.
        dropout_rate: Drop rate passed to ``tf.nn.dropout``.
        training: When truthy, dropout is applied before the reduction.

    Returns:
        Tensor holding the pairwise maxima.
    """
    dropped = tf.nn.dropout(inputs, rate=dropout_rate) if training else inputs
    pairs = tf.reshape(dropped, [-1, num_units, 2])
    return tf.reduce_max(pairs, axis=-1)

In [None]:
class YooSeongActivation(tf.keras.layers.Layer):
    """Keras layer applying ``tanh(x)**2 * sigmoid(x)`` element-wise."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return the activation of ``inputs``, element by element."""
        # exp(x) / (exp(x) + 1) is sigmoid(x). Written out literally, exp(x)
        # overflows to inf for large x and inf / inf yields NaN; tf.sigmoid
        # computes the same value in a numerically stable way.
        return tf.square(tf.tanh(inputs)) * tf.sigmoid(inputs)

In [None]:
def YooSeong(x):
    """Return ``tanh(x)**2 * sigmoid(x)`` element-wise.

    Args:
        x: Input tensor.

    Returns:
        Tensor of the same shape as ``x``.
    """
    # exp(x) / (exp(x) + 1) is sigmoid(x). Written out literally, exp(x)
    # overflows to inf for large x and inf / inf yields NaN; tf.sigmoid
    # computes the same value in a numerically stable way.
    return tf.square(tf.tanh(x)) * tf.sigmoid(x)

In [None]:
def IOU_Loss(y_true, y_pred):
    """Return ``1 - IoU`` averaged over all leading dimensions.

    The intersection is the sum of element-wise minima and the union the
    sum of element-wise maxima, both taken over the last axis.

    Args:
        y_true: Target tensor; cast to the dtype of ``y_pred``.
            NOTE(review): presumably the same shape as ``y_pred`` (e.g.
            one-hot targets) - confirm against the caller.
        y_pred: Prediction tensor.

    Returns:
        Scalar tensor with the mean loss.
    """
    target = tf.cast(y_true, y_pred.dtype)
    overlap = tf.reduce_sum(tf.minimum(target, y_pred), axis=-1)
    combined = tf.reduce_sum(tf.maximum(target, y_pred), axis=-1)
    # The small constant keeps the division defined when the union is zero.
    iou = overlap / (combined + 1e-7)
    return tf.reduce_mean(1 - iou)

In [None]:
# The Self-Attention layer was implemented with help from the generative AI
# ChatGPT.
class SelfAttention(layers.Layer):
    """Dot-product self-attention over the last two axes of the input.

    Query, key and value are separate ``Dense`` projections of the input to
    ``embed_dim`` features. The attention weights are the softmax of
    ``query @ key^T`` and the output is ``weights @ value``.

    NOTE(review): ``tf.matmul`` works on the last two axes, so for a 4-D
    image tensor the attention is computed along the second-to-last axis
    separately for each index of the axes before it - confirm this is the
    intended behaviour. The scores are also not scaled by sqrt(embed_dim).

    Args:
        embed_dim: Output feature size of the query/key/value projections.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Stored so get_config() can report it for model saving/loading.
        self.embed_dim = embed_dim
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.softmax = layers.Softmax(axis=-1)

    def call(self, inputs):
        """Return the attention output for ``inputs``."""
        # Project the input to query, key and value.
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # Attention scores, normalised with a softmax over the last axis.
        attention_scores = tf.matmul(query, key, transpose_b=True)
        attention_weights = self.softmax(attention_scores)

        # Weighted sum of the values.
        attention_output = tf.matmul(attention_weights, value)
        return attention_output

    def get_config(self):
        """Return the layer config, including the constructor argument."""
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim})
        return config

In [None]:
# CNN classifier for 64x64 RGB inputs: two Conv/MaxPool stages, each followed
# by a SelfAttention layer, then a dense head that ends in two outputs passed
# through YooSeongActivation.
model = keras.Sequential([

    layers.Conv2D(32, kernel_size=(3, 3), strides=(1, 1), activation='relu', input_shape=(64, 64, 3)),
    layers.MaxPooling2D(pool_size=(2, 2)),

    SelfAttention(embed_dim=32),

    layers.Conv2D(64, kernel_size=(3, 3), strides=(1, 1), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),

    SelfAttention(embed_dim=64),

    layers.Flatten(),

    # Twice as many units as outout() returns: it keeps the max of each pair.
    layers.Dense(6 * 24 * 24 * 2),
    # Forward Keras' training flag so that the dropout inside outout() is
    # only active while fitting. Without it outout() falls back to its
    # default training=True and keeps dropping units during
    # predict()/evaluate() as well.
    layers.Lambda(
        lambda x, training=None: outout(
            x, 6 * 24 * 24, dropout_rate=0.2, training=training
        )
    ),

    layers.Dense(2),
    YooSeongActivation()
])

In [None]:
# Print the layers of the model together with their parameter counts.
model.summary()

In [None]:
# Configure training: Adam optimizer, sparse categorical cross-entropy on the
# integer labels, and accuracy as the reported metric.
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Stop after 5 epochs without a val_accuracy improvement and restore the
# best weights seen so far.
early_stopping = EarlyStopping(
    monitor='val_accuracy',
    patience=5,
    verbose=1,
    restore_best_weights=True,
)

# Save the model to disk whenever val_accuracy improves.
model_checkpoint = ModelCheckpoint(
    filepath='best_model.keras',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1,
)

# Halve the learning rate after 3 epochs without a val_accuracy improvement,
# down to a floor of 1e-5.
learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_accuracy',
    patience=3,
    verbose=1,
    factor=0.5,
    min_lr=0.00001,
)

In [None]:
# Augmented training batches of 32 images; validation data is used as-is.
train_batches = train_datagen.flow(train_images, train_labels, batch_size=32)

# Train for at most 50 epochs; the callbacks handle early stopping,
# checkpointing and learning-rate reduction.
history = model.fit(
    train_batches,
    validation_data=(val_images, val_labels),
    epochs=50,
    callbacks=[early_stopping, model_checkpoint, learning_rate_reduction],
)

# Test Dataset 시각화

In [None]:
# Per-class scores for the held-out test images.
y_pred = model.predict(test_images)
# Predicted class is the index of the highest score in each row.
y_pred_classes = y_pred.argmax(axis=1)

In [None]:
# Show up to five distinct, randomly chosen test images with their true and
# predicted labels.
plt.figure(figsize=(10, 10))
# replace=False avoids drawing the same image more than once; the size is
# capped so that test sets with fewer than five images still work.
sample_count = min(5, len(test_images))
sample_indices = np.random.choice(len(test_images), size=sample_count, replace=False)
for i, idx in enumerate(sample_indices):
    plt.subplot(4, 2, i + 1)
    # The images are RGB, for which imshow ignores any colormap, so the
    # former cmap='gray' argument is dropped.
    plt.imshow(test_images[idx])
    plt.title(f"True: {test_labels[idx]}, Pred: {y_pred_classes[idx]}")
    plt.axis('off')
plt.tight_layout()
plt.show()

# 최종 모델 평가

In [None]:
# Evaluate silently on the test split; the result holds the loss followed by
# the accuracy metric configured at compile time.
evaluation = model.evaluate(test_images, test_labels, verbose=0)
test_loss, test_accuracy = evaluation

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

In [None]:
# Training curves: loss on the upper axes, accuracy on the lower axes.
fig, ax = plt.subplots(2, 1, figsize=(10, 5))

curves = (
    (ax[0], 'loss', "Training Loss", 'val_loss', "Validation Loss"),
    (ax[1], 'accuracy', "Training Accuracy", 'val_accuracy', "Validation Accuracy"),
)
for axis, train_key, train_label, val_key, val_label in curves:
    axis.plot(history.history[train_key], label=train_label)
    axis.plot(history.history[val_key], label=val_label)
    axis.legend()

plt.show()

# Coral_Located

In [None]:
# Run the trained model on every readable image in the "coral_located"
# folder and store the raw predictions in a CSV file.
# NOTE(review): this rebinds test_images, replacing the held-out test split
# created earlier; re-running the evaluation cells afterwards would pair
# these images with the old test_labels.
test_images = []
loaded_filenames = []  # names of the files that were loaded successfully
test_folder = "coral_located"
# Sorted so that the row order of the output is deterministic.
for filename in sorted(os.listdir(test_folder)):
    file_path = os.path.join(test_folder, filename)
    try:
        # The context manager closes the file handle right after loading.
        with Image.open(file_path) as source:
            img = source.convert("RGB").resize((64, 64))
    except Exception as e:
        # Best-effort loading: report the offending file and keep going.
        print(f"Error loading {file_path}: {e}")
    else:
        test_images.append(img)
        loaded_filenames.append(filename)
test_images = np.array(test_images)
print(test_images.shape)

prediction = model.predict(test_images)
print(prediction)

# Convert the predictions to a DataFrame (one row per image, one column per
# model output) and prepend the file name so each row can be attributed.
results_df = pd.DataFrame(prediction)
results_df.insert(0, "filename", loaded_filenames)

# Save the results as a CSV file.
results_csv_path = "predictions.csv"
results_df.to_csv(results_csv_path, index=False)

print(f"Predictions saved to {results_csv_path}")