In [None]:
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)

# Config

In [None]:
test_split = 0.05
validation_split = 0.05

learning_rate = 1e-3
epochs = 5
batch_size = 128

input_size = 300
bottleneck_size = 200

use_hidden_layer = False
hidden_size = 225

noise_type = 'masking' # additive, masking or none
noise_factor = 0.3

# Only when noise_type = masking
alpha_weight = 1 
beta_weight = 0.5

experiment_name = f"autoencoder_300to{bottleneck_size}_v3.1"
dataset = "data/embeddings/base/clipped.glove.6B.300d.txt"

In [None]:
gpu = torch.cuda.is_available()
device = torch.device("cuda" if gpu else "cpu")

In [None]:
print(gpu, device)

# Load & Prepare Embeddings for Training

In [None]:
words = []
vectors = []
with open(dataset, "r", encoding='utf8') as fp:
    for line in fp:
        line = line.split()
        word = line[0]
        vector = np.asarray(line[1:], 'float32')
        words.append(word)
        vectors.append(vector)
vectors = torch.from_numpy(np.asarray(vectors))

In [None]:
test_split = int(test_split * len(words))
validation_split = int(validation_split* len(words))

In [None]:
print(len(words), vectors.shape)
print(test_split, validation_split)

In [None]:
train_vectors, test_vectors, train_words, test_words = train_test_split(
    vectors, words, test_size=test_split, random_state=seed
)

In [None]:
train_vectors, validation_vectors, train_words, validation_words = train_test_split(
    train_vectors, train_words, test_size=validation_split, random_state=seed
)

In [None]:
train_vectors.shape, validation_vectors.shape, test_vectors.shape

In [None]:
# Note: We don't actually use these words since the model doesn't care about them.
# We just compute them in case we want to check some particular word or something.
len(train_words), len(validation_words), len(test_words)

In [None]:
train_dataloader = torch.utils.data.DataLoader(
    train_vectors, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=gpu
)
validation_dataloader = torch.utils.data.DataLoader(
    validation_vectors, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=gpu
)
test_dataloader = torch.utils.data.DataLoader(
    test_vectors, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=gpu
)

# Model Time

In [None]:
class AutoEncoderWithoutHiddenLayer(nn.Module):
    def __init__(self, input_size, bottleneck_size):
        super().__init__()
        self.encoder = nn.Linear(in_features=input_size, out_features=bottleneck_size)
        self.encoder_activation = nn.Tanh()
        
        self.decoder = nn.Linear(in_features=bottleneck_size, out_features=input_size)
        self.decoder_activation = nn.Tanh()
        
        self.decoder.weight = nn.Parameter(self.encoder.weight.transpose(0,1))
    
    def forward(self, features):
        latent_representation = self.encoder_activation(self.encoder(features))
        reconstructed_input = self.decoder_activation(self.decoder(latent_representation))
        return reconstructed_input
    
    def encode(self, features):
        return self.encoder_activation(self.encoder(features))

In [None]:
class AutoEncoderWithHiddenLayer(nn.Module):
    def __init__(self, input_size, hidden_size, bottleneck_size):
        super().__init__()
        self.encoder_input = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.encoder_input_activation = nn.ReLU(True)
        self.encoder_hidden = nn.Linear(in_features=hidden_size, out_features=bottleneck_size)
        self.encoder_hidden_activation = nn.Tanh()
        
        self.decoder_hidden = nn.Linear(in_features=bottleneck_size, out_features=hidden_size)
        self.decoder_hidden_activation = nn.ReLU(True)
        self.decoder_output = nn.Linear(in_features=hidden_size, out_features=input_size)
        self.decoder_output_activation = nn.Tanh()
        
        self.decoder_hidden.weight = nn.Parameter(self.encoder_hidden.weight.transpose(0,1))
        self.decoder_output.weight = nn.Parameter(self.encoder_input.weight.transpose(0,1))
        
        self.encoder = [
            self.encoder_input, self.encoder_input_activation, self.encoder_hidden, self.encoder_hidden_activation
        ]
        self.decoder = [
            self.decoder_hidden, self.decoder_hidden_activation, self.decoder_output, self.decoder_output_activation
        ]
    
    def forward(self, features):
        # Encoder
        for layer in self.encoder:
            features = layer(features)
        # Decoder
        for layer in self.decoder:
            features = layer(features)
        return features
    
    def encode(self, features):
        for layer in self.encoder:
            features = layer(features)
        return features

In [None]:
if use_hidden_layer:
    model = AutoEncoderWithHiddenLayer(input_size, hidden_size, bottleneck_size).to(device)
else:
    model = AutoEncoderWithoutHiddenLayer(input_size, bottleneck_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

In [None]:
model

In [None]:
def create_masking_noise(shape, amount):
    """
    Shape: Shape of the noise matrix
    Amount: The amount of the matrix that should be masked (zero'd) out
    """
    mask = np.ones(shape)
    amount = int(shape[0] * amount)
    mask[:, :amount] = 0
    for x in mask:
        np.random.shuffle(x)
    return torch.from_numpy(mask.astype(np.float32))

In [None]:
for epoch in range(epochs):
    train_loss = 0
    validation_loss = 0
    
    # Training Loop
    for iteration, batch in enumerate(tqdm(train_dataloader)):
        # Reset gradients back to zero for this iteration
        optimizer.zero_grad()
        
        if noise_type == 'additive':
            # Add noise to our inputs
            noise = torch.randn(batch.shape) * noise_factor
            noisy_batch = torch.clamp(batch + noise, -1, +1)
            
            # Move batch to device
            noisy_batch = noisy_batch.to(device)

            # Run our model & get outputs
            outputs = model(noisy_batch)
            
            # Calculate reconstruction loss
            batch_loss = criterion(outputs, batch)
        elif noise_type == 'masking':
            # Create masking noise and mask inputs
            noise = create_masking_noise(batch.shape, noise_factor)
            masked_batch = np.multiply(batch, noise)
            
            # Move batch to device
            masked_batch = masked_batch.to(device)
            
            # Run model & get outputs
            outputs = model(masked_batch)
            
            # Calculate reconstruction loss
            # We calculate the error for the masked dimensions separately from the unmasked ones
            # We can then assign a weight to each of the two components 
            unmasked_error = criterion(outputs * noise, masked_batch)
            masked_error = criterion(outputs * (1 - noise), batch * (1 - noise))
            batch_loss = (alpha_weight * masked_error) + (beta_weight * unmasked_error)
        else:
            # Move batch to device
            batch = batch.to(device)

            # Run our model & get outputs
            outputs = model(batch)
            
            # Calculate reconstruction loss
            batch_loss = criterion(outputs, batch)
                  
        # Backprop
        batch_loss.backward()
        
        # Update our optimizer parameters
        optimizer.step()
        
        # Add the batch's loss to the total loss for the epoch
        train_loss += batch_loss.item()
        
    # Validation Loop
    with torch.no_grad():
        for iteration, batch in enumerate(tqdm(validation_dataloader)):
            # Move batch to device
            batch = batch.to(device)

            # Run our model & get outputs
            outputs = model(batch)

            # Calculate reconstruction loss
            batch_loss = criterion(outputs, batch)

            # Add the batch's loss to the total loss for the epoch
            validation_loss += batch_loss.item()
    
    # Compute the average losses for this epoch
    train_loss = train_loss / len(train_dataloader)
    validation_loss = validation_loss / len(validation_dataloader)
    
    # Print Metrics
    print(
        f"Epoch: {epoch+1}/{epochs}, Train Reconstruction Loss = {train_loss}, \
        Validation Reconstruction Loss = {validation_loss}"
    )

# Test Model

In [None]:
model.eval()

In [None]:
reconstruction_loss = 0

# Testing Loop
with torch.no_grad():
    for iteration, batch in enumerate(tqdm(test_dataloader)):
        # Move batch to device
        batch = batch.to(device)
        
        # Run our model & get outputs
        outputs = model(batch)

        # Calculate reconstruction loss
        batch_loss = criterion(outputs, batch)

        # Add the batch's loss to the total loss for the epoch
        reconstruction_loss += batch_loss.item()

# Compute the average losses for this epoch
reconstruction_loss = reconstruction_loss / len(test_dataloader)

# Print Metrics
print(
    f"Test Reconstruction Loss = {reconstruction_loss}"
)

# Generate Latent Embeddings

In [None]:
latent_vectors = {}
with torch.no_grad():
    for i, (word, vector) in enumerate(tqdm(zip(words, vectors))):
        latent_vectors[word] = model.encode(vector)

In [None]:
len(latent_vectors)

In [None]:
latent_vectors['the']

# Save Model & Embeddings

In [None]:
torch.save(model.state_dict(), f"models/{experiment_name}.pt")

In [None]:
# Need to convert the latent embeddings into the glove format
# word dim1 dim2 dim3 dim4 ... dimX
lines = []
for i, (word, vector) in tqdm(enumerate(latent_vectors.items())):
    line = [word] + [str(x) for x in vector.tolist()]
    lines.append(' '.join(line))

In [None]:
with open(f"data/embeddings/trained/{experiment_name}.glove.6B.300d.txt", "w", encoding="utf-8") as fp:
    fp.write("\n".join(lines))