In [1]:
import torch
import torchvision
import os
import argparse
import time as t
import numpy as np
import torchvision.datasets as dset
from torch.autograd import Variable
from torch import nn as nn
from torch import optim as optim
from torch.utils import data
from torchvision import transforms
from torchvision import utils
import matplotlib.pyplot as plt
from tqdm import tqdm

# ### DCGAN model

In [2]:
#Dataset locations: root folder plus its "train" and "val" sub-folders
dataset_path='../input/fruit30/fruit30_split'
train_path,test_path=(os.path.join(dataset_path,split) for split in ("train","val"))
#Image preprocessing and loading
def load_data(args):
    """Build the training and validation data loaders.

    Images are read with ``ImageFolder`` from the module-level ``train_path``
    and ``test_path`` directories.

    Args:
        args: configuration object; only ``args.batch_size`` is read.

    Returns:
        tuple: ``(train_loader, test_loader)``.
    """
    def _tensor_and_normalize():
        #Shared tail of both pipelines: convert to tensor, then normalise each
        #of the three channels with mean 0.5 and std 0.5
        return [transforms.ToTensor(),
                transforms.Normalize([0.5,0.5,0.5], [0.5,0.5,0.5])]

    #Training pipeline: random 32x32 crop + random horizontal flip
    train_steps=[transforms.RandomResizedCrop(32),
                 transforms.RandomHorizontalFlip()]
    train_trans=transforms.Compose(train_steps+_tensor_and_normalize())

    #Validation pipeline: deterministic resize to 64, then a 32x32 centre crop
    test_steps=[transforms.Resize(64),
                transforms.CenterCrop(32)]
    test_trans=transforms.Compose(test_steps+_tensor_and_normalize())

    train_data=dset.ImageFolder(train_path,train_trans)
    test_data=dset.ImageFolder(test_path,test_trans)

    #The training loader drops the last incomplete batch; both loaders shuffle
    train_loader=data.DataLoader(train_data,
                                 shuffle=True,
                                 batch_size=args.batch_size,
                                 drop_last=True)
    test_loader=data.DataLoader(test_data,
                                batch_size=args.batch_size,
                                shuffle=True)
    return train_loader,test_loader
# #Initialize parameter weights
# def weights_init(m):
#     classname=m.__class__.__name__
#     if classname.find('Conv')!=-1:
#         torch.nn.init.normal_(m.weight,0.0,0.02)
#     elif classname.find('BatchNorm')!=-1:
#         torch.nn.init.normal_(m.weight,1.0,0.02)
#         torch.nn.init.zeros_(m.bias)
#Generator
class Generator(nn.Module):
    """Transposed-convolution generator.

    Takes a noise tensor with 100 channels and pushes it through four
    ``ConvTranspose2d`` layers (the first three each followed by
    ``BatchNorm2d`` + ``ReLU``), finishing with ``Tanh``.
    With a ``(N, 100, 1, 1)`` input the spatial size grows 1 -> 4 -> 8 -> 16 -> 32.

    Args:
        channels: number of channels of the generated image.
    """
    def __init__(self,channels):
        super().__init__()
        #(in_channels, out_channels, stride, padding) of every stage that is
        #followed by batch-norm and ReLU
        hidden_stages=(
            (100,1024,1,0),   #100-dim noise -> 4*4*1024
            (1024,512,2,1),   #-> 8*8*512
            (512,256,2,1),    #-> 16*16*256
        )
        layers=[]
        for in_ch,out_ch,stride,pad in hidden_stages:
            layers.append(nn.ConvTranspose2d(in_ch,out_ch,kernel_size=4,stride=stride,padding=pad))
            layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.ReLU(True))
        #Final up-sampling to the requested number of image channels
        layers.append(nn.ConvTranspose2d(256,channels,kernel_size=4,stride=2,padding=1))
        self.gen=nn.Sequential(*layers)
        self.out=nn.Tanh()
    def forward(self,x):
        """Return ``tanh`` of the transposed-convolution stack applied to ``x``."""
        upsampled=self.gen(x)
        return self.out(upsampled)
#Discriminator
class Discriminator(nn.Module):
    """Convolutional discriminator.

    ``self.D`` is a stack of three stride-2 convolutions with LeakyReLU(0.2)
    (batch-norm on the second and third); ``self.D_out`` is a final 4x4
    convolution to one channel followed by a sigmoid.
    For a ``(N, channels, 32, 32)`` input the feature map shrinks
    32 -> 16 -> 8 -> 4 and the output has shape ``(N, 1, 1, 1)``.

    Args:
        channels: number of channels of the input image.
    """
    def __init__(self,channels):
        super().__init__()
        #First stage has no batch-norm: 32*32*c -> 16*16*256
        body=[
            nn.Conv2d(channels,256,kernel_size=4,stride=2,padding=1),
            nn.LeakyReLU(0.2,inplace=True),
        ]
        #Remaining stages: 16*16*256 -> 8*8*512 -> 4*4*1024
        for in_ch,out_ch in ((256,512),(512,1024)):
            body.extend([
                nn.Conv2d(in_ch,out_ch,kernel_size=4,stride=2,padding=1),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(0.2,inplace=True),
            ])
        self.D=nn.Sequential(*body)
        #Classification head: 4*4*1024 -> single sigmoid score
        head=[nn.Conv2d(1024,1,kernel_size=4,stride=1,padding=0),nn.Sigmoid()]
        self.D_out=nn.Sequential(*head)
    def forward(self,x):
        """Return the sigmoid score produced by the head on the features of ``x``."""
        features=self.D(x)
        return self.D_out(features)
    def feature_extract(self,x):
        """Return the feature map of ``x`` reshaped to rows of 1024*4*4 values."""
        features=self.D(x)
        return features.view(-1,1024*4*4)
      
        


In [None]:
#DCGAN model
class DCGAN(object):
    """DCGAN trainer bundling a Generator, a Discriminator, their Adam
    optimisers and a binary cross-entropy loss.

    Args:
        args: configuration object. Reads ``channels``, ``cuda``, ``epochs``
            and ``batch_size``; ``lr`` is used when present (default 0.0002).
    """
    def __init__(self,args):
        #Channels, generator and discriminator
        self.C=args.channels
        self.G=Generator(args.channels)
        self.D=Discriminator(args.channels)
        #Other hyper-parameters
        self.cuda=False
        self.cuda_index=0
        self.device=torch.device("cpu")
        self.epochs=args.epochs
        self.batch_size=args.batch_size
        self.loss=nn.BCELoss()
        #Move the networks first so the optimisers are built on the final parameters
        self.check_cuda(args.cuda)
        lr=getattr(args,"lr",0.0002)
        self.G_optim=optim.Adam(self.G.parameters(),lr=lr,betas=(0.5,0.999))
        self.D_optim=optim.Adam(self.D.parameters(),lr=lr,betas=(0.5,0.999))
    def check_cuda(self,cuda_flag=False):
        """Select the compute device and move both networks and the loss to it.

        Args:
            cuda_flag: a ``torch.device``, a device string, or any truthy/falsy
                value. A ``torch.device`` object is always truthy, so its
                ``type`` is inspected instead of its truth value. CUDA is only
                used when it is requested *and* available.
        """
        if isinstance(cuda_flag,str):
            cuda_flag=torch.device(cuda_flag)
        if isinstance(cuda_flag,torch.device):
            wants_cuda=cuda_flag.type=="cuda"
            if wants_cuda and cuda_flag.index is not None:
                self.cuda_index=cuda_flag.index
        else:
            wants_cuda=bool(cuda_flag)
        self.cuda=bool(wants_cuda and torch.cuda.is_available())
        if self.cuda:
            self.device=torch.device("cuda",self.cuda_index)
        else:
            self.device=torch.device("cpu")
        self.G.to(self.device)
        self.D.to(self.device)
        self.loss.to(self.device)
        print(f"cuda:{self.cuda}")
    #Time formatting
    def time_change(self,time):
        """Format an elapsed duration in seconds as ``HHhMMmSSs``.

        The value is a duration, not a timestamp, so it is split arithmetically
        (no timezone offset is applied and hours are not wrapped at 24).
        """
        total_seconds=int(time)
        hours,remainder=divmod(total_seconds,3600)
        minutes,seconds=divmod(remainder,60)
        return f"{hours:02d}h{minutes:02d}m{seconds:02d}s"
    #Save the models
    def save_model(self):
        """Write both state dicts to ``./generator.pth`` and ``./discriminator.pth``."""
        print("saving...")
        torch.save(self.G.state_dict(), './generator.pth')
        torch.save(self.D.state_dict(), './discriminator.pth')
        print('Models save to ./generator.pth & ./discriminator.pth')
    #Load the models
    def load_model(self, D_model_path, G_model_path):
        """Load discriminator and generator weights from the given paths.

        ``map_location`` places the tensors on the active device, so a
        checkpoint saved on GPU can be loaded on a CPU-only machine.
        """
        self.D.load_state_dict(torch.load(D_model_path,map_location=self.device))
        self.G.load_state_dict(torch.load(G_model_path,map_location=self.device))
        print('Generator model loaded from {}.'.format(G_model_path))
        print('Discriminator model loaded from {}-'.format(D_model_path))
    #Testing / validation
    def test(self,test_iter,D_path,G_path):
        """Reload the saved weights and save one grid of generated images.

        Args:
            test_iter: accepted for interface compatibility; not used.
            D_path: path of the discriminator checkpoint.
            G_path: path of the generator checkpoint.
        """
        print("testing....")
        self.load_model(D_path,G_path)
        z=torch.randn(self.batch_size,100,1,1).to(self.device)
        #No gradients are needed for sampling; the module mode is left unchanged
        with torch.no_grad():
            simple=self.G(z)
        #Undo the (0.5, 0.5) normalisation before saving
        simple=simple.mul(0.5).add(0.5)
        simple=utils.make_grid(simple)
        utils.save_image(simple,f"dcgan_model_result.png")
        print("测试集图片已保存！")
    def _train_step(self,img):
        """Run one discriminator update and one generator update on a batch.

        Returns:
            tuple: ``(d_loss, g_loss)`` as Python floats.
        """
        img=img.to(self.device)
        #Size labels and noise from the actual batch, not the configured one
        n=img.size(0)
        real_labels=torch.ones(n,device=self.device)
        fake_labels=torch.zeros(n,device=self.device)
        noise_random=torch.randn(n,100,1,1).to(self.device)
        #Train discriminator: real batch -> 1, fake batch -> 0
        self.D_optim.zero_grad()
        real_out=self.D(img).view(-1)
        d_real_loss=self.loss(real_out,real_labels)
        fake_img=self.G(noise_random)
        #detach(): the discriminator step must not back-propagate into the generator
        fake_out=self.D(fake_img.detach()).view(-1)
        d_fake_loss=self.loss(fake_out,fake_labels)
        d_loss=(d_real_loss+d_fake_loss)/2
        d_loss.backward()
        self.D_optim.step()
        #Train generator: make the updated discriminator label fakes as real
        self.G_optim.zero_grad()
        fake_out=self.D(fake_img).view(-1)
        g_loss=self.loss(fake_out,real_labels)
        g_loss.backward()
        self.G_optim.step()
        return d_loss.item(),g_loss.item()
    def _save_samples(self,path,n_samples=64):
        """Generate ``n_samples`` images without autograd and save them as a grid."""
        z=torch.randn(n_samples,100,1,1).to(self.device)
        with torch.no_grad():
            samples=self.G(z)
        samples=samples.mul(0.5).add(0.5).cpu()
        grid=utils.make_grid(samples)
        utils.save_image(grid,path)
    def train(self,train_loader):
        """Train for ``self.epochs`` epochs, then plot the loss curves and save the models.

        After every epoch a 64-image grid is written to ``training_results/``.
        The progress line (first epoch and every tenth) reports the epoch-mean losses.

        Args:
            train_loader: iterable of ``(images, labels)`` batches supporting ``len``.

        Raises:
            ValueError: if ``train_loader`` yields no batches.
        """
        count=len(train_loader)
        if count==0:
            raise ValueError("train_loader is empty: no batches to train on")
        G_iter=0
        G=[]
        D=[]
        e=[]
        print("training...")
        os.makedirs('training_results/',exist_ok=True)
        self.t_begin=t.time()
        for epoch in range(self.epochs):
            #Reset the running sums so every epoch reports its own mean
            D_loss=0.0
            G_loss=0.0
            for img,_ in tqdm(train_loader,total=count):
                d_loss,g_loss=self._train_step(img)
                D_loss+=d_loss
                G_loss+=g_loss
            D_loss/=count
            G_loss/=count
            G_iter+=1
            #Plain floats are stored, so later updates cannot alter the history
            G.append(G_loss)
            D.append(D_loss)
            e.append(epoch+1)
            self.t_epoch_end=t.time()
            if G_iter%10==0  or epoch==0:
                print(f"训练进度:{epoch+1}/{self.epochs},G_loss:{G_loss:.3f},D_loss:{D_loss:.3f},"
                f"time:{self.time_change(self.t_epoch_end-self.t_begin)}")
            #Save the generated images
            self._save_samples(f'training_results/img_generator_iter{str(G_iter).zfill(2)}.png')
        #Report total time, plot the curves and save the models
        self.t_end=t.time()
        print(f"总耗时为：{self.time_change(self.t_end-self.t_begin)}")
        plt.title("DCGAN")
        plt.plot(e,G,label="G_loss")
        plt.plot(e,D,label="D_loss")
        plt.grid()
        plt.legend()
        plt.show()
        #Save
        self.save_model()
#Configuration class
class config():
    """Plain container for the run settings.

    Attributes set on construction: ``batch_size``, ``cuda`` (a
    ``torch.device`` that is "cuda" when available, otherwise "cpu"),
    ``channels``, ``lr``, ``epochs``, ``D_path`` and ``G_path``.
    """
    def __init__(self):
        #Pick the device name once, then wrap it in a torch.device
        device_name="cpu"
        if torch.cuda.is_available():
            device_name="cuda"
        self.batch_size=64
        self.cuda=torch.device(device_name)
        self.channels=3
        self.lr=0.0002
        self.epochs=1000
        #Checkpoint locations
        self.D_path='./discriminator.pth'
        self.G_path='./generator.pth'
#Main function
def main(args):
    """Build the DCGAN, load the data, train, then run the test step.

    Args:
        args: configuration object passed to ``DCGAN`` and ``load_data``;
            ``args.D_path`` and ``args.G_path`` are forwarded to ``test``.
    """
    gan=DCGAN(args)
    loaders=load_data(args)
    train_batches=loaders[0]
    test_batches=loaders[1]
    gan.train(train_batches)
    gan.test(test_batches,args.D_path,args.G_path)
#Run the pipeline only when this code is executed as the main module
if __name__=="__main__":
    #Build the default configuration and pass it to main()
    args=config()
    main(args)            
        