In [20]:
import torch 
import os 
from PIL import Image
from transformers import CLIPProcessor, AutoProcessor, AutoTokenizer

class MultiInputImageDataset(torch.utils.data.Dataset):
    def __init__(self, root_dir, transform=None):
        self.samples = sorted(os.listdir(root_dir))
        self.root_dir = root_dir
        self.transform = transform
        self.processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32")


    def __getitem__(self, idx):

        # folder = os.path.join(self.root_dir, self.samples[idx])
        folder = os.path.join(self.root_dir, "sample1")
        inputs = [Image.open(os.path.join(folder, f"image-{i+1}.jpeg")) for i in range(10)]
        target = Image.open(os.path.join(self.root_dir, "target.jpg"))

        # inputs = [self.processor(images=img, return_tensors="pt") for img in inputs]
        all_inputs = []
        for img in inputs:
            processed_img = self.processor(images=img, return_tensors="pt")
            all_inputs.append(processed_img.pixel_values)

    
        target = self.processor(images = target)
        target = target.pixel_values

        stacked_input = torch.cat(all_inputs, dim=0)  # Shape: [30, H, W] if RGB
        return stacked_input, target

    def __len__(self):
        return len(self.samples)


In [2]:
from config import Configuration as hypm

for k, v in hypm.__dict__.items():
    if not k.startswith("__") and not callable(v):
        print(f"{k}: {v}")


img_encoder: openai/clip-vit-base-patch32
sd_pipeline: CompVis/stable-diffusion-v1-4
save_model: False
save_model_dir: /home/fahimul/Documents/Research/MIDuff/trained_pipeline
dataset_path: /home/fahimul/Documents/Research/Dataset/University-1651
num_of_encoder_img: 10
batch: 1
device: cuda
cuda_set_device: 0
exp_id: -1
save_inference_dir: /home/fahimul/Documents/Research/MIDuff/output
num_inference_steps: 1000
infer_height: 224
infer_width: 224
epochs: 80
lr: 1e-05
noise_time_step: 1000


In [21]:
from torchvision import transforms
image_size = 224
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),  # Converts to [0,1]
])

In [15]:
from torch.utils.data import DataLoader
data_path = '/home/fahimul/Documents/Research/MIDuff/dataset/drone/' #don't include the / at the end
allData = MultiInputImageDataset(root_dir=data_path)
test_loader = DataLoader(allData, batch_size=5, shuffle=False)

In [18]:
for x, y in test_loader:
    print(x.shape)

torch.Size([2, 10, 3, 224, 224])


# START FROM HERE

In [4]:
import torch
from torch.utils.data import DataLoader
from Uni_dataset import University1652Dataset
dataset_dir = "/home/fahimul/Documents/Research/Dataset/University-1651"

device = "cuda" if torch.cuda.is_available() else "cpu"

dataset = University1652Dataset(root_dir=dataset_dir, mode='train', num_input=10)
train_loader = DataLoader(dataset, batch_size=8, shuffle=False, num_workers=4)

# test_dataset = University1652Dataset(root_dir=dataset_dir, mode='test', num_input=10)
# test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)

In [4]:
# for x, y, i in test_loader:
#     print(y)

In [5]:
from transformers import CLIPModel, CLIPProcessor

clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").vision_model
clip_model.eval()
print("1")

1


In [6]:
from diffusers import StableDiffusionPipeline, UNet2DConditionModel
pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4")
unet = pipe.unet
vae = pipe.vae

unet = unet.to(device)
vae = vae.to(device)
clip_model = clip_model.to(device)


  deprecate("Transformer2DModelOutput", "1.0.0", deprecation_message)


Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

In [8]:
def modify_unet(unet):
    for block in unet.down_blocks + unet.up_blocks:
        for attn in block.attentions:
            attn.encoder_hidden_states_dim = 512  # Match CLIP embedding size
    return unet


In [None]:
import torch.nn.functional as F
from tqdm import tqdm


optimizer = torch.optim.AdamW(unet.parameters(), lr=1e-5)
epochs = 2
for i in tqdm(range(epochs)):
    for input_stack, target_image, folder_id in train_loader:
        # Encode 10 images

        inputs = input_stack.chunk(10, dim=1)  # [B, 3, H, W] x 10
        target_image = target_image[0].to(device)
    

        # embeddings = [clip_model(pixel_values=img).last_hidden_state.mean(dim=1) for img in inputs]
        embeddings = []
        for img in inputs:
            img = img.to(device)
            img = torch.squeeze(img)
            embed = clip_model(pixel_values=img).last_hidden_state
            embeddings.append(embed)


        fused_embedding = torch.mean(torch.stack(embeddings), dim=0)
        # VAE encode target image
        target_latents = vae.encode(target_image).latent_dist.sample() * 0.18215

        # Add noise
        noise = torch.randn_like(target_latents)
        timesteps = torch.randint(0, 1000, (target_latents.shape[0],)).long().to(device)

        noisy_latents = pipe.scheduler.add_noise(target_latents, noise, timesteps)
        # UNet prediction
        noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states=fused_embedding).sample

        # Loss and backward
        loss = F.mse_loss(noise_pred, noise)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()


100%|██████████| 2/2 [00:01<00:00,  1.05it/s]


In [10]:
def encode_and_fuse(images):

    with torch.no_grad():
        # features = [clip_model(pixel_values=image).last_hidden_state for image in inputs]
        images = images.to(device)
        # features = []
        embed = clip_model(pixel_values=images).last_hidden_state
        # print(embed.shape)
        # for i in range(images.shape[0]):
        #     embed = clip_model(pixel_values=images[i]).last_hidden_state
        #     print(embed)

    # fused = torch.mean(torch.stack(features), dim=0)  # [1, 512]
    return embed


In [None]:
import os
import torch
from PIL import Image
from glob import glob
from torchvision import transforms as T
from transformers import CLIPModel
from diffusers import UNet2DConditionModel, AutoencoderKL, DDIMScheduler
from Uni_dataset import University1652Dataset_test
from torch.utils.data import DataLoader
# ------------------ Configuration ------------------
device = "cuda" if torch.cuda.is_available() else "cpu"

# # Paths
# input_dir = "your_input_images"  # Directory with input_01.jpg ... input_10.jpg
# unet_path = "path_to_trained_unet"  # e.g., ./checkpoints/unet/
output_path = "output"

# Image generation settings
num_inference_steps = 50
height, width = 224, 224
# -------------------Dataset Loader-----------------
test_dataset_dir = "/home/fahimul/Documents/Research/Dataset/University-1651"
test_dataset = University1652Dataset_test(root_dir=test_dataset_dir, num_input=10)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)
# ------------------ Load Models ------------------
print("Loading models...")
# unet = UNet2DConditionModel.from_pretrained(unet_path).to(device)
# vae = AutoencoderKL.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="vae").to(device)
scheduler = DDIMScheduler.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="scheduler")
# clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").vision_model.eval().to(device)

# ------------------ Preprocessing ------------------
# clip_transform = T.Compose([
#     T.Resize((224, 224)),
#     T.ToTensor(),
#     T.Normalize([0.4815, 0.4578, 0.4082], [0.2686, 0.2613, 0.2758])  # CLIP normalization
# ])

# def encode_and_fuse(images):
#     with torch.no_grad():
#         inputs = torch.stack([clip_transform(img) for img in images]).to(device)  # [10, 3, 224, 224]
#         features = [clip_model(pixel_values=img.unsqueeze(0)).last_hidden_state.mean(dim=1) for img in inputs]
#         fused = torch.mean(torch.stack(features), dim=0)  # [1, 512]
#     return fused

# ------------------ Sampling Function ------------------
@torch.no_grad()
def generate_image(fused_embedding, steps=50, height=512, width=512):
    latent = torch.randn((2, 4, height // 8, width // 8)).to(device)
    scheduler.set_timesteps(steps)

    for t in scheduler.timesteps:
        latent_input = scheduler.scale_model_input(latent, t)

        noise_pred = unet(latent_input, t, encoder_hidden_states=fused_embedding).sample
        latent = scheduler.step(noise_pred, t, latent).prev_sample

    decoded = vae.decode(latent / 0.18215).sample
    decoded = (decoded / 2 + 0.5).clamp(0, 1)
    decoded = decoded[0]
    return T.ToPILImage()(decoded.squeeze().cpu())

# ------------------ Inference Entry Point ------------------
def main():

    # data_path = '/home/fahimul/Documents/Research/MIDuff/dataset/drone/' #don't include the / at the end
    # allData = MultiInputImageDataset(root_dir=data_path)
    # img_x, img_y = allData[0]


    for input_stack, target_image, folder_id in test_loader:
        # Encode 10 images
        embeddings = []

        inputs = input_stack.chunk(10, dim=1)  # [B, 3, H, W] x 10
        target_image = target_image[0].to(device)
    

        # embeddings = [clip_model(pixel_values=img).last_hidden_state.mean(dim=1) for img in inputs]
        
        print(f"Encoding and fusing input images of ID: {folder_id}")
        for img in inputs:
            img = img.to(device)
            img = torch.squeeze(img)
            embed = clip_model(pixel_values=img).last_hidden_state
            embeddings.append(embed)

        fused_cond = torch.mean(torch.stack(embeddings), dim=0)

        print(f"Generating image of ID: {folder_id}")
        result = generate_image(fused_cond, steps=num_inference_steps, height=height, width=width)

        # fused_cond = encode_and_fuse(img_x)

        os.makedirs(os.path.dirname(f'{output_path}/{folder_id}.png'), exist_ok=True)
        result.save(f'{output_path}/{folder_id}.png')
        print(f"Saved output of: {folder_id}")

if __name__ == "__main__":
    main()


Loading models...
Encoding and fusing input images of ID: ('0000', '0001', '0003', '0004', '0005')
Generating image of ID: ('0000', '0001', '0003', '0004', '0005')


RuntimeError: shape '[5, -1, 8, 40]' is invalid for input of size 501760

In [22]:
import os
import torch
from PIL import Image
from glob import glob
from torchvision import transforms as T
from transformers import CLIPModel
from diffusers import UNet2DConditionModel, AutoencoderKL, DDIMScheduler

# ------------------ Configuration ------------------
device = "cuda" if torch.cuda.is_available() else "cpu"

# Paths
input_dir = "your_input_images"  # Directory with input_01.jpg ... input_10.jpg
unet_path = "path_to_trained_unet"  # e.g., ./checkpoints/unet/
output_path = "output/generated_image.png"

# Image generation settings
num_inference_steps = 50
height, width = 224, 224
# ---------------------------------------------------


# ------------------ Load Models ------------------
print("Loading models...")
# unet = UNet2DConditionModel.from_pretrained(unet_path).to(device)
# vae = AutoencoderKL.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="vae").to(device)
scheduler = DDIMScheduler.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="scheduler")
# clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").vision_model.eval().to(device)

# ------------------ Preprocessing ------------------
# clip_transform = T.Compose([
#     T.Resize((224, 224)),
#     T.ToTensor(),
#     T.Normalize([0.4815, 0.4578, 0.4082], [0.2686, 0.2613, 0.2758])  # CLIP normalization
# ])

# def encode_and_fuse(images):
#     with torch.no_grad():
#         inputs = torch.stack([clip_transform(img) for img in images]).to(device)  # [10, 3, 224, 224]
#         features = [clip_model(pixel_values=img.unsqueeze(0)).last_hidden_state.mean(dim=1) for img in inputs]
#         fused = torch.mean(torch.stack(features), dim=0)  # [1, 512]
#     return fused

# ------------------ Sampling Function ------------------
@torch.no_grad()
def generate_image(fused_embedding, steps=50, height=512, width=512):
    latent = torch.randn((2, 4, height // 8, width // 8)).to(device)
    scheduler.set_timesteps(steps)

    for t in scheduler.timesteps:
        latent_input = scheduler.scale_model_input(latent, t)

        noise_pred = unet(latent_input, t, encoder_hidden_states=fused_embedding).sample
        latent = scheduler.step(noise_pred, t, latent).prev_sample

    decoded = vae.decode(latent / 0.18215).sample
    decoded = (decoded / 2 + 0.5).clamp(0, 1)
    decoded = decoded[0]
    return T.ToPILImage()(decoded.squeeze().cpu())

# ------------------ Inference Entry Point ------------------
def main():

    # data_path = '/home/fahimul/Documents/Research/MIDuff/dataset/drone/' #don't include the / at the end
    # allData = MultiInputImageDataset(root_dir=data_path)
    # img_x, img_y = allData[0]

    embeddings = []

    for input_stack, target_image in test_loader:
        # Encode 10 images

        inputs = input_stack.chunk(10, dim=1)  # [B, 3, H, W] x 10
        target_image = target_image[0].to(device)
    

        # embeddings = [clip_model(pixel_values=img).last_hidden_state.mean(dim=1) for img in inputs]
        for img in inputs:
            img = img.to(device)
            img = torch.squeeze(img)
            embed = clip_model(pixel_values=img).last_hidden_state
            embeddings.append(embed)

    fused_cond = torch.mean(torch.stack(embeddings), dim=0)

    print(fused_cond.shape)
    print("Encoding and fusing input images...")
    # fused_cond = encode_and_fuse(img_x)


    print("Generating image...")
    result = generate_image(fused_cond, steps=num_inference_steps, height=height, width=width)

    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    result.save(output_path)
    print(f"Saved output to: {output_path}")

if __name__ == "__main__":
    main()


Loading models...
torch.Size([2, 50, 768])
Encoding and fusing input images...
Generating image...
Saved output to: output/generated_image.png


In [None]:
# data_path = '/home/fahimul/Documents/Research/MIDuff/dataset/drone/' #don't include the / at the end
# allData = MultiInputImageDataset(root_dir=data_path)
# x, y = allData[0]
# t = encode_and_fuse(x)



torch.Size([10, 50, 768])


RuntimeError: stack expects a non-empty TensorList

In [None]:

# import os
# from PIL import Image
# from torch.utils.data import Dataset, DataLoader
# from Uni_dataset import University1652Dataset_test
# dataset_dir = "/home/fahimul/Documents/Research/Dataset/University-1651"

# dataset = University1652Dataset_test(root_dir=dataset_dir, num_input=10, num_of_test_img=30)
# loader = DataLoader(dataset, batch_size=8, shuffle=False, num_workers=4)

# for drone_img, satellite_img, folder_id in loader:
#     print("Drone:", drone_img.shape)
#     # print("Satellite:", satellite_img)
#     # print("Folder IDs:", folder_id)
#     # break

ImportError: cannot import name 'University1652Dataset_test' from 'Uni_dataset' (/data/Research/MIDuff/Uni_dataset.py)