In [51]:
import torch
from PIL import Image
import os
import torchvision.transforms as transforms
import torch
from torch.utils.data import DataLoader
from sentence_transformers import SentenceTransformer

In [52]:
# Path to the directory containing the images
dir_path = './frames'

# Loop through all the files in the directory
for path in os.listdir(dir_path):
    for filename in os.listdir(f'{dir_path}/{path}'):
        # Check if the file is an image
        if filename.endswith('.jpg') or filename.endswith('.png'):
            img_path = f'{dir_path}/{path}/{filename}'
            
            # Open the image and resize it to 128 x 128
            with Image.open(img_path) as img:
                if img.size != (128, 128):
                    img = img.resize((128, 128))
                    img.save(img_path)
                # make sure the image is RGB
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                    img.save(img_path)

In [53]:
# create a dataset to load the images and labels
# the subdirectory names are the labels
# for each image, add a random amount of Gaussian noise 10 times, selecting from a range of 0 to 100 time steps
# add each noisy image to the dataset with the label of the original image and the noise
# Path: diffusionmodel.ipynb

import torch
from torch.utils.data import Dataset
from PIL import Image
import os
import numpy as np

class DiffusionDataset(Dataset):
    def __init__(self, dir_path, transform=None):
        self.dir_path = dir_path
        self.transform = transform
        self.labels = []
        self.noisy_images = []
        self.noises = []
        self.timesteps = []
        
        # Loop through all the files in the directory
        for path in os.listdir(dir_path):
            for filename in os.listdir(f'{dir_path}/{path}'):
                # Check if the file is an image
                if filename.endswith('.jpg') or filename.endswith('.png'):
                    img_path = f'{dir_path}/{path}/{filename}'
                    
                    # Open the image and resize it to 128 x 128
                    with Image.open(img_path) as img:
                        if self.transform:
                            img = self.transform(img)
                    
                    # add 1 noisy images to the dataset
                    for _ in range(1):
                        # add a random amount of Gaussian noise
                        r = np.random.randint(0, 100)
                        noise = torch.randn(img.shape) * r
                        noisy_img = img + noise
                        self.labels.append(path)
                        self.noises.append(noise)
                        self.noisy_images.append(noisy_img)
                        self.timesteps.append(r)
                        
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return self.labels[idx], self.noisy_images[idx], self.noises[idx], self.timesteps[idx]

In [54]:
# create a dataset
ds = DiffusionDataset('./frames', transform=transforms.ToTensor())

In [55]:
# create a dataloader to load the dataset
dl = DataLoader(ds, batch_size=32, shuffle=True)

In [61]:
# create a model to guess the noise added to the image
import torch
import torch.nn as nn
import torch.nn.functional as F

class DiffusionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(128 * 128 * 3 + 768 + 1, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 128 * 128 * 3)
        
    def forward(self, img, sentence, timestep):
        # use the image, sentence, and timestep to predict the noise
        img = img.view(img.size(0), -1)
        sentence = sentence.view(sentence.size(0), -1)
        timestep = timestep.view(timestep.size(0), -1)
        x = torch.cat((img, sentence, timestep), dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        
        # reshape the noise to 3x128x128
        x = x.view(x.size(0), 3, 128, 128)
        return x

In [62]:
embedding_model = SentenceTransformer('princeton-nlp/sup-simcse-bert-base-uncased')

No sentence-transformers model found with name C:\Users\david/.cache\torch\sentence_transformers\princeton-nlp_sup-simcse-bert-base-uncased. Creating a new one with MEAN pooling.


In [69]:
print(len(dl))

857


In [72]:
# create a model
model = DiffusionModel()

# create a loss function
criterion = nn.MSELoss()

# create an optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(1):
    for i, (labels, images, noises, timesteps) in enumerate(dl):
        # get the sentence embeddings
        sentences = [label for label in labels]
        embeddings = embedding_model.encode(sentences)
        zero_embeddings = torch.zeros(embeddings.shape)
        
        # get the predicted noise
        predicted_noises = model(images, torch.tensor(embeddings), timesteps)
        # calculate the loss
        loss = criterion(predicted_noises, noises)
        # backpropagate the loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # get the predicted noise
        predicted_noises = model(images, torch.tensor(zero_embeddings), timesteps)
        # calculate the loss
        loss = criterion(predicted_noises, noises)
        # backpropagate the loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # print the loss every 10 batches
        if i % 50 == 0:
            print(f'Epoch {epoch}, Batch {i}, Loss {loss.item()}')

  predicted_noises = model(images, torch.tensor(zero_embeddings), timesteps)


Epoch 0, Batch 0, Loss 3405.795654296875
Epoch 0, Batch 50, Loss 3827.010009765625
Epoch 0, Batch 100, Loss 3254.554443359375
Epoch 0, Batch 150, Loss 3499.979736328125
Epoch 0, Batch 200, Loss 2647.005126953125
Epoch 0, Batch 250, Loss 3147.673583984375
Epoch 0, Batch 300, Loss 3179.4765625
Epoch 0, Batch 350, Loss 2947.805419921875
Epoch 0, Batch 400, Loss 3676.226318359375
Epoch 0, Batch 450, Loss 2872.381103515625
Epoch 0, Batch 500, Loss 1993.609375
Epoch 0, Batch 550, Loss 3204.145751953125
Epoch 0, Batch 600, Loss 3143.997314453125
Epoch 0, Batch 650, Loss 2125.020751953125
Epoch 0, Batch 700, Loss 3083.550537109375
Epoch 0, Batch 750, Loss 3410.318359375
Epoch 0, Batch 800, Loss 3258.860107421875
Epoch 0, Batch 850, Loss 3231.862548828125


In [77]:
# generate random noise
noise = torch.randn(3, 128, 128)

# get the sentence embeddings for an input sentence
sentence = input('Enter a sentence: ')
embedding = embedding_model.encode([sentence])

for i in range(100):
    # the timestep is 100 - i
    t = 100 - i
    
    # get the predicted noise
    predicted_noise = model(noise.unsqueeze(0), torch.tensor(embedding), torch.tensor([t]))[0]
    
    # remove the predicted noise from the image
    noise -= predicted_noise
    
# show noise as an image
noise_img = transforms.ToPILImage()(noise)
noise_img.show()

AttributeError: 'Tensor' object has no attribute '__array_interface__'