In [2]:
import pandas as pd
import statistics
import matplotlib.pyplot as plt
import torch
import torchvision
import os
from fastai.vision.all import *
from fastai.vision.gan import *
from tqdm.auto import tqdm
from diffusers import DDPMScheduler, UNet2DModel, DDPMPipeline, DDIMScheduler,StableDiffusionPipeline,DPMSolverMultistepScheduler
# Set the KMP_DUPLICATE_LIB_OK flag in this process's environment.
# NOTE(review): presumably a workaround for the duplicate-OpenMP-runtime
# error on this machine -- confirm it is still required.
os.environ.update({"KMP_DUPLICATE_LIB_OK": "TRUE"})

In [3]:
# Read the ArtEmis annotation CSV into a DataFrame; the cells below use its
# `emotion`, `art_style` and `painting` columns.
df = pd.read_csv(filepath_or_buffer='ArtEmisv1.csv')

In [4]:
def modo(x, threshold=1.0):
    """Summarise the labels of one group by their most frequent value(s).

    Parameters
    ----------
    x : pandas.Series
        Labels belonging to a single group.
    threshold : float, default 1.0
        Share of the non-null labels that the most frequent value must reach
        to count as the consensus. The default requires every non-null label
        to agree.

    Returns
    -------
    pandas.Series or list
        ``x.mode()`` when the most frequent value reaches ``threshold``;
        otherwise a list with up to three of the most frequent values, most
        frequent first. An empty list when ``x`` holds no non-null labels.
    """
    counts = x.value_counts()
    if counts.empty:
        return []
    # Use .iloc for the top count: `counts[0]` is a *label* lookup when the
    # labels are integers and only falls back to a (deprecated) positional
    # lookup for other index types.
    if counts.iloc[0] >= counts.sum() * threshold:
        return x.mode()
    return counts.index[:3].tolist()

In [5]:
# Lookup table: fine-grained emotion label -> coarse valence class.
_EMOTION_VALENCE = {
    'sadness': "negative",
    'fear': "negative",
    'disgust': "negative",
    'anger': "negative",
    'contentment': "positive",
    'awe': "positive",
    'amusement': "positive",
    'excitement': "positive",
}


def emtype(x):
    """Map a record's emotion to a coarse valence class.

    Parameters
    ----------
    x : object with an ``emotion`` attribute
        For example a DataFrame row passed by ``DataFrame.apply(..., axis=1)``.

    Returns
    -------
    str
        ``"negative"`` or ``"positive"`` for the eight known emotions,
        ``"something else"`` for any other value.
    """
    return _EMOTION_VALENCE.get(x.emotion, "something else")
    

In [6]:
# Build a new frame with the coarse valence column instead of aliasing `df`:
# a plain `dfemo = df` would add the column to the raw frame as well and make
# the load cell's result depend on whether this cell has been run.
dfemo = df.assign(emotype=df.apply(emtype, axis=1))


In [None]:
# Collapse the annotations to one row per (art_style, painting); `modo`
# decides what the aggregated `emotype` value of each painting is.
emotype_by_painting = dfemo.groupby(["art_style", "painting"])["emotype"]
dfemo = emotype_by_painting.agg(modo).reset_index()


In [None]:
# Keep only the paintings whose aggregated label is a single value. `modo`
# yields a list when the annotators did not agree, so test for that directly
# rather than comparing against the type of whatever row happens to be first
# (which would flip the filter if row 0 were a consensus row).
has_single_label = dfemo["emotype"].apply(lambda label: not isinstance(label, list))
dfemo = dfemo[has_single_label].reset_index(drop=True)


In [None]:
# Discard rows labelled with the catch-all class and renumber the index.
is_known_class = dfemo.emotype != "something else"
dfemo = dfemo.loc[is_known_class].reset_index(drop=True)


In [None]:
# Image location: dataset/wikiart/<art_style>/<painting>.jpg.
# os.path.join picks the separator of the running platform, so the result is
# the same backslash path on Windows and a valid path elsewhere. Iterating the
# two columns directly also avoids a row-wise apply (and works on an empty frame).
dfemo['path'] = [
    os.path.join('dataset', 'wikiart', style, painting + '.jpg')
    for style, painting in zip(dfemo['art_style'], dfemo['painting'])
]


In [None]:
#frac=1-(dfemo.emotype.value_counts()[1]/dfemo.emotype.value_counts()[0])

#dfemo = dfemo.drop(dfemo[dfemo['emotype'] == "positive"].sample(frac=frac).index)


In [None]:
# Bar-style histogram of the class labels on a 10x10 inch figure.
fig, ax = plt.subplots(figsize=(10, 10))
ax.hist(dfemo.emotype, bins=range(0, 3), rwidth=0.8, align="left")
ax.set_title('Histogram of Classes')

In [None]:
class SquarePad:
    """Pad an image to a square canvas, keeping the original content centred.

    Parameters
    ----------
    fill : int or tuple, default 255
        Fill value forwarded to ``torchvision.transforms.functional.pad``.
    padding_mode : str, default 'constant'
        Padding mode forwarded to ``torchvision.transforms.functional.pad``.
    """

    def __init__(self, fill=255, padding_mode='constant'):
        self.fill = fill
        self.padding_mode = padding_mode

    def __call__(self, image):
        """Return ``image`` padded so that width and height are equal.

        ``image`` must expose ``.size`` as ``(width, height)``.
        """
        w, h = image.size
        side = max(w, h)
        left = (side - w) // 2
        top = (side - h) // 2
        # When the difference is odd the two sides cannot be equal; give the
        # extra pixel to the right/bottom so the result is exactly square
        # (padding both sides by diff // 2 would leave it one pixel short).
        right = side - w - left
        bottom = side - h - top
        padding = (left, top, right, bottom)
        return torchvision.transforms.functional.pad(image, padding, self.fill, self.padding_mode)

# Preprocessing pipeline: SquarePad takes the place of transforms.Pad, then
# Resize(256), CenterCrop(256) and conversion to a tensor.
preprocessing_steps = [
    SquarePad(),
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(256),
    torchvision.transforms.ToTensor(),
]
transform = torchvision.transforms.Compose(preprocessing_steps)

In [None]:
# Image dataset read from the "diffset" folder, with `transform` applied to
# every image that is loaded.
ds = torchvision.datasets.ImageFolder(
    root="diffset",
    transform=transform,
)


In [None]:
# Wrap the dataset in a shuffled dataloader (batch size 16 for this preview)
train_dataloader = DataLoader(ds, batch_size=16, shuffle=True)

# Pull a single batch and inspect it: tensor shape, labels, image grid
x, y = next(iter(train_dataloader))
print('Input shape:', x.shape)
print('Labels:', y)
preview_grid = torchvision.utils.make_grid(x)
plt.imshow(preview_grid.permute(1, 2, 0))


In [None]:
class ClassConditionedUnet(nn.Module):
    """UNet2DModel wrapper that receives the class label as extra input channels.

    Each label is looked up in a learned embedding; the embedding vector is
    repeated over the two trailing (spatial) dimensions of the input and
    concatenated to it along the channel axis before the UNet is called.

    Parameters
    ----------
    num_classes : int, default 2
        Number of rows in the class embedding table.
    class_emb_size : int, default 2
        Length of each class embedding, i.e. the number of extra input
        channels given to the UNet.
    """

    def __init__(self, num_classes=2, class_emb_size=2):
        super().__init__()

        # Learned lookup table: class index -> vector of length class_emb_size.
        self.class_emb = nn.Embedding(num_classes, class_emb_size)

        # Unconditional UNet whose input is widened by class_emb_size channels
        # to make room for the class conditioning.
        # NOTE(review): sample_size is 64 while the notebook's transform
        # produces 256x256 images -- confirm this is intended.
        self.model = UNet2DModel(
            sample_size=64,
            in_channels=3 + class_emb_size,
            out_channels=3,
            layers_per_block=2,
            block_out_channels=(128, 128, 256, 256, 512, 512),
            # Six levels down; attention only on the fifth.
            down_block_types=("DownBlock2D",) * 4 + ("AttnDownBlock2D", "DownBlock2D"),
            # Mirror on the way up; attention only on the second.
            up_block_types=("UpBlock2D", "AttnUpBlock2D") + ("UpBlock2D",) * 4,
        )

    def forward(self, x, t, class_labels):
        """Predict from ``x`` at timestep ``t`` conditioned on ``class_labels``.

        ``x`` is unpacked as a 4-D tensor whose first dimension is the batch;
        ``class_labels`` must give one embedding row per batch element.
        Returns the ``.sample`` attribute of the wrapped UNet's output.
        """
        batch, _, dim2, dim3 = x.shape

        # (batch, emb) -> (batch, emb, 1, 1) -> repeated over the two trailing dims of x.
        emb = self.class_emb(class_labels)
        emb_dim = emb.shape[1]
        cond_map = emb.view(batch, emb_dim, 1, 1).expand(batch, emb_dim, dim2, dim3)

        # Stack image channels and conditioning channels along dim 1.
        net_input = torch.cat((x, cond_map), 1)

        return self.model(net_input, t).sample

In [None]:
# DDPM noise scheduler: 1000 training timesteps, 'squaredcos_cap_v2' betas
noise_scheduler = DDPMScheduler(
    num_train_timesteps=1000,
    beta_schedule='squaredcos_cap_v2',
)
     

In [2]:
# Load the pretrained DDPM pipeline onto the GPU, and build a DDIM scheduler
# from the same checkpoint, configured for 500 inference steps.
pretrained_id = "google/ddpm-bedroom-256"
net = DDPMPipeline.from_pretrained(pretrained_id).to("cuda")
noise_scheduler = DDIMScheduler.from_pretrained(pretrained_id)
noise_scheduler.set_timesteps(num_inference_steps=500)

NameError: name 'DDPMPipeline' is not defined

In [None]:
# Rebuild the dataloader for training (batch size 4)
train_dataloader = DataLoader(ds, batch_size=4, shuffle=True)

# How many runs through the data should we do?
n_epochs = 50

# The optimizer: only the UNet of the pretrained pipeline is fine-tuned
opt = torch.optim.Adam(net.unet.parameters(), lr=1e-6)

# Keeping a record of the losses for later viewing
losses = []

# Read the value through `.config`; reading config values as direct
# attributes of the scheduler is deprecated in diffusers.
num_train_timesteps = net.scheduler.config.num_train_timesteps

# The training loop
for epoch in range(n_epochs):
    # NOTE: the labels are not fed to the network. The pretrained UNet loaded
    # from "google/ddpm-bedroom-256" has no class embedding, so passing
    # class labels is either ignored or rejected with a ValueError depending
    # on the diffusers version. Swap in a class-conditional model to use them.
    for x, _labels in tqdm(train_dataloader):

        # Get some data and prepare the corrupted version
        x = x.to("cuda") * 2 - 1  # Data on the GPU (mapped to (-1, 1))
        noise = torch.randn_like(x)
        timesteps = torch.randint(
            0, num_train_timesteps, (x.shape[0],), device=x.device
        ).long()
        noisy_x = net.scheduler.add_noise(x, noise, timesteps)

        # Get the model prediction
        pred = net.unet(noisy_x, timesteps).sample

        # Calculate the loss: how close is the output to the noise
        loss = F.mse_loss(pred, noise)

        # Backprop and update the params:
        opt.zero_grad()
        loss.backward()
        opt.step()

        # Store the loss for later
        losses.append(loss.item())

    # Average over the losses actually recorded (at most the last 100), so
    # the figure is not deflated while fewer than 100 steps have been taken.
    recent_losses = losses[-100:]
    avg_loss = sum(recent_losses) / max(len(recent_losses), 1)
    print(f'Finished epoch {epoch}. Average of the last 100 loss values: {avg_loss:05f}')

# View the loss curve
plt.plot(losses)

In [None]:
# Prepare random x to start from (10 samples of 3x256x256 noise).
# NOTE: no class labels are passed to the network. The pretrained UNet has no
# class embedding, so labels are either ignored or rejected with a ValueError
# depending on the diffusers version.
x = torch.randn(10, 3, 256, 256).to('cuda')

# Sampling loop
for t in tqdm(noise_scheduler.timesteps):

    # Get model pred
    with torch.no_grad():
        residual = net.unet(x, t).sample

    # Update sample with step
    x = noise_scheduler.step(residual, t, x).prev_sample

# Training data was mapped from [0, 1] to (-1, 1), so map the samples back to
# [0, 1] before display; clipping at 0 alone would throw away the negative half.
images = (x.detach().cpu() / 2 + 0.5).clamp(0, 1)

# Show the results
fig, ax = plt.subplots(1, 1, figsize=(48, 48))
ax.imshow(torchvision.utils.make_grid(images, nrow=8).permute(1, 2, 0))

In [None]:
# Persist the pipeline to disk.
# NOTE(review): the directory name says "10epoch" while the training cell sets
# n_epochs = 50, and the path is an absolute local one -- confirm both.
save_dir = "D://EmotionalArtGeneration//pretrained10epoch"
net.save_pretrained(save_dir)