In [1]:
import torch
import torch.nn as nn
#from DCGan import *
import os
from torch.autograd import Variable
import cv2
from tqdm import tqdm
import torchvision.transforms as transforms
from torchvision.utils import *
from torchvision import datasets
import matplotlib.pyplot as plt
import matplotlib.animation as anim
from IPython.display import HTML

In [2]:
import numpy as np
class DCDiscriminator(nn.Module):
    def __init__(self, img_size:int=32, channels:int=1):
        super().__init__()
        def discriminator_block(in_filters, out_filters, bn=True):
            block = [nn.Conv2d(in_filters, out_filters, 3, 2, 1), nn.LeakyReLU(0.2, inplace=True), nn.Dropout2d(0.25)]
            if bn:
                block.append(nn.BatchNorm2d(out_filters, 0.8))
            return block

        self.model = nn.Sequential(
            *discriminator_block(channels, 16, bn=False),
            *discriminator_block(16, 32),
            *discriminator_block(32, 64),
            *discriminator_block(64, 128),
        )
        ds_size = img_size // 2 ** 4
        self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, x):
        x = self.model(x)
        x = x.view(x.shape[0], -1)
        x = self.adv_layer(x)
        return x
    
class DCGenerator(nn.Module):
    def __init__(self, img_size:int=32, latent_dim:int=100, channels:int=1):
        super().__init__()
        self.init_size = img_size // 4
        self.L1 = nn.Sequential(nn.Linear(latent_dim, 128 * self.init_size ** 2))

        self.conv_blocks = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, 3, stride=1, padding=1),
            nn.BatchNorm2d(128, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, channels, 3, stride=1, padding=1),
            nn.Tanh(),
        )

    def forward(self, x):
        x = self.L1(x)
        x = x.view(x.shape[0], 128, self.init_size, self.init_size)
        x = self.conv_blocks(x)
        return x
    
class NewGenerator(nn.Module):
    def __init__(self, 
                 latent:int=100, #size latent vector z
                 ngf:int=64, #size of feature map in generator
                 nc:int=3 #num channell img
                 ) -> None:       
        super().__init__()
        self.model_generator = nn.Sequential(
            nn.ConvTranspose2d(latent, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),
            
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),
            
            nn.ConvTranspose2d( ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),
            
            nn.ConvTranspose2d( ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            
            nn.ConvTranspose2d( ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh()
        )
    def forward(self, X):
        return self.model_generator(X)

class NewDiscriminator(nn.Module):
    def __init__(self,
                 nc:int=3, #num channell img
                 ndf:int=64 #size of feature map in discriminator
                 ) -> None:
        super().__init__()
        self.model_discriminator = nn.Sequential(
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
           
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        return 

def normal_weights(model):
    classname = model.__class__.__name__
    if classname.find("Conv") != -1:
        torch.nn.init.normal_(model.weight.data, 0.0, 0.02)
    elif classname.find("BatchNorm2d") != -1:
        torch.nn.init.normal_(model.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(model.bias.data, 0.0)


In [3]:
gen = NewGenerator()
gen

NewGenerator(
  (model_generator): Sequential(
    (0): ConvTranspose2d(100, 512, kernel_size=(4, 4), stride=(1, 1), bias=False)
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (4): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
    (9): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU(inplace=True)
    (12): ConvTranspose2d(64, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (13):

In [4]:
gen1 = DCGenerator()
gen1

DCGenerator(
  (L1): Sequential(
    (0): Linear(in_features=100, out_features=8192, bias=True)
  )
  (conv_blocks): Sequential(
    (0): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (1): Upsample(scale_factor=2.0, mode='nearest')
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): BatchNorm2d(128, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Upsample(scale_factor=2.0, mode='nearest')
    (6): Conv2d(128, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): BatchNorm2d(64, eps=0.8, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.2, inplace=True)
    (9): Conv2d(64, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): Tanh()
  )
)

In [43]:
class Alfa:
    def __init__(self, **kwargs) -> None:
        self.a = kwargs.get("a")
        print(self.a)
    def train(self):
        for j in range(1):
            for i, _ in enumerate((pbar := tqdm(self.a))):
                print(i, _)
        

In [44]:
p = Alfa(a=list(range(100)))


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]


In [None]:
img_list = []
G_losses = []
D_losses = []
iters = 0
real_label = 1.
fake_label = 0.
fixed_noise = torch.randn(64, latent, 1, 1, device=device)
for epoch in range(EPOCHS):
    for i, data in enumerate(dataloader):

        discriminator.zero_grad()
        # Format batch
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
        # Forward pass real batch through D
        output = discriminator(real_cpu).view(-1)
        # Calculate loss on all-real batch
        errD_real = Loss(output, label)
        # Calculate gradients for D in backward pass
        errD_real.backward()
        D_x = output.mean().item()

        ## Train with all-fake batch
        # Generate batch of latent vectors
        noise = torch.randn(b_size, nz, 1, 1, device=device)
        # Generate fake image batch with G
        fake = netG(noise)
        label.fill_(fake_label)
        # Classify all fake batch with D
        output = discriminator(fake.detach()).view(-1)
        # Calculate D's loss on the all-fake batch
        errD_fake = Loss(output, label)
        # Calculate the gradients for this batch, accumulated (summed) with previous gradients
        errD_fake.backward()
        D_G_z1 = output.mean().item()
        # Compute error of D as sum over the fake and the real batches
        errD = errD_real + errD_fake
        # Update D
        optimizerD.step()

        ############################
        # (2) Update G network: maximize log(D(G(z)))
        ###########################
        netG.zero_grad()
        label.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake).view(-1)
        # Calculate G's loss based on this output
        errG = criterion(output, label)
        # Calculate gradients for G
        errG.backward()
        D_G_z2 = output.mean().item()
        # Update G
        optimizerG.step()

        if (iters % 500 == 0) or ((epoch == num_epochs-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fake, padding=2, normalize=True))

        iters += 1

In [5]:
?enumerate

[1;31mInit signature:[0m [0menumerate[0m[1;33m([0m[0miterable[0m[1;33m,[0m [0mstart[0m[1;33m=[0m[1;36m0[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[1;31mDocstring:[0m     
Return an enumerate object.

  iterable
    an object supporting iteration

The enumerate object yields pairs containing a count (from start, which
defaults to zero) and a value yielded by the iterable argument.

enumerate is useful for obtaining an indexed list:
    (0, seq[0]), (1, seq[1]), (2, seq[2]), ...
[1;31mType:[0m           type
[1;31mSubclasses:[0m     


In [45]:
p.train()

100%|██████████| 100/100 [00:00<00:00, 99888.16it/s]

0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31
32 32
33 33
34 34
35 35
36 36
37 37
38 38
39 39
40 40
41 41
42 42
43 43
44 44
45 45
46 46
47 47
48 48
49 49
50 50
51 51
52 52
53 53
54 54
55 55
56 56
57 57
58 58
59 59
60 60
61 61
62 62
63 63
64 64
65 65
66 66
67 67
68 68
69 69
70 70
71 71
72 72
73 73
74 74
75 75
76 76
77 77
78 78
79 79
80 80
81 81
82 82
83 83
84 84
85 85
86 86
87 87
88 88
89 89
90 90
91 91
92 92
93 93
94 94
95 95
96 96
97 97
98 98
99 99





In [None]:
import random

In [52]:
class  DCGanTrainer():
    def __init__(self, **kwargs) -> None:
        self.img_size = kwargs.get("img_size")
        #self.dataloader = kwargs.get("dataloader")
        self.discriminator = kwargs.get("discriminator")
        self.generator = kwargs.get("generator")
        ngpu = kwargs.get("ngpu")
        self.lr = kwargs.get("lr")
        self.betas = kwargs.get("betas") #tuple: (0.5, 0.999)
        self.batch_size = kwargs.get("batch_size")
        self.latent_dim = kwargs.get("latent_dim")
        self.epochs = kwargs.get("epochs")
        self.optim = kwargs.get("optimizer")
        self.Loss = kwargs.get("loss")
        self.Tensor = torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
        self.vid = kwargs.get("video")
        seed = kwargs.get("seed")
        self.nworker = kwargs.get("nworker")
        self.path2data = kwargs.get("path_to_data")
        if self.vid:
            self.VideoGenerator = VideoGenerator(size=(self.img_size, self.img_size))
        
        if seed is not None:
            random.seed(seed)
            torch.manual_seed(seed=seed)
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.discriminator.to(self.device)
        self.generator.to(self.device)

        def parallel(model):
            model = nn.DataParallel(model, list(range(ngpu)))

        def create_dataloader(self, path_dataset:str, nworker:int=1, batch_size:int=128, img_size:int=32):
                    dataset = datasets.ImageFolder(root=path_dataset, 
                                                transform=transforms.Compose([
                                                    transforms.Resize(img_size),
                                                    transforms.CenterCrop(img_size),
                                                    transforms.ToTensor(),
                                                    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                                ]))
                    return torch.utils.data.DataLoader(dataset, 
                                                    batch_size=batch_size, 
                                                    shuffle=True,
                                                    num_workers=nworker)
        if kwargs.get("parallel"):
            parallel(self.discriminator)
            parallel(self.generator)

        self.generator.apply(normal_weights)
        self.discriminator.apply(normal_weights)

        self.optimizer_G = self.optim(self.generator.parameters(), lr=self.lr, betas=self.betas)
        self.optimizer_D = self.optim(self.discriminator.parameters(), lr=self.lr, betas=self.betas)
        
        self.dataloader = create_dataloader(self.path2data, self.nworker, self.batch_size, self.img_size)
    def train(self):
        f=0
        for epoch in range(self.epochs):
            for i, (imgs, _) in enumerate((pbar := tqdm(self.dataloader))):

            
                valid = Variable(self.Tensor(imgs.shape[0], 1).fill_(1.0), requires_grad=False)
                fake = Variable(self.Tensor(imgs.shape[0], 1).fill_(0.0), requires_grad=False)

                
                real_imgs = Variable(imgs.type(self.Tensor))
                self.optimizer_G.zero_grad()

            
                z = Variable(self.Tensor(np.random.normal(0, 1, (imgs.shape[0], self.latent_dim))))

            
                gen_imgs = self.generator(z)

                g_loss = self.Loss(self.discriminator(gen_imgs), valid)

                g_loss.backward()
                self.optimizer_G.step()

            
                self.optimizer_D.zero_grad()

                real_loss = self.Loss(self.discriminator(real_imgs), valid)
                fake_loss = self.Loss(self.discriminator(gen_imgs.detach()), fake)
                d_loss = (real_loss + fake_loss) / 2

                d_loss.backward()
                self.optimizer_D.step()

                #print(
                #    "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
                #    % (epoch, n_epochs, i, len(dataloader), d_loss.item(), g_loss.item())
                #)
        
                if self.vid:
                    self.VideoGenerator.add_image(make_grid(gen_imgs, padding=2, normalize=True))
                else:
                    save_image(gen_imgs.data[:25], f"images/{f}.png", nrow=5, normalize=True)
                    f+=1
                pbar.set_description(f"LossG {g_loss.item()}\tLossD {d_loss.item()}")
                
    def train_new(self):
        self.img_list = []
        G_losses = []
        D_losses = []
        iters = 0
        real_label = 1.
        fake_label = 0.
        fixed_noise = torch.randn(64, self.latent_dim, 1, 1, device=self.device)
        for epoch in range(self.epochs):
            for i, data in enumerate(self.dataloader):

                self.discriminator.zero_grad()
                # Format batch
                real_cpu = data[0].to(self.device)
                b_size = real_cpu.size(0)
                label = torch.full((b_size,), real_label, dtype=torch.float, device=self.device)
                # Forward pass real batch through D
                output = self.discriminator(real_cpu).view(-1)
                # Calculate loss on all-real batch
                errD_real = self.Loss(output, label)
                # Calculate gradients for D in backward pass
                errD_real.backward()
                D_x = output.mean().item()

                ## Train with all-fake batch
                # Generate batch of latent vectors
                noise = torch.randn(b_size, self.latent_dim, 1, 1, device=self.device)
                # Generate fake image batch with G
                fake = self.discriminator(noise)
                label.fill_(fake_label)
                # Classify all fake batch with D
                output = self.discriminator(fake.detach()).view(-1)
                # Calculate D's loss on the all-fake batch
                errD_fake = self.Loss(output, label)
                # Calculate the gradients for this batch, accumulated (summed) with previous gradients
                errD_fake.backward()
                D_G_z1 = output.mean().item()
                # Compute error of D as sum over the fake and the real batches
                errD = errD_real + errD_fake
                # Update D
                self.optimizer_D.step()

                ############################
                # (2) Update G network: maximize log(D(G(z)))
                ###########################
                self.generator.zero_grad()
                label.fill_(real_label)  # fake labels are real for generator cost
                # Since we just updated D, perform another forward pass of all-fake batch through D
                output = self.discriminator(fake).view(-1)
                # Calculate G's loss based on this output
                errG = self.Loss(output, label)
                # Calculate gradients for G
                errG.backward()
                D_G_z2 = output.mean().item()
                # Update G
                self.optimizer_G.step()

                if i % 50 == 0:
                    print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                        % (epoch, self.epochs, i, len(self.dataloader),
                            errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
                G_losses.append(errG.item())
                D_losses.append(errD.item())

                if (iters % 500 == 0) or ((epoch == self.epochs-1) and (i == len(self.dataloader)-1)):
                    with torch.no_grad():
                        fake = self.generator(fixed_noise).detach().cpu()
                    self.img_list.append(make_grid(fake, padding=2, normalize=True))

                iters += 1
        

SyntaxError: f-string: empty expression not allowed (4088723116.py, line 80)

In [None]:
datasets.CelebA

In [51]:
class VideoGenerator:
    def __init__(self, size: tuple = (256, 256), result_path:str="./video", format_out:str="MP4V", n_second:int=None):
        self.Images = list()
        self.format = format_out
        self.n_second = n_second
        self.path = result_path
    def add_image(self, image):
        self.Images.append(image)
        
    def generate_video(self):
        if self.n_second is None:
            fps = 1
        else:
            fps = len(self.Images)//self.n_second
        name = "gan_traine/"
        path = os.path.join(self.path, name)
        os.makedirs(path, exist_ok=True)
        vid = cv2.VideoCapture(path)
        #vid_name = os.path.basename(video)
        #total = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
        #fps = vid.get(cv2.CAP_PROP_FPS)
        #w = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
        #h = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        codec = cv2.VideoWriter_fourcc(*self.format)
        out = cv2.VideoWriter(f'{path}GAN.mp4',codec , fps, self.size)
        for img in self.Images:
            out.write(img)
        out.release()
    def plt_animate(self, in_notebook:bool=False):
        fig = plt.figure(figsize=(10, 10))

        plt.axis("off")
        ims = [[plt.imshow(np.transpose(i, (1,2,0)), animated=True)] for i in self.Images]        
        ani =  anim.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

        if in_notebook:
            HTML(ani.to_jshtml())


In [25]:
Trainer = DCGanTrainer(img_size=32, 
                       dataloader=None, 
                       discriminator=DCDiscriminator(), 
                       generator=DCGenerator(), 
                       ngpu=0, 
                       parallel=False, 
                       optimizer=torch.optim.Adam, 
                       betas=(0.5, 0.999), 
                       batch_size=200,
                       lr=2e-4,
                       epochs=10,
                       latent_dim=100,
                       loss=nn.BCELoss())

In [26]:
Trainer.optimizer_G

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.5, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.0002
    maximize: False
    weight_decay: 0
)

In [1]:
8//3 == int(8/3)

True

In [3]:
2000 % 1909

91