In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm

# Pytorch
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader, TensorDataset

# Custom Models
from RBM import RBM
from DAE import DAE

# Display 3D plot
import plotly.graph_objects as go

 # PCA
from scipy.linalg import eigh

In [None]:
# custom 
import matplotlib as mpl
mpl.rcParams['figure.figsize'] = [12, 12]
mpl.rcParams['figure.dpi'] = 100
mpl.rcParams['savefig.dpi'] = 200

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def getAllBatchData(dataLoader):
    data = []
    targets = []
    for d, t in dataLoader:
        data.append(d.numpy())
        targets.append(t.numpy())
    data = np.concatenate(data)
    targets = np.concatenate(targets)
    return data, targets 

## Load MNIST 

In [None]:
flattenTransform = torchvision.transforms.Compose([torchvision.transforms.ToTensor(), torchvision.transforms.Lambda(lambda x: torch.flatten(x))])
MNISTTrain = torchvision.datasets.MNIST(root="./dataset/", train=True, transform=flattenTransform, download=True)
MNISTTest = torchvision.datasets.MNIST(root="./dataset/", train=False, transform=flattenTransform, download=True)

trainDataLoader = DataLoader(MNISTTrain, batch_size=128, shuffle=True)
testDataLoader = DataLoader(MNISTTest, batch_size=128, shuffle=True)

In [None]:
def displayOutput(a, b, dim=(28, 28), title=None, fileName=None):
    fig, axs = plt.subplots(1,2)
    
    for i, data in enumerate((a, b)):
        viewAsImage = data.view(data.shape[0], 1, dim[0], dim[1])
        img = torchvision.utils.make_grid(viewAsImage.data).detach().cpu().numpy()
        axs[i].imshow(np.transpose(img, (1, 2, 0)))
        axs[i].set_axis_off()
        axs[i].autoscale(enable=True)
    
    fig.tight_layout()
    if title is not None: fig.suptitle(title)
    if fileName is not None: plt.savefig(fileName)
    plt.show()

def makeGrid(array, dim=(28, 28), nCols=8):
    width, height = dim
    nindex, dim = array.shape
    assert width*height == dim
    nRows = nindex//nCols
    assert nindex == nRows*nCols
    return (array.reshape(nRows, nCols, height, width)
              .swapaxes(1,2)
              .reshape(height*nRows, width*nCols))
    return result

def displayOutputNumpy(a, b, dim=(28, 28), title=None, fileName=None, cmap=cm.gray):
    fig, axs = plt.subplots(1,2)
    
    for i, data in enumerate((a, b)):
        axs[i].imshow(makeGrid(data, dim, nCols = 8), cmap=cmap)
        axs[i].set_axis_off()
        axs[i].autoscale(enable=True)
    
    fig.tight_layout()
    if title is not None: fig.suptitle(title)
    if fileName is not None: plt.savefig(fileName)
    plt.show()

In [None]:
# custom batch MSE loss
class batchMSELoss(torch.nn.Module):
    def __init__(self, batchSize):
        super(batchMSELoss, self).__init__()
        self.batchSize = batchSize
        self.mse = torch.nn.MSELoss(reduction='sum')
    def forward(self, outputs, targets):
        return self.mse(outputs, targets) / self.batchSize

In [None]:
def trainRBM(rbm, loss, dataLoader, epochs, learningRate, weightDecay=2e-4):
    paddingLength = 1+int(math.log10(epochs)) # for padded print
    
    for epoch in range(epochs):
        epochLoss = 0
        for i, (data, _) in enumerate(dataLoader): # we will use only data
            Vp0 = data.to(device)
            # Vs0 = torch.bernoulli(Vp0)
            # V: Visible | H: Hidden
            # s: sampling | p: probabilities
            # 0: start | k:end
            
            Vpk, Vsk = rbm.gibbsSampling(Vp0, iterations = 1) #Vs0
              
            Hp0, _ = rbm.sampleHidden(Vp0) #Vs0
            Hpk, _ = rbm.sampleHidden(Vpk) #Vsk
            
            rbm.contrastiveDivergence(Vp0, Vpk, Hp0, Hpk, learningRate = learningRate, weightDecay = weightDecay, momentumDamping = 0.5 if epoch < 5 else 0.9)

            epochLoss += loss(Vp0, Vpk)

        if epoch % (epochs/10) == (epochs/10-1):
            print(f"  Epoch[{epoch+1:>{paddingLength}}] Avg. Loss: {epochLoss / len(dataLoader):.5f}")
        
    return

In [None]:
def genNewDataSet(rbm, device, dataLoader):
    # rederive new data loader based on hidden activations of trained model
    newData = []
    for data, _ in dataLoader:
        Hp, _ = rbm.sampleHidden(data.to(device))
        newData.append(Hp.detach().cpu().numpy())
    newData = np.concatenate(newData)
    fakesLabels = np.zeros((len(newData), 1))
    return DataLoader(TensorDataset(torch.Tensor(newData).to(device), torch.Tensor(fakesLabels).to(device)), batch_size=dataLoader.batch_size, shuffle=True)

In [None]:
def bestSquareDim(size): #used to find the rectangle dim close as possible to a square that length*width = size
    start = int(math.sqrt(size))+1
    end = start-1
    while start * end != size:
        start-=1
        while start * end < size:
            end+=1
            # print(f"({start}, {end}) {start * end}")
    return (start, end)
        
# use a Gaussian distribution for the last hidden layer to let it take advantage of continuous values
def layersInfos(layersDim, epochsF = lambda x: (x+1)*10, lrF = lambda x: 10**(-x/2-1)):
    layersInfos = []
    for i, (vDim, hDim) in enumerate(zip(layersDim[:-1], layersDim[1:])):
        displayWidth = int(math.sqrt(vDim))
        layersInfos.append({
            'visibleDim': vDim,
            'hiddenDim': hDim,
            'epochs': epochsF(i),
            'learningRate': lrF(i),
            'useGaussian': i == len(layersInfos)-1,
            'displayDim': bestSquareDim(vDim)
        })
    return layersInfos

## Training RBM layers

In [None]:
def TrainsRBMLayers(layersDim, loss, dataLoader, Display = True, Verbose = True, savePrefixe = None):
    RBMLayers = []
    
    if Verbose:
        print(f"Train layers : {layersDim}")
    for config in layersInfos(layersDim, epochsF = lambda x: (x+1)*5):
        if Verbose:
            print(f"\nTrain layer {config['visibleDim']}-{config['hiddenDim']} ({config['epochs']} Epochs):")
        # create rbm layers
        rbm = RBM(device, config['visibleDim'], config['hiddenDim'], config['useGaussian'], useMomentum = True)
        
        if Verbose: # print initial loss
            data = next(iter(dataLoader))[0].to(device)
            reconstructedVp, _ = rbm.reconstruct(data)
            print(f"  Initial batch loss: {loss(data, reconstructedVp):.5f}")
        
        trainRBM(rbm, loss, dataLoader, config['epochs'], config['learningRate'], weightDecay=2e-4)
        
        # display sample output
        if Display:
            
            data = next(iter(dataLoader))[0].to(device)
            reconstructedVp, _ = rbm.reconstruct(data)
            displayOutput(data, reconstructedVp, config['displayDim'], 
                        title=f"train layer {config['visibleDim']}-{config['hiddenDim']} Avg MSE: {loss(data, reconstructedVp).item()}",
                        fileName = f"./imgs/training/{savePrefixe}_RBMTraining_{config['visibleDim']}-{config['hiddenDim']}.png" if savePrefixe is not None else None)
        RBMLayers.append(rbm)
        dataLoader = genNewDataSet(rbm, device, dataLoader) # generate new data based on this layer
    
    return RBMLayers

In [None]:
RBMLayers = TrainsRBMLayers([784, 1000, 500, 250, 3], batchMSELoss(trainDataLoader.batch_size), trainDataLoader, savePrefixe = "01")

## Build & fine-tune autoencoder

In [None]:
def trainPytorchModel(model, optimizer, loss, dataLoader, epochs, Verbose = True):
    epochsLoss = []
    paddingLength = 1+int(math.log10(epochs)) # for padded print
    for epoch in range(epochs):
        epochLoss = 0
        for batchIdx, (data, _) in enumerate(dataLoader):
            data = data.to(device) # to device

            optimizer.zero_grad() # zero the parameters gradients

            outputs = model(data) # forward
            # outputs = torch.tensor(self.prediction.iloc[idx, :],dtype=torch.long)

            lossValue = loss(data, outputs) # compute loss

            epochLoss += lossValue.item() # record loss

            lossValue.backward() # backward
            optimizer.step()

        epochLoss /= len(dataLoader)
        epochsLoss.append(epochLoss) # record loss

        if Verbose and epoch % (epochs/10) == (epochs/10-1):
            print(f"Epoch[{epoch + 1:>{paddingLength}}] Complete: Avg. Loss: {epochLoss:.8f}")
    return epochsLoss

In [None]:
DAEModel = DAE(RBMLayers).to(device)

In [None]:
batchSize = 256 # use bigger batch size for fine tinning
dataLoader = DataLoader(MNISTTrain, batch_size=batchSize, shuffle=True)

optimizer = torch.optim.Adam(DAEModel.parameters(), lr=1e-3)
loss = batchMSELoss(batchSize)
epochsLoss = trainPytorchModel(DAEModel, optimizer, loss=loss, dataLoader=dataLoader, epochs=50)

# display recored loss values
plt.plot(epochsLoss, color='blue')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

In [None]:
def batchLossReport(dataLoader, model, loss):
    lossValues = []
    model.eval() # Disable some specific layers/parts(Dropouts Layers, BatchNorm Layers, ...)
    with torch.no_grad():
        for inputs, _ in dataLoader:
            inputs = inputs.to(device=device)
            outputs = model(inputs)
            lossValues.append(loss(inputs, outputs).item())
    model.train()
    return np.array(lossValues)

In [None]:
trainBatchLoss = batchLossReport(trainDataLoader, DAEModel, batchMSELoss(trainDataLoader.batch_size))
testBatchLoss = batchLossReport(testDataLoader, DAEModel, batchMSELoss(testDataLoader.batch_size))

plt.plot(trainBatchLoss, color='blue', label = f"train, Avg MSE : {trainBatchLoss.mean()}")
plt.plot(testBatchLoss, color='red', label = f"test, Avg MSE : {testBatchLoss.mean()}")
plt.xlabel('Batch')
plt.ylabel('Avg batch MSE Loss')
plt.legend(loc='upper right')
plt.show()

## Display results

In [None]:
data, labels = getAllBatchData(testDataLoader)

In [None]:
%%capture
DAEModel.eval()
with torch.no_grad():
    DAECompression = DAEModel.encode(torch.from_numpy(data).float().to(device)).detach().cpu().numpy()
    DAEDecompression = DAEModel(torch.from_numpy(data).float().to(device)).detach().cpu().numpy()
DAEModel.train()

In [None]:
DAEDecompression = (DAEDecompression*255).astype(np.uint8)
displayOutputNumpy(DAEDecompression[:80], data[:80], dim=(28, 28),
                   title = f"DAE Reconstruction {DAEModel.layersStr()}",
                   fileName = f"./imgs/training/RBMReconstruct({DAEModel.layersStr()}).png")

In [None]:
fig = go.Figure(data=[go.Scatter3d(x=DAECompression[:, 0], y=DAECompression[:, 1], z=DAECompression[:, 2], text=labels,
            mode='markers',
           marker=dict(size=4,opacity=0.5, color=labels*2,colorscale='Turbo'), 
           name="data",
           hoverinfo='text',
      )])
# https://plotly.com/python/builtin-colorscales/
fig.show()

## PCA 

In [None]:
# data mean normalisation
dataMean = data.mean(axis = 0)
normalizedData = data - dataMean

covMat = np.cov(normalizedData, rowvar=False)

values, vectors = eigh(covMat) # finding eigen-values and corresponding eigen-vectors 
nb = 3

explainedVarianceSum = np.cumsum(values[::-1] / np.sum(values))[:nb]
principalEv = vectors[:, -nb:].T # keep only 3 components
print(f'Explained variance of {nb} main components : {explainedVarianceSum[-1]}')

projected = data @ principalEv.T
backProjected = projected @ principalEv
# back normalisation
backProjected =  backProjected + dataMean

In [None]:
displayOutputNumpy(backProjected[:80], data[:80], dim=(28, 28),
                   title = f"PCA Reconstruction {nb} (Explained variance: {explainedVarianceSum[-1]})",
                   fileName = f"./imgs/training/PCA({nb}).png")

In [None]:
fig = go.Figure(data=[go.Scatter3d(x=projected[:, 0], y=projected[:, 1], z=projected[:, 2], text=labels,
            mode='markers', 
           marker=dict(size=3,opacity=0.5, color=labels*2,colorscale='Turbo'), 
           name="data",
           hoverinfo='text',
      )])
fig.show()

# 

## 784-1000-500-250-30

In [None]:
RBMLayers30 = TrainsRBMLayers([784, 1000, 500, 250, 100, 30], batchMSELoss(trainDataLoader.batch_size), trainDataLoader,  savePrefixe = "02")

In [None]:
DAEModel30 = DAE(RBMLayers30).to(device)

In [None]:
batchSize = 256 # use bigger batch size for fine tinning
dataLoader = DataLoader(MNISTTrain, batch_size=batchSize, shuffle=True)

optimizer = torch.optim.Adam(DAEModel30.parameters(), lr=1e-3)
loss = batchMSELoss(batchSize)
epochsLoss = trainPytorchModel(DAEModel30, optimizer, loss=loss, dataLoader=dataLoader, epochs=150)

# display recored loss values
plt.plot(epochsLoss, color='blue')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

In [None]:
data, labels = getAllBatchData(testDataLoader)

In [None]:
%%capture
DAEModel30.eval()
with torch.no_grad():
    DAEDecompression30 = DAEModel30(torch.from_numpy(data).float().to(device)).detach().cpu().numpy()
DAEModel30.train()

In [None]:
DAEDecompression30 = (DAEDecompression30*255).astype(np.uint8)
displayOutputNumpy(DAEDecompression30[:80], data[:80], dim=(28, 28),
                   title = f"DAE Reconstruction {DAEModel30.layersStr()}",
                   fileName = f"./imgs/training/RBMReconstruct({DAEModel30.layersStr()}).png")

In [None]:
# PCA keep only 30 components
nb = 30
# data mean normalisation
dataMean = data.mean(axis = 0)
normalizedData = data - dataMean

covMat = np.cov(normalizedData, rowvar=False)

values, vectors = eigh(covMat)
explainedVarianceSum = np.cumsum(values[::-1] / np.sum(values))[:nb]
comps = vectors[:, -nb:].T
print(f'Explained variance of {nb} main components : {explainedVarianceSum[-1]}')

projected = data @ principalEv.T
backProjected = (data @ comps.T) @ comps
backProjected =  backProjected + dataMean

In [None]:
displayOutputNumpy(backProjected[:80], data[:80], dim=(28, 28),
                   title = f"PCA Reconstruction {nb} (Explained variance: {explainedVarianceSum[-1]})",
                   fileName = f"./imgs/training/PCA({nb}).png")

In [None]:
trainBatchLoss = batchLossReport(trainDataLoader, DAEModel30, batchMSELoss(trainDataLoader.batch_size))
testBatchLoss = batchLossReport(testDataLoader, DAEModel30, batchMSELoss(testDataLoader.batch_size))

plt.plot(trainBatchLoss, color='blue', label = f"train, Avg MSE : {trainBatchLoss.mean()}")
plt.plot(testBatchLoss, color='red', label = f"test , Avg MSE : {testBatchLoss.mean()}")
plt.xlabel('Batch')
plt.ylabel('Avg batch MSE Loss')
plt.legend(loc='upper right')
plt.show()