In [1]:
import os
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Input, Conv2D, UpSampling2D, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tqdm.notebook import tqdm
from tensorflow.keras.mixed_precision import Policy, set_global_policy
from IPython.display import display, HTML

from google.colab import drive
drive.mount('/content/drive')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!ls /content/drive/MyDrive/open


sample_submission  test.csv  test_input  train.csv  train_gt  train_input


In [2]:
# Google Drive 절대 경로 설정
BASE_DIR = '/content/drive/MyDrive/open/'

train_csv_path = BASE_DIR + 'train.csv'
# https://drive.google.com/file/d/1qBWbf0Z4ofV_fmK48g--diwkA4Lec5NM/view?usp=drive_link
test_csv_path = BASE_DIR + 'test.csv'

# Mixed Precision 활성화
policy = Policy('mixed_float16')
set_global_policy(policy)

In [3]:
# CSV 파일 로드
train_metadata = pd.read_csv(train_csv_path)
test_metadata = pd.read_csv(test_csv_path)

# 데이터 경로 업데이트
train_metadata['input_image_path'] = train_metadata['input_image_path'].apply(lambda x: BASE_DIR + x[2:])
train_metadata['gt_image_path'] = train_metadata['gt_image_path'].apply(lambda x: BASE_DIR + x[2:])

In [4]:
# 이미지 로드 함수 정의
def load_image(image_path, target_size=(224, 224), grayscale=False):
    img = load_img(image_path, target_size=target_size, color_mode='grayscale' if grayscale else 'rgb')
    img = img_to_array(img) / 255.0  # [0, 1]로 정규화
    return img

# 데이터 로드 파이프라인 생성
def create_dataset(metadata, batch_size=32, image_size=(224, 224)):
    def generator():
        for _, row in metadata.iterrows():
            input_image = load_image(row['input_image_path'], target_size=image_size, grayscale=True)
            gt_image = load_image(row['gt_image_path'], target_size=image_size, grayscale=False)
            yield input_image.astype('float32'), gt_image.astype('float32')

    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(image_size[0], image_size[1], 1), dtype=tf.float32),
            tf.TensorSpec(shape=(image_size[0], image_size[1], 3), dtype=tf.float32),
        )
    )
    return dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)

In [5]:
# 학습 및 검증 데이터셋 생성
train_dataset = create_dataset(train_metadata, batch_size=16)
test_dataset = create_dataset(test_metadata, batch_size=16)

# MobileNetV2 기반 모델 생성
def build_lightweight_model(input_shape=(224, 224, 1)):
    base_model = MobileNetV2(input_shape=(224, 224, 3), alpha=0.35, include_top=False, weights='imagenet')

    # 입력을 3채널로 변환
    inputs = Input(shape=input_shape)
    x = Concatenate()([inputs, inputs, inputs])

    # MobileNetV2 Encoder
    x = base_model(x, training=False)

    # Decoder
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    outputs = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    # 최종 모델
    model = Model(inputs, outputs)
    return model

In [7]:
# 모델 생성
model = build_lightweight_model()

# 옵티마이저 및 손실 함수
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.MeanSquaredError()

# 학습 및 검증 함수 정의
@tf.function
def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

@tf.function
def validate_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)
    return loss

# 학습 시작
epochs = 1
train_steps = len(train_metadata) // 32
val_steps = len(test_metadata) // 32

In [None]:
for epoch in range(1, epochs + 1):
    # 학습 정보 출력 (HTML 형식)
    epoch_html = f"""
    <div style="font-size:20px; font-weight:bold; color:######;">
        Epoch {epoch}/{epochs} - Training Parameters
    </div>
    """
    display(HTML(epoch_html))

    # 학습 파라미터 DataFrame
    df_epoch = pd.DataFrame({
        "Epoch": [epoch],
        "Train Steps": [train_steps],
        "Validation Steps": [val_steps],
        "Batch Size": [16],
        "Input Shape": ["(224, 224, 1)"],
        "Output Shape": ["(224, 224, 3)"],
        "Optimizer": ["Adam"],
        "Loss Function": ["Mean Squared Error"]
    })
    display(df_epoch)

    # Training 진행률 표시
    train_loss_sum = 0.0
    train_pbar = tqdm(total=train_steps, desc="Training", bar_format="{l_bar}{bar} | {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
    for step, (inputs, targets) in enumerate(train_dataset):
        train_loss = train_step(inputs, targets)
        train_loss_sum += train_loss.numpy()
        train_pbar.set_postfix({"Loss": f"{train_loss.numpy():.4f}"})
        train_pbar.update(1)
        if step + 1 >= train_steps:
            break
    train_pbar.close()

    # Validation 진행률 표시
    val_loss_sum = 0.0
    val_pbar = tqdm(total=val_steps, desc="Validating", bar_format="{l_bar}{bar} | {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
    for step, (inputs, targets) in enumerate(test_dataset):
        val_loss = validate_step(inputs, targets)
        val_loss_sum += val_loss.numpy()
        val_pbar.set_postfix({"Val Loss": f"{val_loss.numpy():.4f}"})
        val_pbar.update(1)
        if step + 1 >= val_steps:
            break
    val_pbar.close()

    # Epoch별 평균 손실 출력
    train_loss_avg = train_loss_sum / train_steps
    val_loss_avg = val_loss_sum / val_steps

    # 결과 출력 (HTML 형식)
    result_html = f"""
    <div style="font-size:18px; font-weight:bold; color:#d9534f;">
        Epoch {epoch} Results:
        <ul>
            <li>Train Loss = {train_loss_avg:.4f}</li>
            <li>Validation Loss = {val_loss_avg:.4f}</li>
        </ul>
    </div>
    """
    display(HTML(result_html))

Unnamed: 0,Epoch,Train Steps,Validation Steps,Batch Size,Input Shape,Output Shape,Optimizer,Loss Function
0,1,925,3,16,"(224, 224, 1)","(224, 224, 3)",Adam,Mean Squared Error


Training:   0%|           | 0/925 [00:00<?]

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def ssim_score(true, pred):
  # 전체 RGB 이미지를 사용해 SSIM 계산 (channel_axis=-1)
  ssim_value = ssim(true, pred, channel_axis=-1, data_range=pred.max() - pred.min())
  return ssim

def massked_ssim_score(true, pred, maxk):
  # 손실 영역의 좌표에서만 RGB 채널별 픽셀 값 추출
  true_masked_pixels = true[mask >0]
  pred_masked_pixels = pred[mask >0]

  # 손실 영역 픽셀 만으로 SSIM 계산(채널축 사용)
  ssim_value = ssim(
      true_masked_pixels,
      pred_masked_pixels,
      channel_axis = -1,
      data_range   = pred.max() - pred.min()
  )
  return ssim_value

def histogram_similarity(true, pred):
  # RGB 이미지를 HSV로 변환
  true_hsv = cv2.cvtColor(True, cv2.COLOR_RGB2HSV)
  pred_hsv = cv2.cvtColor(pred, cv2.COLOR_RGB2HSV)

  # H 채널에서 히스토그램 계산 및 정규화
  hist_true = cv2.calcHist([true_hsv], [0], None, [180], [0, 180])
  hist_pred = cv2.calcHist([pred_hsv], [0], None, [180], [0, 180])
  hist_true = cv2.normalize(hist_true, hist_true).flatten()
  hist_pred = cv2.normalize(hist_pred, hist_pred).flatten()

  # 히스토그램 간 유사도 계산 (상관 계수 사용)
  similarity = cv2.compareHist(hist_true, hist_pred, cv2.HISTCMP_CORREL)
  return similarity
