# Diffusion
1. 커스텀 학습 사용방식


사전준비

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
import tensorflow as tf
import os
from PIL import Image
import numpy as np

pathTrain="C:\\Users\\82109\\Desktop\\train" # directory of training images
pathTest="C:\\Users\\82109\\Desktop\\test" # directory of validation/test images
pathGeneratorTest="C:\\Users\\82109\\Desktop\\generatorTest" # directory of images for generation tests
pathSave="C:\\Users\\82109\\Desktop\\checkPoint" # location where the model is saved
batchSize=1 # batch size applied when the datasets are batched

In [None]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all.
# Memory growth has to be configured on every physical GPU *before* the
# runtime is initialised; listing logical devices initialises it, so that
# call must come after the loop, not inside it.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.experimental.list_logical_devices('GPU')
  except RuntimeError as e:
    # Raised when the GPUs were already initialised (e.g. the cell is re-run).
    print(e)

이미지 전처리

In [None]:
#해당 디렉토리의 모든 파일명을 가져옴

#경로 반환
def GetFilePath(path,end=".gif"):
  """Return the full paths of the files in `path` whose names end with `end`.

  Args:
    path: Directory to list.
    end: A single suffix string (e.g. ".gif") or an iterable of suffix
      strings; a file is kept when its name ends with any of them.

  Returns:
    A list of `os.path.join(path, name)` entries, in `os.listdir` order.
  """
  # A bare string must be wrapped, not exploded: tuple(".gif") would be
  # ('.', 'g', 'i', 'f') and would match names such as "photo.png".
  suffixes=(end,) if isinstance(end,str) else tuple(end)
  return [os.path.join(path,name) for name in os.listdir(path) if name.endswith(suffixes)]

#못쓰는 데이터를 걸러줌
def PreprocessGif(path,frame=5):
  """Delete the image at `path` when it has fewer than `frame` frames.

  The frame count is printed together with a usable / unusable message.

  Args:
    path: Path of the image file to inspect.
    frame: Minimum number of frames a file needs in order to be kept.
  """
  # The context manager closes the file even if reading n_frames fails,
  # and guarantees it is closed before os.remove runs.
  with Image.open(path) as gif:
    size=gif.n_frames
  if size<frame:
    print(path,": ",size,"사용불가능")
    os.remove(path=path)
  else:
    print(path,": ",size," 사용가능")

#gif를 읽고 넘파이 배열로 노멀라이즈해줌
def LoadGif(path, paddingSize=32):
  """Load a random run of consecutive frames from a GIF as a scaled array.

  A transpose method index in [0, 7) and a clip length in [4, 11) are drawn
  at random. Frame 0 is never read: the window starts at index 1 or later.
  Each frame is transposed, converted to RGBA and zero-padded at the
  bottom/right so height and width are multiples of `paddingSize`.

  Args:
    path: Path of the GIF file.
    paddingSize: Height and width are padded up to a multiple of this value.

  Returns:
    The stacked frames divided by 255.0.
  """
  gif=Image.open(path)
  transposeMethod=np.random.randint(0,7)
  clipLength=np.random.randint(4,11)
  slack=gif.n_frames-clipLength
  if slack<=1:
    # Too short for a random window: take every frame after the first.
    first,stop=1,gif.n_frames
  else:
    first=np.random.randint(1,slack+1)
    stop=first+clipLength
  frames=[]
  for index in range(first,stop):
    gif.seek(index)
    pixels=np.array(gif.transpose(transposeMethod).convert("RGBA"))
    # Rows/columns still missing to reach the next multiple of paddingSize.
    padRows=(-pixels.shape[0])%paddingSize
    padCols=(-pixels.shape[1])%paddingSize
    frames.append(
      np.pad(pixels, pad_width=((0,padRows),(0,padCols),(0,0)),mode="constant",constant_values=0)
    )
  gif.close()
  return np.array(frames)/255.0

#모든 프레임을 반환
def LoadGifAll(path, paddingSize=32):
  """Load every frame of a GIF except frame 0 as a scaled array.

  One transpose method index in [0, 7) is drawn at random and applied to all
  frames. Each frame is converted to RGBA and zero-padded at the bottom/right
  so height and width are multiples of `paddingSize`.

  Args:
    path: Path of the GIF file.
    paddingSize: Height and width are padded up to a multiple of this value.

  Returns:
    The stacked frames divided by 255.0.
  """
  gif=Image.open(path)
  transposeMethod=np.random.randint(0,7)

  frames=[]
  for index in range(1,gif.n_frames):
    gif.seek(index)
    pixels=np.array(gif.transpose(transposeMethod).convert("RGBA"))
    # Rows/columns still missing to reach the next multiple of paddingSize.
    padRows=(-pixels.shape[0])%paddingSize
    padCols=(-pixels.shape[1])%paddingSize
    frames.append(
      np.pad(pixels, pad_width=((0,padRows),(0,padCols),(0,0)),mode="constant",constant_values=0)
    )
  gif.close()
  return np.array(frames)/255.0

#인풋데이터와 아웃풋 데이터를 분리
def Divide(arr):
  """Split `arr` along its first axis into even-index and odd-index items.

  When `arr` has an odd number of items, the trailing even-index item is
  dropped so both halves have the same length.

  Args:
    arr: Array-like object supporting slicing and `.shape`.

  Returns:
    A two-element list `[evens, odds]`.
  """
  odds=arr[1::2]
  # The even half can only be longer by one; cut it to the odd half's length.
  evens=arr[0::2][:odds.shape[0]]
  return [evens,odds]

  
#데이터셋 제너레이터 생성
def DatasetGenerater(gifPath):
  """Yield one training sample per GIF path.

  For each path the frames from `LoadGif` are split with `Divide` into
  inputs and targets. Each sample is `(inputs, noise, step, targets)`, where
  `noise` is uniform random data with the inputs' shape and `step` is an
  all-ones array with the inputs' first three dimensions and one channel.

  Args:
    gifPath: Iterable of GIF file paths.
  """
  for path in gifPath:
      inputs,targets=Divide(LoadGif(path))
      dims=inputs.shape
      noise=np.random.rand(dims[0],dims[1],dims[2],dims[3])
      step=np.ones((dims[0],dims[1],dims[2],1))
      yield (inputs,noise,step,targets)

def SaveGif(path, images):
  """Write a sequence of RGBA frames to `path` as an animated image.

  Args:
    path: Destination file path.
    images: Iterable of frames; each frame is scaled by 255, so values are
      expected in [0, 1] (out-of-range values are clipped). Frames are
      passed to PIL as RGBA data.

  The file is saved with disposal=2, duration=150 and loop=0.
  """
  imgs=[]
  for i in images:
    # RGBA bytes are unsigned: int8 would wrap every value above 127.
    pixels=np.clip(np.asarray(i)*255,0,255).round().astype(np.uint8)
    imgs.append(Image.fromarray(pixels, mode="RGBA"))
  imgs[0].save(path, save_all=True, append_images=imgs[1:], disposal = 2,duration=150, loop=0)
  
  

In [None]:
# Training set: delete unusable files first, then list what is left.
print("트레인셋 전처리")
for i in GetFilePath(pathTrain):
  PreprocessGif(i)
gifPath=GetFilePath(pathTrain)

# Test set: same clean-up and listing.
print("테스트셋 전처리")
for i in GetFilePath(pathTest):
  PreprocessGif(i)
gifPathTest=GetFilePath(pathTest)


def _BuildDataset(paths):
  """Wrap DatasetGenerater over `paths` in a shuffled, batched, prefetched dataset."""
  rgbaShape=(None, None,None,4)
  stepShape=(None, None,None,1)
  dataset=tf.data.Dataset.from_generator(
      DatasetGenerater,
      args=[paths],
      output_types=(tf.float32,)*4,
      output_shapes=(rgbaShape,rgbaShape,stepShape,rgbaShape))
  return dataset.shuffle(5).batch(batchSize).prefetch(tf.data.experimental.AUTOTUNE)


# Each element is (inputImages, noises, step, outputImages).
trainDataset=_BuildDataset(gifPath)
testDataset=_BuildDataset(gifPathTest)


unet 모델 생성

In [None]:

def SeperableConv(filter, input):
    """Apply a grouped 3x3x3 Conv3D followed by a 1x1x1 Conv3D.

    The first convolution keeps the channel count of `input` and uses one
    group per channel; the second maps to `filter` output channels.

    Args:
        filter: Number of output channels.
        input: Tensor whose last dimension is the channel count.

    Returns:
        The output tensor of the second convolution.
    """
    channels=input.shape[-1]
    perChannel=tf.keras.layers.Conv3D(channels,3,padding="same",groups=channels)(input)
    return tf.keras.layers.Conv3D(filter,1,padding="same")(perChannel)
    
def Block(filter,input):
    """Run two SeperableConv -> LayerNormalization -> swish stages in sequence.

    Args:
        filter: Number of output channels of each convolution stage.
        input: Input tensor.

    Returns:
        The output tensor of the second swish activation.
    """
    features=input
    for _ in range(2):
        features=SeperableConv(filter,features)
        features=tf.keras.layers.LayerNormalization()(features)
        features=tf.keras.layers.Activation("swish")(features)
    return features


def UnetModel(inputShape=(None, None, None, 4)):
    """Build the three-input U-Net used as the denoising network.

    The inputs are a conditioning image, a noisy image (both of `inputShape`)
    and a one-channel step map with the same leading dimensions; they are
    concatenated on the channel axis. Five encoder stages (32..512 filters)
    each end in a (1, 2, 2) max-pool, followed by a 1024-filter bottleneck
    and five decoder stages that upsample by (1, 2, 2) and concatenate the
    matching encoder output. The final SeperableConv produces 4 channels.

    Args:
        inputShape: Shape (without batch axis) of the two image inputs.

    Returns:
        A `tf.keras.Model` mapping `[inputImage, noisyImage, step]` to the
        4-channel output.
    """
    inputImage=tf.keras.Input(shape=inputShape)
    noisyImage=tf.keras.Input(shape=inputShape)
    step=tf.keras.Input(shape=(inputShape[0],inputShape[1],inputShape[2],1))
    features=tf.keras.layers.Concatenate()([inputImage,noisyImage,step])

    # Encoder: keep each pre-pooling output for the skip connections.
    skips=[]
    for width in (32,64,128,256,512):
        encoded=Block(width,features)
        skips.append((width,encoded))
        features=tf.keras.layers.MaxPooling3D(pool_size=(1, 2, 2))(encoded)

    # Bottleneck
    features=Block(1024,features)

    # Decoder: walk the skip connections from deepest to shallowest.
    for width,skip in reversed(skips):
        upsampled=tf.keras.layers.UpSampling3D(size=(1,2,2))(features)
        projected=SeperableConv(width,upsampled)
        merged=tf.keras.layers.Concatenate()([projected,skip])
        features=Block(width,merged)

    outputImage=SeperableConv(4,features)

    return tf.keras.Model([inputImage,noisyImage,step],outputImage)
UnetModel().summary()

디퓨전

In [None]:

class DiffusionModel(tf.keras.Model):
    """Keras model wrapper that trains `network` to predict the noise mixed into a target.

    Each batch is a 4-tuple `(inputImages, noises, step, outputImages)`. The
    targets are mixed with the supplied noise at a random diffusion time and
    the network is trained to recover that noise from
    `[inputImages, noisyImage, step]`.
    """

    def __init__(self,network:tf.keras.Model,batchSize=1):
        """Store the denoising network and the batch size used for time sampling.

        Args:
            network: Model called as `network([inputImages, noisyImage, step])`.
            batchSize: Leading dimension of the sampled diffusion-time tensor.
        """
        super().__init__()
        self.network= network
        self.batchSize=batchSize
    
    def train_step(self, images):
        """Run one optimisation step on a batch and return the metric results."""
        inputImages, noises, step, outputImages=images
        # One diffusion time per batch entry, broadcast over the remaining axes.
        diffusionTime = tf.random.uniform(
            shape=(self.batchSize,1, 1, 1, 1), minval=0.0, maxval=1.0
        )
        sigRate, noiseRate= self.DiffusionSchedule(diffusionTime)
        noisyImage = sigRate * outputImages + noiseRate * noises
        # The step map carries the signal rate so the network knows the noise level.
        step=step*sigRate
        with tf.GradientTape() as tape:
            predNoises=self.network([inputImages,noisyImage,step],training=True)
            loss=self.compiled_loss(noises,predNoises)
        gradients = tape.gradient(loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))

        return {m.name: m.result() for m in self.metrics}
    
    def test_step(self, images):
        """Evaluate one batch (no weight update) and return the metric results."""
        inputImages, noises, step, outputImages=images
        diffusionTime = tf.random.uniform(
            shape=(self.batchSize,1, 1, 1, 1), minval=0.0, maxval=1.0
        )
        sigRate, noiseRate= self.DiffusionSchedule(diffusionTime)
        noisyImage = sigRate * outputImages + noiseRate * noises
        step=step*sigRate
        predNoises=self.network([inputImages,noisyImage,step],training=False)
        # Called for its effect on the tracked loss; the return value is unused.
        self.compiled_loss(noises,predNoises)

        return {m.name: m.result() for m in self.metrics}
    
    def DiffusionSchedule(self, diffusionTime):
        """Map a diffusion time to `(sigRate, noiseRate)`.

        The angle is interpolated linearly from acos(0.99) at time 0 to
        acos(0.01) at time 1; the signal rate is its cosine and the noise
        rate its sine, so the signal rate goes from 0.99 down to 0.01 as
        time goes from 0 to 1.

        Args:
            diffusionTime: Scalar or tensor of times.

        Returns:
            Tuple `(sigRate, noiseRate)` of tensors.
        """
        startAng=tf.acos(0.99) # radians; angle at time 0 (signal rate 0.99)
        endAng=tf.acos(0.01) # radians; angle at time 1 (signal rate 0.01)
        
        diffusionAng=startAng+diffusionTime*(endAng-startAng)
        sigRate=tf.cos(diffusionAng)
        noiseRate=tf.sin(diffusionAng)
        
        return sigRate, noiseRate
    

    def Generator(self,inputImage:np.ndarray,diffusionStep=20):
        """Generate frames by iteratively denoising random noise.

        Args:
            inputImage: Array of shape (frames, height, width, channels) used
                as the conditioning input; at least 2 frames are required.
            diffusionStep: Number of denoising iterations (must be >= 1).

        Returns:
            The final predicted image clipped to [0, 1] with the batch axis
            removed, or None (after printing a message) when `inputImage`
            has fewer than 2 frames.

        Raises:
            ValueError: If `diffusionStep` is smaller than 1.
        """
        if inputImage.shape[0]<2:
            print("이미지 갯수가 부족합니다")
            return 
        if diffusionStep<1:
            raise ValueError("diffusionStep must be at least 1")
        # Read the dimensions before the batch axis is added, so the noise and
        # the step map get the same layout as the batched input.
        frames,height,width,channels=inputImage.shape
        inputImage=np.expand_dims(inputImage,axis=0)
        stepSize=1.0 / diffusionStep

        noisyImage=np.random.rand(1,frames,height,width,channels)
        ones=np.ones((1,frames,height,width,1))
        for stepIndex in range(diffusionStep):
            # Time runs from 1.0 (most noise) towards 0.0.
            diffusionTime= 1.0 - stepIndex * stepSize
            sigRate, noiseRate=self.DiffusionSchedule(diffusionTime)
            sigRate=sigRate.numpy()
            noiseRate=noiseRate.numpy()
            stepMap=sigRate*ones
            predNoise=self.network.predict([inputImage, noisyImage, stepMap])
            # Remove the predicted noise to estimate the clean image.
            predImage=(noisyImage-noiseRate*predNoise)/sigRate
            
            # Re-mix the estimate with the predicted noise at the next time.
            nextSigRate, nextNoiseRate=self.DiffusionSchedule(diffusionTime-stepSize)
            noisyImage=nextSigRate.numpy()*predImage+nextNoiseRate.numpy()*predNoise
            
        
        return np.clip(predImage[0], 0, 1)



In [None]:
#모델생성
# Build the denoising network and wrap it in the diffusion model.
network=UnetModel(inputShape=(None,None,None,4))
model=DiffusionModel(network=network)

# Callbacks: weight checkpointing, early stopping and LR reduction on plateau.
cpCallback=tf.keras.callbacks.ModelCheckpoint(filepath=pathSave, verbose=1, save_weights_only=True)
esCallback=tf.keras.callbacks.EarlyStopping(patience=5)
rlrCallback=tf.keras.callbacks.ReduceLROnPlateau(factor=0.1, patience=3, min_lr=1e-5)

# Compile with AdamW and a mean-squared-error loss.
optimizer=tf.keras.optimizers.experimental.AdamW(learning_rate=1e-2, weight_decay=1e-4)
model.compile(
    optimizer,
    loss=tf.keras.losses.mean_squared_error,
    metrics=['accuracy'],
)



In [None]:
# Train on the batched tf.data datasets. batch_size / validation_batch_size
# are intentionally not passed: the datasets already yield batches, and Keras
# documents that these arguments must not be specified for dataset inputs.
model.fit(
    trainDataset,
    epochs=20,
    validation_data=testDataset,
    callbacks=[
        cpCallback,
        esCallback,
        rlrCallback
    ]
)

In [None]:
#생성
network.load_weights("model path")# load weights into the network ("model path" is presumably a placeholder — replace with the real checkpoint path)
model=DiffusionModel(network=network)# wrap the loaded network in a DiffusionModel

randomVideo=np.random.rand(6,128,128,4)# random array of shape (6,128,128,4) used as the input to Generator
output=model.Generator(randomVideo)# run generation
SaveGif("gif path",output)