In [1]:
import torch
from torchvision import transforms
from datasets import load_dataset
from torch.utils.data import DataLoader
from PIL import Image
from torch.cuda.amp import autocast, GradScaler
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler, DiffusionPipeline, AutoencoderKL
from diffusers.utils import make_image_grid
from diffusers.optimization import get_scheduler

|특성|StableDiffusionPipeline|DiffusionPipeLine|
|---|---|---|
|설계 목적|Stable Diffusion에 특화된 작업 수행|범용 Diffusion 모델 관리 및 커스터마이징|
|구성 요소|Stable Diffusion에 필요한 모든 구성요소 포함|사용자가 커스터마이징 가능|
|사용 편의성|간단하게 텍스트-이미지 생성가능|직접 구성 요소를 설정해야 할 수도 있음|
|적용 가능 모델|Stable Diffusion모델|다양한 Diffusion 모델|
|커스터마이징|제한적(Stable Diffusion 구조 내에서만 가능)|완전히 사용자 정의 가능|

모델 사전 다운 -> huggingface 로그인해서 해도 됌
https://huggingface.co/stable-diffusion-v1-5/stable-diffusion-v1-5

In [2]:
#!git lfs install
#!git clone https://huggingface.co/stable-diffusion-v1-5/stable-diffusion-v1-5

#!huggingface-cli login # 로그인 필요 없음: 공개 모델 사용 시 생략 가능

In [3]:
print(torch.__version__)  # PyTorch 버전 확인
print(torch.cuda.is_available())  # CUDA 사용 가능 여부 확인

2.3.1
True


In [4]:
# 이미지 전처리 데이터 증강 데이터 많으면 필요없음
"""import os
import cv2
import numpy as np
from torchvision import transforms
from PIL import Image

def prepare_dataset(input_dir, output_dir, augment=False):
    '''
    Prepare dataset with optional augmentation.
    '''
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    for label in ["Winding", "Good"]:
            class_dir = os.path.join(input_dir, label)
            output_class_dir = os.path.join(output_dir, label)
            if not os.path.exists(output_class_dir):
                os.makedirs(output_class_dir)

            for file_name in os.listdir(class_dir):
                # Ensure the file is an image
                if not file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
                    print(f"Skipping non-image file: {file_name}")
                    continue

                file_path = os.path.join(class_dir, file_name)
                img = cv2.imread(file_path)

                if img is None:
                    print(f"Failed to load image: {file_path}")
                    continue

                # Save original image
                output_path = os.path.join(output_class_dir, file_name.replace(".bmp", ".jpg"))
                #output_path = os.path.join(output_class_dir, file_name)
                cv2.imwrite(output_path, img)

                if augment:
                    # Example augmentations: flip, rotate, noise
                    flipped = cv2.flip(img, 1)
                    rotated = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
                    noisy = cv2.add(img, np.random.normal(0, 25, img.shape).astype(np.uint8))
                    
                    # Save augmented images
                    #cv2.imwrite(os.path.join(output_class_dir, f"flipped_{file_name}"), flipped)
                    #cv2.imwrite(os.path.join(output_class_dir, f"rotated_{file_name}"), rotated)
                    #cv2.imwrite(os.path.join(output_class_dir, f"noisy_{file_name}"), noisy)
                    cv2.imwrite(os.path.join(output_class_dir, f"flipped_{file_name.replace('.bmp', '.jpg')}"), flipped, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
                    cv2.imwrite(os.path.join(output_class_dir, f"rotated_{file_name.replace('.bmp', '.jpg')}"), rotated, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
                    cv2.imwrite(os.path.join(output_class_dir, f"noisy_{file_name.replace('.bmp', '.jpg')}"), noisy, [int(cv2.IMWRITE_JPEG_QUALITY), 90])


# Prepare dataset
prepare_dataset("D:/Data", "D:/edu/output", augment=True)
"""

'import os\nimport cv2\nimport numpy as np\nfrom torchvision import transforms\nfrom PIL import Image\n\ndef prepare_dataset(input_dir, output_dir, augment=False):\n    \'\'\'\n    Prepare dataset with optional augmentation.\n    \'\'\'\n    if not os.path.exists(output_dir):\n        os.makedirs(output_dir)\n    \n    for label in ["Winding", "Good"]:\n            class_dir = os.path.join(input_dir, label)\n            output_class_dir = os.path.join(output_dir, label)\n            if not os.path.exists(output_class_dir):\n                os.makedirs(output_class_dir)\n\n            for file_name in os.listdir(class_dir):\n                # Ensure the file is an image\n                if not file_name.lower().endswith((\'.png\', \'.jpg\', \'.jpeg\', \'.bmp\')):\n                    print(f"Skipping non-image file: {file_name}")\n                    continue\n\n                file_path = os.path.join(class_dir, file_name)\n                img = cv2.imread(file_path)\n\n               

In [5]:
# 모델 파이프라인 로드
pipe = StableDiffusionPipeline.from_pretrained(
    "model/stable-diffusion-v1-5",
    #revision="fp16",
    torch_dtype=torch.float16
).to("cuda")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

In [6]:

# 데이터셋 로드
#dataset = load_dataset("imagefolder", data_dir="D:/edu/output", streaming=True)
dataset = load_dataset("imagefolder", data_dir="D:/edu/output")
expected_size = pipe.unet.config.sample_size
# 이미지 변환 정의
transform = transforms.Compose([
    transforms.Resize((expected_size, expected_size)),  # 이미지 크기 조정 -> 그래픽 카드 사양에 따라 조정 높을 수록 정밀도 올라감
    transforms.ToTensor(),          # 텐서로 변환
    transforms.Normalize([0.5], [0.5])  # 정규화
])

# 데이터 전처리 함수
def preprocess_dataset(example):
    """
    데이터셋의 이미지를 변환하는 함수.
    """
    if isinstance(example["image"], list):  # 이미지가 리스트로 제공되는 경우
        # 리스트에 포함된 각 이미지에 대해 변환을 적용
        transformed_images = [transform(image) for image in example["image"]]
        example["image"] = torch.stack(transformed_images)  # 변환된 이미지들을 스택으로 쌓음
    else:  # 이미지가 PIL.Image 형식으로 제공되는 경우
        example["image"] = transform(example["image"])  # transform 적용

    return example


# map 메서드로 변환 적용
#dataset = dataset.map(preprocess_dataset, batched=True, batch_size= 4) # 배치사이즈가 낮을 수록 메모리 사용 적음음
dataset = dataset.map(preprocess_dataset, batched=False) # 배치사이즈가 낮을 수록 메모리 사용 적음음

Resolving data files:   0%|          | 0/4492 [00:00<?, ?it/s]

In [None]:
# collate_fn 함수: 배치 처리
def collate_fn(batch):
    # Ensure images are tensors
    images = torch.stack([torch.tensor(item["image"]).float() if not isinstance(item["image"], torch.Tensor) else item["image"] for item in batch])
    labels = torch.tensor([item["label"] for item in batch])
    return {"image": images, "label": labels}

In [10]:
dataloader = DataLoader(dataset["train"], batch_size=4, collate_fn=collate_fn, shuffle=True)

In [74]:
def fine_tune_model(pipe, dataloader, epochs=5, learning_rate=5e-5):
    
    prediction_type = None # 'epsilon'  or 'v_prediction'
    train_losses = []
    
    pipe.unet.train()  # UNet 학습 모드로 설정
    #optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=learning_rate)
    optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, pipe.unet.parameters()), lr=learning_rate)
    
    scaler = GradScaler()  # AMP용 GradScaler 생성

    # 학습률 스케줄러 (옵션, 필요 시 사용)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    for epoch in range(epochs):
        total_loss = 0.0
        for batch in dataloader:
            images = batch["image"].to("cuda").half()  # .half()를 사용하여 float16으로 변환
            labels = batch["label"]  # 라벨 (필요에 따라 사용)

            # VAE를 통해 latent 공간으로 매핑
            #latents = pipe.vae.encode(images).latent_dist.sample() * 0.18215  # Scale factor
            latents = pipe.vae.encode(images).latent_dist.sample() * pipe.vae.config.scaling_factor  # Scale factor

            # 노이즈 추가
            noise = torch.randn_like(latents)
            timesteps = torch.randint(0, pipe.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device).long()
            noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)

            # Get the text embedding for conditioning, [0]: last_hidden_state
            encoder_hidden_states = pipe.text_encoder(batch["input_ids"].to('cuda') )[0]

            # 노이즈 스케쥴러에 예측 타입이 노이즈인지 이미지인지에 따라 타겟 설정
            if prediction_type is not None:
                # set prediction_type of scheduler if defined
                pipe.noise_scheduler.register_to_config(prediction_type=prediction_type)

            if pipe.noise_scheduler.config.prediction_type == "epsilon":
                target = noise
            elif pipe.noise_scheduler.config.prediction_type == "v_prediction":
                target = pipe.noise_scheduler.get_velocity(latents, noise, timesteps)
            else:
                raise ValueError(f"Unknown prediction type {pipe.noise_scheduler.config.prediction_type}")

            # Predict the noise residual and compute loss
            model_pred = pipe.unet(noisy_latents, timesteps, encoder_hidden_states).sample

            loss = torch.nn.functional.mse_loss(model_pred.float(), target.float(), reduction="mean")


            with torch.no_grad():
                train_losses.append(loss.item())
            '''
            noisy_latents_resized = torch.nn.functional.interpolate(noisy_latents, size=(128, 128), mode='bilinear', align_corners=False)

            optimizer.zero_grad()

            with autocast():  # 자동 혼합 정밀도 사용
                noise_pred = pipe.unet(noisy_latents, timesteps, None).sample
                loss = torch.nn.functional.mse_loss(noise_pred, noise)

            # loss.backward() 시 AMP 스케일러 사용
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # 메모리 비우기 (필요한 경우)
            torch.cuda.empty_cache()

            total_loss += loss.item()
            '''

        # loss.backward()
        # optimizer.step()
        # lr_scheduler.step()
        # optimizer.zero_grad()

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        get_scheduler('linear', optimizer= optimizer, num_warmup_steps=500, num_training_steps=5 * len(dataloader)).step()
        optimizer.zero_grad()
           
        '''
        # Epoch마다 출력
        print(f"Epoch {epoch + 1}/{epochs} - Avg Loss: {total_loss / len(dataloader)}")
        '''
        avg_loss = sum(train_losses[-100:])/100
        print(f'Finished epoch {epoch+1}. Average of the last 100 loss values: {avg_loss:05f}')

        # 학습률 스케줄러 업데이트 (옵션, 필요 시 사용)
        scheduler.step()

    # 학습된 가중치 저장
    pipe.unet.save_pretrained("fine_tuned_model/unet")
    pipe.vae.save_pretrained("fine_tuned_model/vae")
    pipe.text_encoder.save_pretrained("fine_tuned_model/text_encoder")


In [8]:
#from huggingface_hub import notebook_login
#notebook_login()
#!huggingface-cli login  # 로그인
#!git lfs install        # Git LFS 설치 확인

In [75]:
# 모델 훈련
torch.autograd.set_detect_anomaly(True)
fine_tune_model(pipe, dataloader)

# Fine-tuned 모델 저장
pipe.save_pretrained("fine_tuned_model")


RuntimeError: Given groups=1, weight of size [320, 4, 3, 3], expected input[4, 768, 8, 8] to have 4 channels, but got 768 channels instead

In [None]:
# 사전 학습한 모델 추가 default : stable-diffusion v1.5
model_id = "stable-diffusion-v1-5/stable-diffusion-v1-5"
pipeline = DiffusionPipeline.from_pretrained(model_id, use_safetensors=True).to("cuda")

In [None]:
prompt = "Make NG"

In [None]:
# prompt를 통해 이미지 1장 생성
generator = torch.Generator("cuda").manual_seed(0)
image = pipeline(prompt, generator=generator).images[0]
image

In [None]:
pipeline = DiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16, use_safetensors=True)
pipeline = pipeline.to("cuda")
image = pipeline(prompt, generator=generator).images[0]
image

In [None]:
pipeline.scheduler = DPMSolverMultistepScheduler.from_config(pipeline.scheduler.config)

In [None]:
image = pipeline(prompt, generator=generator, num_inference_steps=20).images[0]
image

In [None]:
def get_inputs(batch_size=1):
    generator = [torch.Generator("cuda").manual_seed(i) for i in range(batch_size)]
    prompts = batch_size * [prompt]
    num_inference_steps = 20

    return {"prompt": prompts, "generator": generator, "num_inference_steps": num_inference_steps}

In [None]:
images = pipeline(**get_inputs(batch_size=4)).images
make_image_grid(images, 2, 2)

In [None]:
pipeline.enable_attention_slicing()

In [None]:
images = pipeline(**get_inputs(batch_size=8)).images
make_image_grid(images, rows=2, cols=4)

In [None]:
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16).to("cuda")
pipeline.vae = vae
images = pipeline(**get_inputs(batch_size=8)).images
make_image_grid(images, rows=2, cols=4)

In [None]:
# promt 작성
prompt += ", tribal panther make up, blue on red, side profile, looking away, serious eyes"
prompt += " 50mm portrait photography, hard rim lighting photography--beta --ar 2:3  --beta --upbeta"

In [None]:
images = pipeline(**get_inputs(batch_size=8)).images
make_image_grid(images, rows=2, cols=4)

In [None]:
# 예시 프롬프트를 참고하여 작성
prompts = [
    "portrait photo of the oldest warrior chief, tribal panther make up, blue on red, side profile, looking away, serious eyes 50mm portrait photography, hard rim lighting photography--beta --ar 2:3  --beta --upbeta",
    "portrait photo of an old warrior chief, tribal panther make up, blue on red, side profile, looking away, serious eyes 50mm portrait photography, hard rim lighting photography--beta --ar 2:3  --beta --upbeta",
    "portrait photo of a warrior chief, tribal panther make up, blue on red, side profile, looking away, serious eyes 50mm portrait photography, hard rim lighting photography--beta --ar 2:3  --beta --upbeta",
    "portrait photo of a young warrior chief, tribal panther make up, blue on red, side profile, looking away, serious eyes 50mm portrait photography, hard rim lighting photography--beta --ar 2:3  --beta --upbeta",
]

generator = [torch.Generator("cuda").manual_seed(1) for _ in range(len(prompts))]
images = pipeline(prompt=prompts, generator=generator, num_inference_steps=25).images
make_image_grid(images, 2, 2)