## Evaluation over Sample Size on MNIST

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import CelebA,MNIST
import torchvision.datasets as dset
import matplotlib.pyplot as plt
import numpy as np
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings("ignore",category=UserWarning)

import timeit
from matplotlib.pyplot import cm

    
import rpy2.robjects.numpy2ri
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
robjects.numpy2ri.activate()
base = importr('base')
#utils = importr('utils')
#utils.install_packages('rvinecopulib')
#utils.install_packages('Rcpp')
rvinecop = importr('rvinecopulib')

from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KernelDensity
from sklearn.model_selection import GridSearchCV

from torch import nn
from torch import distributions
from torch.nn.parameter import Parameter

from AE import *
from Sampling import *
from Metric import *
from RealNVP import *

trans0 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Pad(2),
    transforms.ToPILImage(),
    transforms.ToTensor(),
])
path = %pwd

In [None]:
# Load data
dataset_train = MNIST(path,train=True, transform=trans0, download=True)
dataset_test = MNIST(path,train=False, transform=trans0, download=True)

In [None]:
# Initialize model

model_AE = AE_MNIST(image_size=32, channel_num=1,kernel_num=128, z_size=10)
model_AE.load_state_dict(torch.load('./ae_MNIST_200.pth',map_location=torch.device('cpu')))
model_VAE = VAE_MNIST(image_size=32, channel_num=1, kernel_num=128, z_size=10)
model_VAE.load_state_dict(torch.load('./vae_MNIST_200.pth'
                                          ,map_location=torch.device('cpu')))

In [None]:
interval = 200; nr_model = 8 ; nr_metric = 6
score = np.zeros((10,nr_model,nr_metric)); time = np.zeros((10,nr_model))


dataloader_test = DataLoader(dataset_test, batch_size=2000, shuffle=True)
img_test,attr = next(iter(dataloader_test))
dataloader_train = DataLoader(dataset_train, batch_size=4000, shuffle=True)
img_train,attr = next(iter(dataloader_train))

for n in range(200,2001,200):   
    i = int(n/200-1) 
    
    # Get latent variable
    with torch.no_grad():
        lv = model_AE.encode(img_train)
    lv = lv.detach().numpy()


    # Test set
    
    with torch.no_grad():
            y_test = model_AE.encode(img_test)
    
    for m in [0,1,2,3,4,5,6,7]:
        s0 = timeit.default_timer()
        y_sample=np.zeros((2000,lv.shape[1]))
        
            
        # Beta Copula
        if m == 0: 
            for num in range(0,int(2000/n)):
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                y_sample[num*n:(n)*(num+1),:] = sampleing1(lv_new, lv_new, lv_new.shape[0])
            if 2000%n!=0:
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                y_sample[int(2000/n)*n:,:]=sampleing1(lv_new, lv_new, 2000%n)
                    
            y_sample=torch.Tensor(y_sample)
                
          
        # VAE
        elif m == 1:            
            y_sample=torch.randn(2000, 10)
           

        # Vine copula trun_level=15
        elif m == 2:
            copula_controls = base.list(family_set="tll", trunc_lvl=15, cores=1)
            for num in range(0,int(2000/n)):
                fixed_noise = np.random.rand(n, lv.shape[1]) 
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                vine_obj = rvinecop.vine(lv_new, copula_controls=copula_controls)
                sampled_r = rvinecop.inverse_rosenblatt(fixed_noise, vine_obj)
                y_sample[num*n:(n)*(num+1),:] =torch.Tensor( np.asarray(sampled_r)).view(n, -1).to("cpu")
            if 2000%n!=0:
                fixed_noise = np.random.rand(2000%n, lv.shape[1]) 
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                vine_obj = rvinecop.vine(lv_new, copula_controls=copula_controls)
                sampled_r = rvinecop.inverse_rosenblatt(fixed_noise, vine_obj)
                y_sample[int(2000/n)*n:,:]=torch.Tensor(np.asarray(sampled_r)).view(2000%n, -1).to("cpu")
            del sampled_r  
            y_sample=torch.Tensor(y_sample)
                
                
        # Gauss  
        elif m==3:
            for num in range(0,int(2000/n)):
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                mean = np.mean(lv_new, axis=0)
                cov = np.cov(lv_new, rowvar=0)
                y_sample[num*n:(n)*(num+1),:] = torch.tensor(np.random.multivariate_normal(mean, cov, lv_new.shape[0])).float()
            if 2000%n!=0:
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                mean = np.mean(lv_new, axis=0)
                cov = np.cov(lv_new, rowvar=0)
                y_sample[int(2000/n)*n:,:] = torch.tensor(np.random.multivariate_normal(mean, cov, 2000%n)).float()
            y_sample=torch.tensor(y_sample).float()
                    
                
        # Independent
        if m == 4: 
            for num in range(0,int(2000/n)):
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                y_sample[num*n:(n)*(num+1),:] = indep_sampling(lv_new, lv_new, lv_new.shape[0])
            if 2000%n!=0:
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                y_sample[int(2000/n)*n:,:]=indep_sampling(lv_new, lv_new, 2000%n)
                    
            y_sample=torch.tensor(shuffle_along_axis(y_sample, axis=0)).float()
                
            
        # GMM
        if m == 5: 
            for num in range(0,int(2000/n)):
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                gm = GaussianMixture(n_components=10, random_state=0).fit(lv_new)
                y_sample[num*n:(n)*(num+1),:] = torch.tensor(gm.sample(n_samples=lv_new.shape[0])[0]).float()
            if 2000%n!=0:
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                gm = GaussianMixture(n_components=10, random_state=0).fit(lv_new)
                y_sample[int(2000/n)*n:,:] = torch.tensor(gm.sample(n_samples=2000%n)[0]).float()
                y_sample=torch.tensor(y_sample).float()    
                
                
        # KDE
        if m == 6: 
            for num in range(0,int(2000/n)):
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                grid = GridSearchCV(KernelDensity(),
                {'bandwidth': np.linspace(0.1, 2.0, 40)},
                cv=10) 
                grid.fit(lv_new)
                kde = grid.best_estimator_
                lvnew_kde=kde.sample(n_samples=lv_new.shape[0])
                lvnew_kde=np.array(lvnew_kde,dtype=np.double)           
                y_sample[num*n:(n)*(num+1),:] = torch.tensor(lvnew_kde).float()
                
            if 2000%n!=0:
                randnum=np.random.choice(4000,size=n,replace=False)
                lv_new=lv[randnum]
                grid = GridSearchCV(KernelDensity(),
                {'bandwidth': np.linspace(0.1, 2.0, 40)},
                cv=10) 
                grid.fit(lv_new)
                kde = grid.best_estimator_
                lvnew_kde=kde.sample(n_samples=2000%n)
                lvnew_kde=np.array(lvnew_kde,dtype=np.double)
                y_sample[int(2000/n)*n:,:] = torch.tensor(lvnew_kde).float()
                y_sample=torch.tensor(y_sample).float() 
                
        # Real NVP
        elif m==7:
            randnum=np.random.choice(4000,size=n,replace=False)#np.random.randint(0,2000,size=n)
            lv_new=lv[randnum]
            masks = torch.from_numpy(np.array([[0, 1, 0, 1, 0, 1, 0, 1, 0, 1], [1, 0, 1, 0, 1, 0, 1, 0, 1, 0]] * 3).astype(np.float32))
            nets = lambda: nn.Sequential(nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10), nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Tanh())
            nett = lambda: nn.Sequential(nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10))
            prior = distributions.MultivariateNormal(torch.zeros(10), torch.eye(10))
            flow = RealNVP(nets, nett, masks, prior)
            optimizer = torch.optim.Adam([p for p in flow.parameters() if p.requires_grad==True], lr=1e-4)
            loss_hist = np.array([])
            num_samples=128
            for a in tqdm(range(2001)): 
                helper=np.random.randint(0,n-1,size=num_samples)
                x_np= lv_new[helper]
                x_np = x_np.astype(np.float32)
                loss = -flow.log_prob(torch.from_numpy(x_np)).mean()
                optimizer.zero_grad()
                loss.backward(retain_graph=True)
                optimizer.step()
                    
            x = flow.sample(2000).detach().numpy()
            x=np.reshape(x,newshape=(x.shape[0],x.shape[2]))
            y_sample=torch.tensor(x).float()
            
            
            # Generate iamges by deoding
        if m == 2: 
            img_new = model_VAE.decode(torch.tensor(y_sample).float())
        else:
            img_new = model_AE.decode(torch.tensor(y_sample).float())
                
        with torch.no_grad():
            sc1 = compute_score(img_test,img_new)
            sc2 = compute_score(y_test,y_sample)
            for j in range(3): score[i,m,j] = sc1[j]
            for j in range(3,6): score[i,m,j] = sc2[j-3]
            print('Finished_{}'.format(m))
    
    print("Finished:",n)

In [None]:
from matplotlib.pyplot import cm
score1=score[:10,:,:]
xaxis = [i for i in range(200,2001,200)]
title = ['EMD PIXEL','MMD PIXEL','1NN PIXEL','EMD CONV','MMD CONV','1NN CONV']
label =  ['EBCAE','VAE','VCAE_15','Gauss','Independent','GMM','KDE']
plt.rcParams["font.family"] = "Times New Roman"
fig=plt.figure(figsize=(8.27, 3.8))
k=1
for i in [0,3,1,4,2,5]:
    ax = plt.subplot(2,3,k)
    color = iter(cm.Set2(np.linspace(0, 1, 8)))
    plt.xticks(xaxis[0::2])
    for j in [0,1,2,3,4,5,6,7]:
        c = next(color)
        plt.xticks(fontsize=10)
        plt.yticks(fontsize=10)
        ax.plot(xaxis,score1[:,j,i],'--bo',label=label[j], color=c,markersize=3.5)
        plt.xlabel('epochs',fontsize=10)
        
        
    ax.set_title(title[i])
    k+=1
    
lines_labels = [axi.get_legend_handles_labels() for axi in fig.axes]
lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]   


lines=[lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6]]
labels=[labels[0],labels[1],labels[2],labels[3],labels[4],labels[5],labels[6]]

bbox_transform=fig.transFigure
fig.legend(lines, labels, loc='upper center', bbox_to_anchor=(0.5, 0.05),fancybox=False, markerscale=2
           ,shadow=False, framealpha=0, ncol=7, fontsize=10)    

plt.tight_layout()
plt.savefig("score_MNIST_samplesize_VGL.png",dpi=300,bbox_inches='tight')

##### Small sample sizes

### Time Evaluation

In [None]:
dataloader_test = DataLoader(dataset_test, batch_size=2000, shuffle=True)
img_test,attr = next(iter(dataloader_test))
dataloader_train = DataLoader(dataset_train, batch_size=2000, shuffle=True)
img_train,attr = next(iter(dataloader_train))
for n in range(2000,2001,200):   

    
    # Get latent variable
    s0 = timeit.default_timer()
    
    with torch.no_grad():
        lv = model_AE.encode(img_train)
    lv = lv.detach().numpy()

    s1 = timeit.default_timer()
    encoding_time = s1-s0
    print('Encoding Time: ',encoding_time)

    
    for m in [0,1,2,3,4,5,6,7]:
            print(m)
            
            # Beta Copula
            if m == 0:     
                y_sample,learning_time,sampling_time = sampling1_time(lv, lv, lv.shape[0])#, seed=500)        
                y_sample=torch.Tensor(y_sample)
                

            # VAE
            elif m == 1:
                s0 = timeit.default_timer()
                y_sample=torch.randn(2000, 100)
                sampling_time= timeit.default_timer()-s0
                learning_time=0
           

            # Vine copula trun_level=15
            elif m == 2:
                copula_controls = base.list(family_set="tll", trunc_lvl=15, cores=1)
                s0 = timeit.default_timer()
                vine_obj = rvinecop.vine(lv, copula_controls=copula_controls)
                learning_time= timeit.default_timer()-s0
                
                s0 = timeit.default_timer()
                fixed_noise = np.random.rand(n, lv.shape[1])
                sampled_r = rvinecop.inverse_rosenblatt(fixed_noise, vine_obj)
                y_sample =torch.Tensor( np.asarray(sampled_r)).view(n, -1).to("cpu")
                y_sample=torch.Tensor(y_sample)
                sampling_time= timeit.default_timer()-s0
                
                
            # Gauss 
            elif m==3:
                s0 = timeit.default_timer()
                mean = np.mean(lv, axis=0)
                cov = np.cov(lv, rowvar=0)
                learning_time= timeit.default_timer()-s0
                    
                s0 = timeit.default_timer()
                y_sample = torch.tensor(np.random.multivariate_normal(mean, cov, lv.shape[0])).float()
                sampling_time= timeit.default_timer()-s0

                
            # Independent
            elif m == 4:  
                y_sample,learning_time,sampling_time = indep_time(lv, lv, lv.shape[0])#, seed=500)
               
                    
            # GMM
            elif m == 5: 
                s0 = timeit.default_timer()
                gm = GaussianMixture(n_components=10, random_state=0).fit(lv)
                learning_time= timeit.default_timer()-s0
                
                s0 = timeit.default_timer()
                y_sample = torch.tensor(gm.sample(n_samples=lv.shape[0])[0]).float()
                sampling_time= timeit.default_timer()-s0
               
                    
            # KDE
            elif m == 6: 
                s0 = timeit.default_timer()
                grid = GridSearchCV(KernelDensity(),
                {'bandwidth': np.linspace(0.1, 2.0, 40)},
                cv=10) # 20-fold cross-validation
                grid.fit(lv)
                kde = grid.best_estimator_
                learning_time= timeit.default_timer()-s0
                
                s0 = timeit.default_timer()
                lvnew_kde=kde.sample(n_samples=lv.shape[0])
                lvnew_kde=np.array(lvnew_kde,dtype=np.double)
                y_sample = torch.tensor(lvnew_kde).float()
                sampling_time= timeit.default_timer()-s0
                
            elif m==7:
                s0 = timeit.default_timer()
                masks = torch.from_numpy(np.array([[0, 1, 0, 1, 0, 1, 0, 1, 0, 1], [1, 0, 1, 0, 1, 0, 1, 0, 1, 0]] * 3).astype(np.float32))
                nets = lambda: nn.Sequential(nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10), nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Tanh())
                nett = lambda: nn.Sequential(nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10),nn.Linear(10, 256), nn.LeakyReLU(), nn.Linear(256, 256), nn.LeakyReLU(), nn.Linear(256, 10))
                prior = distributions.MultivariateNormal(torch.zeros(10), torch.eye(10))
                flow = RealNVP(nets, nett, masks, prior)
                optimizer = torch.optim.Adam([p for p in flow.parameters() if p.requires_grad==True], lr=1e-4)
                loss_hist = np.array([])
                num_samples=128
                for a in tqdm(range(2001)): 
                    helper=np.random.randint(0,2000,size=num_samples)
                    x_np= lv[helper]
                    x_np = x_np.astype(np.float32)
                    loss = -flow.log_prob(torch.from_numpy(x_np)).mean()
                    optimizer.zero_grad()
                    loss.backward(retain_graph=True)
                    optimizer.step()
                learning_time= timeit.default_timer()-s0
                
                s0 = timeit.default_timer()
                x = flow.sample(2000).detach().numpy()
                x=np.reshape(x,newshape=(x.shape[0],x.shape[2]))
                y_sample=torch.tensor(x).float()
                sampling_time= timeit.default_timer()-s0
              

            print(m,': ',learning_time,sampling_time,learning_time+sampling_time)
            print('Finished_{}'.format(m))
    
    print("Finished:",n,timeit.default_timer()- s1)