In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Lambda, Conv2D, Flatten, Reshape, Conv2DTranspose, UpSampling2D
from tensorflow.keras.layers import Layer, ConvLSTM2D, BatchNormalization, Concatenate, TimeDistributed, Dropout
from tensorflow.keras.callbacks import Callback, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.models import Model
from tensorflow.keras.applications import VGG19
import imageio
import random
from PIL import Image
from sklearn.model_selection import train_test_split

In [None]:
# Frozen VGG19 feature extractor exposing the block3_conv3 activations.
vgg = VGG19(include_top=False, weights='imagenet', input_shape=(128, 128, 3))
vgg.trainable = False
feature_extractor = Model(
    inputs=vgg.input,
    outputs=vgg.get_layer("block3_conv3").output,
)
# Seed is deliberately None so that every run draws different random numbers.
tf.random.set_seed(None)

In [None]:
# Scheduling callback that gradually increases a KL-divergence weight.
class KLDivergenceWeightScheduler(Callback):
    """Linearly ramp a KL weight by a fixed step at the end of every epoch.

    The current value is exposed as `self.weight`. This class only tracks the
    value; nothing here writes it to a model, so whoever consumes the schedule
    has to read `weight` explicitly.

    Args:
        start_weight: Initial value of the weight.
        max_weight: Upper bound the weight is clamped to.
        increase_rate: Amount added to the weight after each epoch.
    """

    def __init__(self, start_weight=1e-5, max_weight=1e-2, increase_rate=1e-5):
        super().__init__()
        self.weight = start_weight
        self.max_weight = max_weight
        self.increase_rate = increase_rate

    def on_epoch_end(self, epoch, logs=None):
        """Advance the weight one step, never letting it exceed `max_weight`."""
        # Clamp in the same step as the increment: the previous
        # "increment, then clamp on the next epoch" logic could leave the
        # weight above max_weight for one whole epoch.
        self.weight = min(self.weight + self.increase_rate, self.max_weight)

In [None]:
# Configuration: input image size, target sequence length and latent size.
img_size = (128, 128)  # frames are resized to 128x128
fixed_length = 10  # number of target frames per sample
latent_dim = 32  # dimensionality of the latent space

In [None]:
def preprocess_gif(gif_path, img_size=(128, 128)):
    """Turn one GIF into an (input frame, target frame sequence) pair.

    Only the first `fixed_length + 1` frames are decoded, because later frames
    never reach the output. Each frame is converted to RGB, resized and scaled
    to the [0, 1] range.

    Args:
        gif_path: Path of the GIF file to read.
        img_size: Size handed to PIL's `resize`, i.e. `(width, height)`.

    Returns:
        A tuple `(x, y)` where `x` is the first frame and `y` holds the next
        `fixed_length` frames stacked along axis 0. If the GIF has fewer
        frames than that, `y` is zero-padded at the end up to `fixed_length`.
    """
    frames_needed = fixed_length + 1  # one input frame + fixed_length targets
    frames = []
    # The context manager releases the file handle; a bare Image.open()
    # keeps the file open until the object is garbage collected.
    with Image.open(gif_path) as gif:
        try:
            while True:
                frame = gif.convert('RGB').resize(img_size)
                frames.append(np.array(frame) / 255.0)  # normalise to [0, 1]
                if len(frames) == frames_needed:
                    break
                gif.seek(gif.tell() + 1)
        except EOFError:
            # Ran out of frames before collecting frames_needed of them.
            pass
    frames = np.array(frames)

    x = frames[0]  # first frame is the model input
    y = frames[1:frames_needed]
    missing = fixed_length - len(y)
    if missing > 0:
        # Short GIF: append all-zero frames so every sample has equal length.
        y = np.pad(y, ((0, missing), (0, 0), (0, 0), (0, 0)), mode='constant')

    return x, y

In [None]:
# Build the x / y dataset arrays from a collection of GIF files.
def load_gif_dataset(gif_paths, img_size=(128, 128)):
    """Preprocess every GIF and stack the results into two arrays.

    Args:
        gif_paths: Iterable of GIF file paths.
        img_size: Size forwarded to `preprocess_gif` for each file.

    Returns:
        `(x_data, y_data)`: the first-frame inputs and the target frame
        sequences returned by `preprocess_gif`, each stacked with `np.array`
        in the order of `gif_paths`.
    """
    samples = [preprocess_gif(path, img_size) for path in gif_paths]
    first_frames = [sample[0] for sample in samples]
    target_sequences = [sample[1] for sample in samples]
    return np.array(first_frames), np.array(target_sequences)

In [None]:
def build_encoder(img_shape=(img_size[0], img_size[1], 3), latent_dim=32):
    """Build the convolutional encoder of the VAE.

    Four stride-2 Conv2D stages (32, 64, 128, 256 filters), each followed by
    batch normalisation and 10% dropout, reduce the image before it is
    flattened and projected onto the latent distribution parameters.

    Args:
        img_shape: Shape of one input image, `(height, width, channels)`.
        latent_dim: Dimensionality of the latent space.

    Returns:
        A Keras `Model` named "encoder" with outputs `[z_mean, z_log_var, z]`,
        where `z` is drawn with the reparameterisation trick.
    """
    image_in = Input(shape=img_shape, name='image_input')

    # Spatial feature extraction: every stage halves height and width.
    features = image_in
    for n_filters in (32, 64, 128, 256):
        features = Conv2D(n_filters, (3, 3), activation='relu', padding='same', strides=(2, 2))(features)
        features = BatchNormalization()(features)
        features = Dropout(0.1)(features)
    flat = Flatten()(features)

    # Parameters of the approximate posterior.
    z_mean = Dense(latent_dim, name="z_mean")(flat)
    z_log_var = Dense(latent_dim, name="z_log_var")(flat)

    def sampling(args):
        """Draw z = mean + sigma * epsilon with epsilon ~ N(0, 1)."""
        mean, log_var = args
        batch = tf.shape(mean)[0]
        epsilon = tf.random.normal(shape=(batch, latent_dim), mean=0.0, stddev=1.0)
        return mean + tf.exp(0.5 * log_var) * epsilon

    z = Lambda(sampling, output_shape=(latent_dim,), name="z")([z_mean, z_log_var])
    return Model(image_in, [z_mean, z_log_var, z], name="encoder")

In [None]:
def build_decoder(latent_dim, output_shape=(fixed_length, img_size[0], img_size[1], 3)):
    """Build the decoder that expands one latent vector into a frame sequence.

    The latent vector is projected to a `(time_steps, height/8, width/8, 128)`
    tensor, refined by two ConvLSTM2D layers, and then upsampled by 2x three
    times with per-frame convolutions. For the default 128x128 output the
    seed tensor is 16x16x128.

    Args:
        latent_dim: Size of the latent vector fed to the decoder.
        output_shape: `(time_steps, height, width, channels)` of the generated
            sequence. Height and width must be multiples of 8.

    Returns:
        A Keras `Model` named "decoder" whose output has shape
        `(batch, time_steps, height, width, channels)` with sigmoid values.

    Raises:
        ValueError: If height or width is not divisible by 8.
    """
    time_steps, height, width, channels = output_shape
    upsample_factor = 2 ** 3  # three UpSampling2D(2, 2) stages below
    if height % upsample_factor or width % upsample_factor:
        raise ValueError(
            f"output height and width must be multiples of {upsample_factor}, "
            f"got {height}x{width}"
        )
    base_h, base_w = height // upsample_factor, width // upsample_factor

    decoder_input = Input(shape=(latent_dim,), name='decoder_input')

    # Expand the latent vector into a low-resolution spatio-temporal tensor.
    units = time_steps * base_h * base_w * 128
    x = Dense(units, activation='relu')(decoder_input)
    x = Reshape((time_steps, base_h, base_w, 128))(x)

    # ConvLSTM2D layers model the temporal structure across frames.
    x = ConvLSTM2D(128, (3, 3), activation='relu', padding='same', return_sequences=True)(x)
    x = ConvLSTM2D(64, (3, 3), activation='relu', padding='same', return_sequences=True)(x)

    # Upsample frame by frame until the requested resolution is reached.
    x = TimeDistributed(UpSampling2D(size=(2, 2)))(x)
    x = TimeDistributed(Conv2D(64, (3, 3), activation='relu', padding='same'))(x)
    x = TimeDistributed(UpSampling2D(size=(2, 2)))(x)
    x = TimeDistributed(Conv2D(32, (3, 3), activation='relu', padding='same'))(x)
    x = TimeDistributed(UpSampling2D(size=(2, 2)))(x)
    x = TimeDistributed(Conv2D(channels, (3, 3), activation='sigmoid', padding='same'))(x)

    decoder = Model(decoder_input, x, name="decoder")
    return decoder


In [None]:
class CVAE(Model):
    """VAE that encodes one image and decodes a sequence of frames.

    Training minimises a VGG19 perceptual loss between target and generated
    frames plus a weighted KL term. Two non-trainable variables are updated
    after every training step:

    * `kl_weight` grows by 1e-7 per step up to 1e-3 (KL annealing).
    * `noise_scale` decays by a factor of 0.99 per step down to 0.1 and sets
      the standard deviation of extra noise added to `z` during training.

    Args:
        encoder: Model returning `(z_mean, z_log_var, z)` for an image batch.
        decoder: Model mapping a latent batch to a 5-D frame sequence batch.
        **kwargs: Forwarded to `tf.keras.Model`.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.kl_weight = tf.Variable(1e-6, trainable=False)
        self.noise_scale = tf.Variable(1.0, trainable=False)

        # Created once here; building a new Dropout layer inside call() on
        # every invocation is wasteful and hides the layer from the model.
        self.latent_dropout = Dropout(0.3)

        # Frozen VGG19 truncated at block5_conv2 provides perceptual features.
        # NOTE(review): frames are fed in the [0, 1] range without
        # `vgg19.preprocess_input`; confirm that this is intentional.
        vgg = VGG19(include_top=False, weights="imagenet")
        self.perceptual_model = Model(inputs=vgg.input, outputs=vgg.get_layer("block5_conv2").output)
        self.perceptual_model.trainable = False

        # MSE between feature maps implements the perceptual loss.
        self.mse_loss = tf.keras.losses.MeanSquaredError()

    def call(self, inputs, training=None):
        """Encode `inputs`, apply dropout to `z`, and decode a frame sequence."""
        z_mean, z_log_var, z = self.encoder(inputs, training=training)
        z = self.latent_dropout(z, training=training)
        return self.decoder(z, training=training)

    def _vae_losses(self, y, reconstruction, z_mean, z_log_var):
        """Return `(perceptual_loss, kl_loss, total_loss)` for one batch.

        `kl_loss` is the value that gets reported: the mean KL term clipped to
        [0, 5000] and multiplied by beta = 4. `total_loss` adds it to the
        perceptual loss after scaling by the current `kl_weight`.
        """
        # Fold time into the batch axis so VGG sees a batch of images. Using
        # -1 avoids depending on a statically known batch size.
        _, _, height, width, channels = reconstruction.shape
        frame_shape = [-1, height, width, channels]
        y_frames = tf.reshape(tf.cast(y, reconstruction.dtype), frame_shape)
        reconstruction_frames = tf.reshape(reconstruction, frame_shape)
        y_features = self.perceptual_model(y_frames, training=False)
        reconstruction_features = self.perceptual_model(reconstruction_frames, training=False)
        perceptual_loss = self.mse_loss(y_features, reconstruction_features)

        beta = 4.0
        kl_loss = -0.5 * tf.reduce_mean(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = beta * tf.clip_by_value(kl_loss, 0, 5000)

        total_loss = perceptual_loss + self.kl_weight * kl_loss
        return perceptual_loss, kl_loss, total_loss

    def train_step(self, data):
        """Run one optimisation step and advance both schedules."""
        x, y = data
        with tf.GradientTape() as tape:
            # training=True is required here: without it Dropout and
            # BatchNormalization run in inference mode inside a custom
            # train_step.
            z_mean, z_log_var, z = self.encoder(x, training=True)
            # Extra latent noise whose scale decays as training progresses.
            noise = tf.random.normal(shape=tf.shape(z), mean=0.0, stddev=self.noise_scale)
            reconstruction = self.decoder(z + noise, training=True)
            perceptual_loss, kl_loss, total_loss = self._vae_losses(
                y, reconstruction, z_mean, z_log_var
            )

        # Gradients come straight from total_loss, so add_loss() is not
        # needed; calling it here would only accumulate tensors on the model.
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Advance the schedules after the loss for this step was computed.
        self.kl_weight.assign(tf.minimum(self.kl_weight + 1e-7, 1e-3))
        self.noise_scale.assign(tf.maximum(self.noise_scale * 0.99, 0.1))

        return {"loss": total_loss, "perceptual_loss": perceptual_loss, "kl_loss": kl_loss}

    def test_step(self, data):
        """Evaluate one batch in inference mode, without extra latent noise."""
        x, y = data
        z_mean, z_log_var, z = self.encoder(x, training=False)
        reconstruction = self.decoder(z, training=False)
        perceptual_loss, kl_loss, total_loss = self._vae_losses(
            y, reconstruction, z_mean, z_log_var
        )
        # Keras adds the "val_" prefix to validation metrics by itself;
        # prefixing the keys here produced names such as "val_val_loss".
        return {"loss": total_loss, "perceptual_loss": perceptual_loss, "kl_loss": kl_loss}

In [None]:
# Assemble the encoder, the decoder and the combined model.
encoder = build_encoder(latent_dim=latent_dim, img_shape=(*img_size, 3))
decoder = build_decoder(output_shape=(fixed_length, *img_size, 3), latent_dim=latent_dim)
cvae = CVAE(encoder, decoder)

# compile() gets a dummy loss; the real loss is computed in CVAE.train_step.
cvae.compile(
    run_eagerly=True,
    loss=lambda y_true, y_pred: 0.0,
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
)

# Halve the learning rate when the training loss plateaus.
reduce_lr = ReduceLROnPlateau(min_lr=1e-6, patience=5, factor=0.5, monitor="loss")

# KL weight scheduler instance.
# NOTE(review): the fit() cell passes only reduce_lr and early_stopping as
# callbacks, so this scheduler is not attached to training there.
kl_scheduler = KLDivergenceWeightScheduler()

# Stop once the training loss no longer improves and restore the best weights.
early_stopping = EarlyStopping(restore_best_weights=True, min_delta=0.001, patience=10, monitor='loss')

In [None]:
# Load the GIF dataset and split it into training and validation sets.
gif_dir = '/content/gifs'
# os.listdir() returns entries in arbitrary order. Sorting keeps the
# random_state=42 split identical across runs and machines.
gif_paths = sorted(
    os.path.join(gif_dir, f) for f in os.listdir(gif_dir) if f.endswith('.gif')
)
if not gif_paths:
    raise FileNotFoundError(f"No .gif files found in {gif_dir}")

train_paths, val_paths = train_test_split(gif_paths, test_size=0.2, random_state=42)
x_train, y_train = load_gif_dataset(train_paths, img_size=img_size)
x_val, y_val = load_gif_dataset(val_paths, img_size=img_size)

# Show the resulting array shapes.
print(f"x_train shape: {x_train.shape}, y_train shape: {y_train.shape}")
print(f"x_val shape: {x_val.shape}, y_val shape: {y_val.shape}")

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(2)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(2)

x_train shape: (23, 128, 128, 3), y_train shape: (23, 10, 128, 128, 3)
x_val shape: (6, 128, 128, 3), y_val shape: (6, 10, 128, 128, 3)


In [None]:
# Train the model.
cvae.fit(
    train_dataset,
    epochs=200,
    callbacks=[reduce_lr, early_stopping],
    validation_data=val_dataset,
)

Epoch 1/200
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 1s/step - kl_loss: 0.0079 - loss: 1.1000 - perceptual_loss: 1.1000 - val_val_kl_loss: 0.0083 - val_val_loss: 1.1812 - val_val_perceptual_loss: 1.1812 - learning_rate: 1.0000e-04
Epoch 2/200
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 1s/step - kl_loss: 0.0124 - loss: 1.0997 - perceptual_loss: 1.0997 - val_val_kl_loss: 0.0257 - val_val_loss: 1.1807 - val_val_perceptual_loss: 1.1807 - learning_rate: 1.0000e-04
Epoch 3/200
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 1s/step - kl_loss: 0.3183 - loss: 1.0985 - perceptual_loss: 1.0985 - val_val_kl_loss: 1.4765 - val_val_loss: 1.1779 - val_val_perceptual_loss: 1.1779 - learning_rate: 1.0000e-04
Epoch 4/200
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 1s/step - kl_loss: 10.3858 - loss: 1.0913 - perceptual_loss: 1.0912 - val_val_kl_loss: 30.8692 - val_val_loss: 1.1642 - val_val_perceptual_loss: 1.1640 - lear

<keras.src.callbacks.history.History at 0x7bfaab190520>

In [None]:
def generate_frames_from_png(png_path, frame_count=10, latent_dim=32, variation_scale=0.1):
    """Generate animation frames from a single still image.

    The image is encoded with the module-level `encoder`. For each output
    frame a Gaussian perturbation of `z_mean` is decoded with the module-level
    `decoder`, and frame `i` is taken from position `i % time_steps` of the
    i-th decoded sequence. All perturbed latents are decoded in one batched
    `predict` call.

    Args:
        png_path: Path of the source image.
        frame_count: Number of frames to generate.
        latent_dim: Unused; kept for backward compatibility. The latent size
            is taken from the encoder output.
        variation_scale: Standard deviation of the noise added to `z_mean`.

    Returns:
        A list of `frame_count` PIL images (empty if `frame_count <= 0`).
    """
    # Resize to whatever the encoder expects instead of a hard-coded size.
    _, in_height, in_width, _ = encoder.input_shape

    # Load, resize and normalise the image; close the file promptly.
    with Image.open(png_path) as source:
        rgb = source.convert('RGB').resize((in_width, in_height))
    png_np = (np.array(rgb) / 255.0)[np.newaxis, ...]  # add the batch axis

    # Encode the image into the latent distribution parameters.
    z_mean, _, _ = encoder.predict(png_np)

    if frame_count <= 0:
        return []

    # One perturbed latent vector per requested frame, decoded as one batch.
    z_samples = z_mean + np.random.normal(
        0, variation_scale, size=(frame_count, z_mean.shape[-1])
    )
    sequences = decoder.predict(z_samples)

    if sequences.ndim != 5:
        # Keep the original best-effort fallback (black frames), but make
        # the problem visible instead of failing silently.
        import warnings

        warnings.warn(
            f"decoder returned shape {sequences.shape}; expected a 5-D "
            "(batch, time, height, width, channels) array. Emitting black frames."
        )
        blank = np.zeros((in_height, in_width, 3), dtype=np.uint8)
        return [Image.fromarray(blank) for _ in range(frame_count)]

    time_steps = sequences.shape[1]
    frames = []
    for i in range(frame_count):
        frame = sequences[i, i % time_steps]
        # Scale from [0, 1] to [0, 255] and cast to uint8 for PIL.
        frames.append(Image.fromarray((frame * 255).astype(np.uint8)))

    return frames


In [None]:
def save_gif(frames, output_path, duration=100):
    """Write `frames` to `output_path` as one animated image.

    The first frame's `save` method is called with the remaining frames as
    `append_images`, `loop=0` and the given `duration`.

    Args:
        frames: Non-empty sequence of frame objects exposing `save`.
        output_path: Destination file path.
        duration: Forwarded to `save` as the `duration` option.
    """
    first_frame = frames[0]
    remaining_frames = frames[1:]
    first_frame.save(
        output_path,
        save_all=True,
        append_images=remaining_frames,
        loop=0,
        duration=duration,
    )

In [None]:
# Example call: animate one PNG and write the result as a GIF.
frames = generate_frames_from_png(png_path='/content/93000.png', frame_count=10)
save_gif(frames=frames, output_path='/content/93000.gif')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step


In [None]:
# !rm -rf '/content/frames'
# !rm '/content/93000.gif'
# !rm '/content/band.gif'
# !rm '/content/black.gif'
# !rm '/content/pink.gif'

In [None]:
#@title 파일 다운받기
# !zip -r '/content/frames.zip' '/content/frames'

# from google.colab import files
# files.download("/content/frames.zip")

In [None]:
# Kept so the commented-out load_model() calls in the next cell still work.
from tensorflow.keras.models import load_model

# CVAE is a subclassed model without get_config/from_config, so a whole-model
# HDF5 save/load cannot rebuild it. Registering the KL scheduler callback in
# custom_objects does not help either, because a callback is not part of the
# model. Persist the weights of the two functional sub-models instead and
# reload them into freshly built copies.
ENCODER_WEIGHTS_PATH = "encoder_model.weights.h5"
DECODER_WEIGHTS_PATH = "decoder_model.weights.h5"

encoder.save_weights(ENCODER_WEIGHTS_PATH)
decoder.save_weights(DECODER_WEIGHTS_PATH)

# Rebuild identical architectures and restore the saved weights.
encoder_loaded = build_encoder(img_shape=(img_size[0], img_size[1], 3), latent_dim=latent_dim)
decoder_loaded = build_decoder(latent_dim=latent_dim, output_shape=(fixed_length, img_size[0], img_size[1], 3))
encoder_loaded.load_weights(ENCODER_WEIGHTS_PATH)
decoder_loaded.load_weights(DECODER_WEIGHTS_PATH)

# The kl_weight / noise_scale schedule state is not persisted; the new
# instance starts from the initial values set in CVAE.__init__.
cvae_loaded = CVAE(encoder_loaded, decoder_loaded)

# Generate frames.
# NOTE(review): generate_frames_from_png reads the module-level `encoder` and
# `decoder`, not the reloaded copies created above.
frames = generate_frames_from_png("/content/93000.png", frame_count=10, latent_dim=32, variation_scale=1)

In [None]:
# encoder.save("encoder_model.h5")
# decoder.save("decoder_model.h5")

# # Load the encoder and decoder
# encoder_loaded = load_model("encoder_model.h5")
# decoder_loaded = load_model("decoder_model.h5")

# # Create a CVAE instance from the loaded encoder and decoder
# cvae_loaded = CVAE(encoder_loaded, decoder_loaded)
# cvae_loaded.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss=lambda y_true, y_pred: 0.0, run_eagerly=True)

# # Predict using the loaded encoder and decoder
# frames = generate_frames_from_png("path/to/image.png", frame_count=10, latent_dim=32, variation_scale=1)