#### Import Libaries and set directories

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
from scipy import signal
from scipy.io import wavfile
import io
from PIL import Image
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

test_file = '/home/george/Documents/george_vae/testing/Y9_44814.45096055_9_10_12_31_36.wav'
all_audio_files = '/home/george/Documents/george_vae/testing/all_audio_files/' # path to all audio files
root = '/home/george/Documents/george_vae/testing/'

#### From the segments folder directory, extract all the files out of their day wise folders

In [None]:
for folder in os.listdir('/home/george/Documents/george_vae/testing/segments/'):
    # move every file from folder to the segments folder
    for file in os.listdir('/home/george/Documents/george_vae/testing/segments/' + folder):
        os.rename('/home/george/Documents/george_vae/testing/segments/' + folder + '/' + file, '/home/george/Documents/george_vae/testing/segments/' + str(folder) + file)

#### Take files from the segment folder and move it into the testing and training folders, I actually think there might be a bug and the split between training and testing is not occuring correctly although its not a big deal

In [None]:
# training data, validation, testing split
train_p = .7
test_p = .3

# get the amount of files in segments
# and split them into the three sets
file_list = os.listdir('/home/george/Documents/george_vae/testing/segments')
file_list = [file for file in file_list if file.endswith('.png')]

# randomize the order of the files
np.random.shuffle(file_list)

file_list_len = len(file_list)

num_train = file_list_len * train_p
num_train = int(math.floor(num_train))

num_validation = file_list_len * validation_p
num_validation = int(math.floor(num_validation))

num_test = file_list_len * test_p
num_test = int(math.floor(num_test))

# remove the rounding error files into the test category
num_test += file_list_len - (num_train + num_validation + num_test)

# move the num_train items from segments to root+ train 
for i in range(num_train):
    os.rename('/home/george/Documents/george_vae/testing/segments/' + file_list[i], '/home/george/Documents/george_vae/testing/train/1/' + file_list[i])

# move the num_validation items from segments to root+ validation
for i in range(num_validation):
    os.rename('/home/george/Documents/george_vae/testing/segments/' + file_list[i + num_train], '/home/george/Documents/george_vae/testing/validation/1/' + file_list[i + num_train])

#### Parameters for neural net!

In [None]:
latent_dims = 32
num_epochs = 100 #usually 100
batch_size = 32
learning_rate = 1e-3
variational_beta = 1
use_gpu = True

#### dataloaders 

In [None]:
import torchvision.transforms as transforms
import torchvision
from torch.utils.data import DataLoader

# pytorch, get the data from /train folder
# and transform it into a tensor
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Grayscale(num_output_channels=1),
    # normalize
    # transforms.Normalize((0.5,), (0.5,))
])

train_dataset = torchvision.datasets.ImageFolder(root=root+'train', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

test_dataset = torchvision.datasets.ImageFolder(root=root+'test', transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

## the actual neural network

In [None]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.batchNorm1 = nn.BatchNorm2d(1)
        self.conv1 = nn.Conv2d(1, 8, 3, 1, padding=1)
        self.batchNorm2 = nn.BatchNorm2d(8)
        self.conv2 = nn.Conv2d(8, 8, 3, 2, padding=1)
        self.batchNorm3 = nn.BatchNorm2d(8)
        self.conv3 = nn.Conv2d(8,16,3, 1, padding=1)
        self.batchNorm4 = nn.BatchNorm2d(16)
        self.conv4 = nn.Conv2d(16,16,3,2, padding=1)
        self.batchNorm5 = nn.BatchNorm2d(16)
        self.conv5 = nn.Conv2d(16,24,3,1, padding=1)
        self.batchNorm6 = nn.BatchNorm2d(24)
        self.conv6 = nn.Conv2d(24,24,3,2, padding=1)
        self.batchNorm7 = nn.BatchNorm2d(24)
        self.conv7 = nn.Conv2d(24,32,3,1, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(8192, 1024)
        self.fc2 = nn.Linear(1024, 256)
        self.fc3 = nn.Linear(256, 64)
        
        self.fc_mu = nn.Linear(in_features=64, out_features=latent_dims)
        self.fc_logvar = nn.Linear(in_features=64, out_features=latent_dims)
            
    def forward(self, x):
        x = F.relu(self.conv1(self.batchNorm1(x)))
        x = F.relu(self.conv2(self.batchNorm2(x)))
        x = F.relu(self.conv3(self.batchNorm3(x)))
        x = F.relu(self.conv4(self.batchNorm4(x)))
        x = F.relu(self.conv5(self.batchNorm5(x)))
        x = F.relu(self.conv6(self.batchNorm6(x)))
        x = F.relu(self.conv7(self.batchNorm7(x)))

        x = self.flatten(x)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))

        x_mu = self.fc_mu(x)
        x_logvar = self.fc_logvar(x)

        return x_mu, x_logvar

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.fc1 = nn.Linear(in_features=latent_dims, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=256)
        self.fc3 = nn.Linear(in_features=256, out_features=1024)
        self.fc4 = nn.Linear(in_features=1024, out_features=8192)
        self.unflatten = nn.Unflatten(1, (32, 16, 16))

        self.conv1 = nn.ConvTranspose2d(32,24,3,1,padding=1)
        self.conv2 = nn.ConvTranspose2d(24,24,3,2,padding=1,output_padding=1)
        self.conv3 = nn.ConvTranspose2d(24,16,3,1,padding=1)
        self.conv4 = nn.ConvTranspose2d(16,16,3,2,padding=1,output_padding=1)
        self.conv5 = nn.ConvTranspose2d(16,8,3,1,padding=1)
        self.conv6 = nn.ConvTranspose2d(8,8,3,2,padding=1,output_padding=1)
        self.conv7 = nn.ConvTranspose2d(8,1,3,1,padding=1)
        self.batchNorm1 = nn.BatchNorm2d(32)
        self.batchNorm2 = nn.BatchNorm2d(24)
        self.batchNorm3 = nn.BatchNorm2d(24)
        self.batchNorm4 = nn.BatchNorm2d(16)
        self.batchNorm5 = nn.BatchNorm2d(16)
        self.batchNorm6 = nn.BatchNorm2d(8)
        self.batchNorm7 = nn.BatchNorm2d(8)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        
        x = self.unflatten(x)

        x = F.relu(self.conv1(self.batchNorm1(x)))
        x = F.relu(self.conv2(self.batchNorm2(x)))
        x = F.relu(self.conv3(self.batchNorm3(x)))
        x = F.relu(self.conv4(self.batchNorm4(x)))
        x = F.relu(self.conv5(self.batchNorm5(x)))
        x = F.relu(self.conv6(self.batchNorm6(x)))
        x = F.relu(self.conv7(self.batchNorm7(x)))

        x = F.relu(x) # last layer before output is sigmoid, since we are using BCE as reconstruction loss

        return x


## the code below was heavily inspired by a VAE tutorial, but I can't find it and thus can't credit it :(
class VariationalAutoencoder(nn.Module):
    def __init__(self):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
    
    def forward(self, x):
        latent_mu, latent_logvar = self.encoder(x)

        latent = self.latent_sample(latent_mu, latent_logvar)

        x_recon = self.decoder(latent)

        return x_recon, latent_mu, latent_logvar
    
    def latent_sample(self, mu, logvar):
        if self.training:
            # the reparameterization trick
            std = logvar.mul(0.5).exp_()
            eps = torch.empty_like(std).normal_()
            return eps.mul(std).add_(mu)
        else:
            return mu
    
def vae_loss(recon_x, x, mu, logvar):
    flatten = nn.Flatten()
    flattened_recon = flatten(recon_x)
    flattened_x = flatten(x)
    recon_loss = F.mse_loss(flattened_recon, flattened_x, reduction='sum')
    kldivergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    
    return recon_loss + kldivergence * variational_beta
    
    
vae = VariationalAutoencoder()

device = torch.device("cuda:0" if use_gpu and torch.cuda.is_available() else "cpu")
vae = vae.to(device)

num_params = sum(p.numel() for p in vae.parameters() if p.requires_grad)
print('Number of parameters: %d' % num_params)

#### Training steps

In [None]:
optimizer = torch.optim.Adam(params=vae.parameters(), lr=learning_rate, weight_decay=1e-5)

# set to training mode
vae.train()


print('Training ...')
for epoch in range(num_epochs):

    train_loss_avg = []
    validation_loss = []

    for image_batch, _ in train_loader:
        
        image_batch = image_batch.to(device)

        # vae reconstruction
        image_batch_recon, latent_mu, latent_logvar = vae(image_batch)

        # reconstruction error
        loss = vae_loss(image_batch_recon, image_batch, latent_mu, latent_logvar)
        
        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        
        # one step of the optmizer (using the gradients from backpropagation)
        optimizer.step()
        
        train_loss_avg.append(loss.item())

    for image_batch, _ in test_loader:
        image_batch = image_batch.to(device)
        image_batch_recon, latent_mu, latent_logvar = vae(image_batch)
        loss = vae_loss(image_batch_recon, image_batch, latent_mu, latent_logvar)
        validation_loss.append(loss.item())

    if epoch % 10 == 0:
        torch.save(vae.state_dict(), '/home/george/Documents/george_vae/testing/training_checkpoints/vae_epoch_{}.pth'.format(epoch))

    print(f'Epoch: {epoch}')
    print(f"train loss: {np.mean(train_loss_avg)}")
    print(f"validation loss: {np.mean(validation_loss)}")

#### If you want to load a previous checkpoint

In [None]:
# load torch model
vae.load_state_dict(torch.load('/home/george/Documents/george_vae/testing/training_checkpoints/vae_epoch_20.pth'))

#### Before and after reconstruction visual

In [None]:
import numpy as np
import matplotlib.pyplot as plt
plt.ion()

import torchvision.utils

vae.eval()

# This function takes as an input the images to reconstruct
# and the name of the model with which the reconstructions
# are performed
def to_img(x):
    x = x.clamp(0, 1)
    return x

def show_image(img):
    img = to_img(img)
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))

def visualise_output(images, model):

    with torch.no_grad():
    
        images = images.to(device)
        images, _, _ = model(images)
        images = images.cpu()
        images = to_img(images)
        np_imagegrid = torchvision.utils.make_grid(images[1:50], 10, 5).numpy()
        plt.imshow(np.transpose(np_imagegrid, (1, 2, 0)))
        plt.show()

images, labels = iter(test_loader).next()

# First visualise the original images
print('Original images')
show_image(torchvision.utils.make_grid(images[1:50],10,5))
plt.show()

# Reconstruct and visualise the images using the vae
print('VAE reconstruction:')
visualise_output(images, vae)

# The next part of the notebook is concerned with creating UMAP representaions 

In [None]:
import pandas as pd

# contains a list of batches which contain the latent space variables
list_of_latent_averages = []
list_of_images = []

for image_batch, _ in train_loader:
    image_batch = image_batch.to(device)

    # returns tuple of average and std of latent space in that order
    output = vae.encoder(image_batch)
    output = output[0]

    # for the visualization
    image_batch.squeeze_(1)

    # convert to numpy array
    output = output.cpu()
    output = output.detach().numpy()
    image_batch = image_batch.cpu()
    image_batch = image_batch.detach().numpy()

    for latent_vector in output:
        list_of_latent_averages.append(latent_vector)

    for image in image_batch:
        list_of_images.append(image)

# convert list of latent averages to a numpy array
list_of_latent_averages = np.array(list_of_latent_averages)
list_of_images = np.array(list_of_images)

In [None]:
import umap.umap_ as umap

reducer = umap.UMAP(random_state=42, n_neighbors=20, min_dist=0.1, n_components=2, metric='euclidean')
reducer.fit(list_of_latent_averages)

In [None]:
embedding = reducer.transform(list_of_latent_averages)
# Verify that the result of calling transform is
# idenitical to accessing the embedding_ attribute
assert(np.all(embedding == reducer.embedding_))
embedding.shape

In [None]:
plt.scatter(embedding[:, 0], embedding[:, 1], cmap='Spectral', s=5)
plt.gca().set_aspect('equal', 'datalim')
plt.colorbar(boundaries=np.arange(11)-0.5).set_ticks(np.arange(10))
plt.title('UMAP projection ~230ms pre-transection data', fontsize=24);

plt.show()

In [None]:
from io import BytesIO
from PIL import Image
import base64
import pickle
import io

def embeddable_image(data):
    data = (data * 255).astype(np.uint8)
    # convert to uint8
    data = np.uint8(data)
    image = Image.fromarray(data)
    image = image.convert('RGB')
    # show PIL image
    im_file = BytesIO()
    img_save = image.save(im_file, format='PNG')
    im_bytes = im_file.getvalue()

    img_str = "data:image/png;base64," + base64.b64encode(im_bytes).decode()
    return img_str

base64_str = embeddable_image(list_of_images[0])


# save base64_str to a file
with open('base64_str.txt', 'w') as f:
    f.write(base64_str)


from bokeh.plotting import figure, show, output_notebook, output_file, save
from bokeh.models import HoverTool, ColumnDataSource, CategoricalColorMapper
from bokeh.palettes import Spectral10

output_file(filename='umap.html')

digits_df = pd.DataFrame(embedding, columns=('x', 'y'))
digits_df['image'] = list(map(embeddable_image, list_of_images))

datasource = ColumnDataSource(digits_df)

plot_figure = figure(
    title='UMAP',
    plot_width=600,
    plot_height=600,
    tools=('pan, wheel_zoom, reset')
)

plot_figure.add_tools(HoverTool(tooltips="""
<div>
    <div>
        <img src='@image' style='float: left; margin: 5px 5px 5px 5px'/>
    </div>
</div>
"""))

plot_figure.circle(
    'x',
    'y',
    source=datasource,
    fill_color = 'gray'
)

save(plot_figure)

## Remove the files that fall outside of the bounding box

In [None]:
import torchvision.transforms.functional as TF

# produces a list of files that are good for training
# iterate through src folder, put each of the files through the VAE and get the latent space variables
# then create an embedding through a reducer
# if the values are within the range x1, x2, y1, y2 then add the file to a text file that contains the list of files to train on 

x1 = -4
x2 = 12
y1 = -10
y2 = 6 

imgs = '/home/george/Documents/george_vae/testing/DoubleDerivative/'

# delete good_files.txt if it exists
try:
    os.remove('good_files.txt')
except OSError:
    pass

array_of_latent_vectors = []
array_of_names = []

for folder in os.listdir(imgs):
    for file in os.listdir(imgs + folder):
        image = Image.open(imgs + folder + '/' + file)
        image = TF.to_grayscale(image, num_output_channels=1)
        image = TF.to_tensor(image)
        # gray scale the image
        image = image.to(device)
        # make 32 copies of the image in a single batch dimension
        image = image.unsqueeze(0).repeat(32, 1, 1, 1)

        latent_vector = vae.encoder(image)
        # get the mean of the latent vector
        latent_vector = latent_vector[0]
        # get the first image of the batch, the bottom and above statements are NOT redundant
        latent_vector = latent_vector[0]
        latent_vector = latent_vector.cpu()
        latent_vector = latent_vector.detach().numpy()
        array_of_latent_vectors.append(latent_vector)
        array_of_names.append(str(folder + ':' + file))


embeddings = reducer.transform(array_of_latent_vectors)

for embedding in embeddings:
    if embedding[0] > x1 and embedding[0] < x2 and embedding[1] > y1 and embedding[1] < y2:
        # get the index of the embedding
        index = np.where(embeddings == embedding)
        # get the name of the image
        name = array_of_names[index[0][0]]
        # write the name to a file
        with open('good_files.txt', 'a') as f:
            f.write(name)
            f.write('\n')

## Creates a GIF 

In [None]:
import pandas as pd
import torchvision.transforms.functional as TF
import datetime as dt 

dir = '/home/george/Documents/george_vae/testing/UMAP_Over_Time_Y11/'
imageDir = '/home/george/Documents/george_vae/testing/UMAP_Over_Time_Images/'

list_of_latent_vectors = []
list_of_embeddings = []

# set the start date to october 1st 2022
start_date = dt.datetime(2022, 10, 11)

for file in range(1, 37):
    list_of_latent_vectors = []

    dir = '/home/george/Documents/george_vae/testing/UMAP_Over_Time_Y11/' + str(file) + '/'
    for x in os.listdir(dir):
        image = Image.open(dir + str(x))
        image = TF.to_grayscale(image, num_output_channels=1)
        image = TF.to_tensor(image)
        # gray scale the image
        image = image.to(device)
        # make 32 copies of the image in a single batch dimension
        image = image.unsqueeze(0).repeat(32, 1, 1, 1)

        latent_vector = vae.encoder(image)
        # get the mean of the latent vector
        latent_vector = latent_vector[0]
        # get the first image of the batch, the bottom and above statements are NOT redundant
        latent_vector = latent_vector[0]
        latent_vector = latent_vector.cpu()
        latent_vector = latent_vector.detach().numpy()
        list_of_latent_vectors.append(latent_vector)

    # convert list of latent averages to a numpy array
    list_of_latent_vectors = np.array(list_of_latent_vectors)
    embedding = reducer.transform(list_of_latent_vectors)
    list_of_embeddings.append(embedding)


    if file < 10:
        plt.scatter(embedding[:, 0], embedding[:, 1], cmap='Spectral', s=5, color='blue')
    else:
        plt.scatter(embedding[:, 0], embedding[:, 1], cmap='Spectral', s=5, color='red')

    # add a title, start_date to string
    plt.title('date:' + str(start_date.date()), fontsize=24)

    # iterate date by one day 
    start_date = start_date + dt.timedelta(days=1)

    # plt.scatter(before_embedding[:, 0], before_embedding[:, 1], cmap='Spectral', s=5, color='red')
    plt.gca().set_aspect('equal', 'datalim')
    # set axis from -10 to 15 both x and y 
    plt.xlim(-10, 25)
    plt.ylim(-10, 25)
    # save
    plt.savefig(imageDir + str(file) + 'UMAP.png')
    # reset plt
    plt.clf()
    plt.cla()

## Creates an interactive plot for selective dates

In [None]:
import pandas as pd
import torchvision.transforms.functional as TF
import datetime as dt 

UMAP_file_num = 7

dir = '/home/george/Documents/george_vae/testing/UMAP_Over_Time_Y11/' + str(UMAP_file_num) + '/'

list_of_images = []
list_of_latent_vectors = []

for file in os.listdir(dir):
    # read the image
    image = Image.open(dir + str(file))
    # convert to grayscale
    image = Image.open(dir + str(file))
    image = TF.to_grayscale(image, num_output_channels=1)
    image = TF.to_tensor(image)
    # gray scale the image
    # squeeze the image
    x = image.squeeze(0)
    x = np.array(x)
    list_of_images.append(x)

    image = image.to(device)
    image = image.unsqueeze(0).repeat(32, 1, 1, 1)
    latent_vector = vae.encoder(image)
    # get the mean of the latent vector
    latent_vector = latent_vector[0]
    # get the first image of the batch, the bottom and above statements are NOT redundant
    latent_vector = latent_vector[0]
    latent_vector = latent_vector.cpu()
    latent_vector = latent_vector.detach().numpy()
    list_of_latent_vectors.append(latent_vector)

# convert list of latent averages to a numpy array
list_of_latent_vectors = np.array(list_of_latent_vectors)
list_of_images = np.array(list_of_images)
embedding = reducer.transform(list_of_latent_vectors)

In [None]:
from io import BytesIO
from PIL import Image
import base64
import pickle
import io

def embeddable_image(data):
    data = (data * 255).astype(np.uint8)
    # convert to uint8
    data = np.uint8(data)
    image = Image.fromarray(data)
    image = image.convert('RGB')
    # show PIL image
    im_file = BytesIO()
    img_save = image.save(im_file, format='PNG')
    im_bytes = im_file.getvalue()

    img_str = "data:image/png;base64," + base64.b64encode(im_bytes).decode()
    return img_str

# base64_str = embeddable_image(list_of_images[0])


# # save base64_str to a file
# with open('base64_str.txt', 'w') as f:
#     f.write(base64_str)


from bokeh.plotting import figure, show, output_notebook, output_file, save
from bokeh.models import HoverTool, ColumnDataSource, CategoricalColorMapper
from bokeh.palettes import Spectral10

output_file(filename='umap' + str(UMAP_file_num) + '.html')

digits_df = pd.DataFrame(embedding, columns=('x', 'y'))
digits_df['image'] = list(map(embeddable_image, list_of_images))

datasource = ColumnDataSource(digits_df)

plot_figure = figure(
    title='UMAP',
    plot_width=600,
    plot_height=600,
    tools=('pan, wheel_zoom, reset')
)

plot_figure.add_tools(HoverTool(tooltips="""
<div>
    <div>
        <img src='@image' style='float: left; margin: 5px 5px 5px 5px'/>
    </div>
</div>
"""))

plot_figure.circle(
    'x',
    'y',
    source=datasource,
    fill_color = 'gray'
)

save(plot_figure)