Importing my libraries

In [None]:
!pip install transformers
import tarfile
import numpy as np
import cupy as cp
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import torch
import random
import numpy as np

SEED = 1234

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True





In [47]:
# transformers and the tokenzier for transformers
from transformers import BertTokenizer, BertModel
from transformers import CamembertTokenizer, CamembertModel
import torch.nn as nn
# the french tokenizer

tokenizer_en = BertTokenizer.from_pretrained('bert-base-uncased')
tokenizer_fr = CamembertTokenizer.from_pretrained('camembert-base')

# declaring special tokens

bert_en = BertModel.from_pretrained('bert-base-uncased')
bert_fr = CamembertModel.from_pretrained('camembert-base')

# Freeze the BERTs model
for param in bert_en.parameters():
    param.requires_grad = False


# Freeze the BERT model
for param in bert_fr.parameters():
    param.requires_grad = False

def tokenize_and_cut(sentence, tokenizer, max_input_length = 512):
    tokens = tokenizer.tokenize(sentence)
    if len(tokens) > max_input_length: # start and end toks
        # Truncate the tokens
        tokens = tokens[:max_input_length]
    elif len(tokens) < max_input_length:
        # Pad the tokens
        tokens += [tokenizer.pad_token] * (max_input_length - len(tokens))
    return tokens
# the tokenizer function

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [None]:
#Test and tokenzie
import torch
tokens = tokenize_and_cut("Comel is  are you?", tokenizer_fr)
print(tokens)
input_ids = tokenizer_fr.convert_tokens_to_ids(tokens)
print(input_ids)
input_tensor = torch.tensor([input_ids])

embedding = bert_fr(input_tensor)[0]
print(embedding)

['▁Com', 'el', '▁is', '▁are', '▁you', '?']
[2650, 647, 2856, 9581, 4835, 197]
tensor([[[ 0.0057,  0.1593,  0.1688,  ..., -0.0633, -0.0175,  0.0133],
         [ 0.0319,  0.6169, -0.0538,  ..., -0.0099,  0.0261,  0.1721],
         [ 0.0476,  0.1551, -0.0816,  ...,  0.0124, -0.0178,  0.3162],
         [ 0.0437,  0.0374, -0.1425,  ...,  0.0432, -0.0914,  0.2645],
         [ 0.2930,  0.0210,  0.0599,  ..., -0.0242,  0.0299,  0.1882],
         [ 0.1255, -0.0644, -0.1305,  ..., -0.0192,  0.2340,  0.2966]]],
       grad_fn=<NativeLayerNormBackward0>)


#Creating helper functions to process the data

In [77]:
# data loaded from the en and fr files in the project
import torch.nn.functional as F

def load_data(file_path):
    with open(file_path, encoding='utf-8') as file:
        lines = [line.strip().lower() for line in file if line.strip()]
    return lines

def preprocess(lang_sentences, percentages, berts, bert_tokenizers, DEVICE="cuda", max_len=10):
    assert len(berts) == len(bert_tokenizers) == len(percentages), "The lengths of berts, tokenizer, and percentages must be the same."

    for i in range(len(berts)):

      berts[i].to(DEVICE)

    languages_mashed = []
    for sentences in zip(*lang_sentences):
        sentence_embeddings = []
        for lang_index, sentence in enumerate(sentences):
            # Tokenize and cut the sentence
            tokens = tokenize_and_cut(sentence, bert_tokenizers[lang_index], 10)
            input_ids = bert_tokenizers[lang_index].convert_tokens_to_ids(tokens)

            input_tensor = torch.tensor([input_ids]).to(DEVICE)

            embedding = berts[lang_index](input_tensor)[0].detach()
            sentence_embeddings.append(embedding)

        combined_embedding = sum([emb * perc for emb, perc in zip(sentence_embeddings, percentages)])
        languages_mashed.append(combined_embedding)
    return languages_mashed

#PreProccessing and creating data

In [125]:

# Load English and French data
import numpy as np

english_sentences = load_data('europarl-v7.fr-en.en')
french_sentences = load_data('europarl-v7.fr-en.fr')
print(len(english_sentences))
print(len(french_sentences))
print(english_sentences[1])
print(french_sentences[1])
# **mish mash** with 0.5 and 0.5 percentage points
mashed_sentences = preprocess([english_sentences, french_sentences],
                          [0.5, 0.5],
                          [bert_en, bert_fr],
                          [tokenizer_en, tokenizer_fr],
                          "cuda")
print(mashed_sentences)
np.save('mashed_sentences_fr_en_50_50_w_bert_no_rand.npy', mashed_sentences) # saving data so doesnt haev to be loaded again

263192
213802
i declare resumed the session of the european parliament adjourned on friday 17 december 1999, and i would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.
je déclare reprise la session du parlement européen qui avait été interrompue le vendredi 17 décembre dernier et je vous renouvelle tous mes vux en espérant que vous avez passé de bonnes vacances.


KeyboardInterrupt: 

#Here after processing the data it can be loaded with pytorch data loader object

In [79]:
from torch.utils.data import Dataset, DataLoader

class MashedDataset(Dataset):
  def __init__(self, data):
        self.data = data

  def __len__(self):
      return len(self.data)

  def __getitem__(self, idx):
      return self.data[idx]

mashed_sentences = np.load('mashed_sentences_fr_en_50_50_w_bert_no_rand.npy')
print(mashed_sentences.shape)

#mashed_sentences = mashed_sentences[:]
# only using first 100
print(mashed_sentences.shape)

mashed_sentences_dataset = MashedDataset(torch.from_numpy(mashed_sentences))
# make sure to from numpy it

batch_size = 32
mashed_sentences_data_loader = DataLoader(mashed_sentences_dataset,
                                   batch_size=batch_size,
                                   shuffle=True,
                                    drop_last=True)

(2, 1, 10, 768)
(2, 1, 10, 768)


#Here is the onehot outputs for characters and their encodings

In [118]:
one_hot_characters = ['a', 'à','â', 'æ', 'b', 'c', 'ç', 'd', 'e', 'é', 'è', 'ê',
                      'ë', 'œ', 'f', 'g', 'h', 'i', 'î', 'ï', 'j', 'k', 'l',
                      'm', 'n', 'o', 'ô', 'p', 'q', 'r', 's', 't', 'u', 'ù','û',
                      'ü', 'v', 'w', 'x', 'y', 'ÿ' 'z', "'", ' ', ' ', ' ']

                      # space character and ' included forcing more spaces
# this is both english and french characters discluding the overlap
# capitals are **banned** and arent used
# helper dictionaries for conversions
char_to_index = {char: index for index, char in enumerate(one_hot_characters)}
index_to_char = {index: char for index, char in enumerate(one_hot_characters)}


#Generator Architecture

The way this works is it will take in some length vector and then from it it will create the one hot matrix which represents the generated mish mashed sentence


In [119]:
class Generator(nn.Module):
    def __init__(self, input_size=100, seq_length=60):
        # since only at max 10 words maybe 60 characters can be outputted max
        super(Generator, self).__init__()
        self.seq_length = seq_length
        self.gru = nn.GRU(input_size, 256, num_layers=1, batch_first=True)
        self.relu = nn.ReLU()
        self.linear = nn.Linear(256, len(one_hot_characters))
        self.softmax = nn.Softmax(dim=2)

        # Initialize weights
        self.init_weights()

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.GRU):
                nn.init.xavier_uniform_(m.weight_ih_l0)
                nn.init.xavier_uniform_(m.weight_hh_l0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)

    def forward(self, x, temperature=1.0):
    # Repeat the input noise vector seq_length times to create a sequence
      x = x.repeat(1, self.seq_length, 1)
      out, _ = self.gru(x)  # Only take the output, ignore the hidden state
      out = self.relu(out)
      out = self.linear(out)
      out = out / temperature  # Apply the temperature parameter
      out = self.softmax(out)
      return out


#Call the generator to see what it outputs untrained

In [120]:
def generate_sequences(generator, noise, DEVICE='cuda'):
    # Initialize an empty list to store the sequences
    sequences = []
    # Iterate over the batch dimension of the noise tensor
    for i in range(noise.size(0)):
        # Generate a sequence of one-hot vectors for each noise vector
        one_hot_sequence = generator(noise[i].unsqueeze(0).to(DEVICE), 1)
        # Convert the one-hot vectors to character indices
        char_indices = torch.argmax(one_hot_sequence, dim=2)
        # Convert the character indices to characters
        sequence = ''.join(index_to_char[index.item()] for index in char_indices[0])
        sequences.append(sequence)
    return sequences

# this is just for testing not important
noise = torch.randn(2, 100)
generator = Generator().to('cuda')
sequences = generate_sequences(generator, noise)

print(sequences)

# ill embed this into english and then french and combine it to see what should
# happen from it

def sequence_to_mash_embed(sequences, percentages, berts, bert_tokenizers, DEVICE="cuda"):
    assert len(berts) == len(bert_tokenizers) == len(percentages), "The lengths of berts, tokenizer, and percentages must be the same."

    # Initialize an empty list to store the embeddings
    mashed_embeddings = []

    for i in range(len(berts)): # the sequences are tensors
      berts[i].to(DEVICE)

    for sequence in sequences:
        sentence_embeddings = []
        for lang_index in range(len((berts))):
            # Tokenize and cut the sentence
            tokens = tokenize_and_cut(sequence, bert_tokenizers[lang_index], 10)
            input_ids = bert_tokenizers[lang_index].convert_tokens_to_ids(tokens)

            input_tensor = torch.tensor([input_ids]).to(DEVICE)

            embedding = berts[lang_index](input_tensor)[0].detach()
            sentence_embeddings.append(embedding)

        combined_embedding = sum([emb * perc for emb, perc in zip(sentence_embeddings, percentages)])
        combined_embedding = combined_embedding.squeeze(0)  # Remove the extra dimension
        mashed_embeddings.append(combined_embedding)
    return mashed_embeddings

mashed_embedding = sequence_to_mash_embed(sequences, [0.5, 0.5],
                          [bert_en, bert_fr],
                          [tokenizer_en, tokenizer_fr],
                          "cuda")
print(mashed_embedding) # this might work
# otherwise some other embedding scheme needs to be defined for the mashed language


['cèèèèèèddddddddddddddddddddddddddddddddddddddddddddddddddddd', 'mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm']
[tensor([[-2.6855e-01, -4.3689e-02,  5.3275e-01,  ...,  2.3556e-02,
         -1.4298e-02,  7.4760e-02],
        [-2.6275e-01, -2.4350e-01,  5.9643e-01,  ...,  1.6972e-02,
         -1.3369e-01, -6.7206e-02],
        [-2.5890e-01, -2.3672e-01,  6.2150e-01,  ..., -1.1403e-02,
         -1.1334e-01, -1.0422e-01],
        ...,
        [-2.8967e-01, -6.2265e-02,  5.1663e-01,  ..., -7.2447e-02,
         -1.1124e-01, -3.1652e-02],
        [-2.8120e-01,  5.8504e-04,  4.5349e-01,  ..., -2.8149e-02,
         -7.1692e-02, -3.6999e-03],
        [-2.5462e-01,  1.4983e-01,  3.5797e-01,  ..., -1.7987e-02,
         -3.8703e-02,  3.3757e-02]]), tensor([[ 1.2189e-01, -9.1069e-02,  7.1602e-02,  ..., -6.9725e-02,
          2.1142e-02,  1.4173e-01],
        [ 2.3811e-01,  2.1341e-01,  1.7054e-01,  ...,  3.0200e-01,
         -1.4037e-01,  5.4780e-02],
        [ 2.2187e-01,  2.3537e-

#Discriminator Architecture

This has to figure out if something is real or fake

In [121]:
class Discriminator(nn.Module):
    def __init__(self, input_size=768, hidden_size=128):
        super(Discriminator, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers=1, batch_first=True)
        self.relu = nn.ReLU()
        self.linear = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out, _ = self.gru(x)  # Only take the output, ignore the hidden state
        out = self.relu(out)
        out = self.linear(out[:, -1, :])  # Only take the last output of the sequence
        out = self.sigmoid(out)
        return out

#Testing the untrained discriminator on the previous embeddings

In [122]:
# Convert the numpy array to a PyTorch tensor

mashed_embedding_tensor = torch.stack(mashed_embedding)

# Pass the tensor to the discriminator
discriminator = Discriminator().to('cuda')
output = discriminator(mashed_embedding_tensor)

# Pass the embeddings through the discriminator
print(output)


tensor([[0.5336],
        [0.5700]], grad_fn=<SigmoidBackward0>)


#Training Setup

In [123]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)
D = Discriminator().to(DEVICE)
G = Generator(seq_length=32).to(DEVICE)
# shoving hopefully everything to the gpu

max_epoch = 50 # going for 50 epochs
step = 0 # step through the data
n_noise = 100 # size of noise vector

criterion = nn.BCELoss()
D_opt = torch.optim.Adam(D.parameters(), lr=10e-10, betas=(0.5, 0.999))
G_opt = torch.optim.Adam(G.parameters(), lr=10e-10, betas=(0.5, 0.999))
# optimizers for both the discriminator and generator alongside a
# binary cross entropy loss

# We will denote real images as 1s and fake images as 0s
# This is why we needed to drop the last batch of the data loader
D_labels = torch.ones([batch_size, 1]).to(DEVICE) # Discriminator label: real
D_fakes = torch.zeros([batch_size, 1]).to(DEVICE) # Discriminator Label: fake

cpu


#Training Loop

In [124]:
import matplotlib.pyplot as plt
import time
# import pyplot to plot images
from google.colab import drive
drive.mount('/content/gdrive')

start_time = time.time()
for epoch in range(max_epoch):
    for idx, word_embeddings in enumerate(mashed_sentences_data_loader):
        # Training Discriminator
        x = word_embeddings.float().to(DEVICE)
        # rehsaping x to have the x dimension in it for the vectores
        x = x.view(batch_size, 1, 768)
        x_outputs = D(x) # input includes labels
        D_x_loss = criterion(x_outputs, D_labels) # Discriminator loss for real images

        z = torch.randn(batch_size, n_noise).to(DEVICE)

        # the generator outputs a sequence that sequence must then be converted into
        # embeddings that are passsed to the dsicriminator
         # the generator and the randomness to make a sequence
        sequences = generate_sequence(G, z)
        z_outputs = D(sequence_to_mash_embed(sequences, DEVICE)) # input to both generator and discriminator includes labels
        D_z_loss = criterion(z_outputs, D_fakes) # Discriminator loss for fake images
        D_loss = D_x_loss + D_z_loss # Total Discriminator loss

        D.zero_grad()
        D_loss.backward()
        D_opt.step()
        # updating the discriminator model

        # Training Generator
        z = torch.randn(batch_size, n_noise).to(DEVICE) # creating the random vector alongside the batch proper
        train_sequences = generate_sequence(G, z)
        z_outputs = D(sequence_to_mash_embed(train_sequences, DEVICE))
        G_loss = -1 * criterion(z_outputs, D_fakes) # Generator loss is negative disciminator loss

        G.zero_grad()
        G_loss.backward()
        G_opt.step()
        # updating the generator model

        if step % 500 == 0:
            print('Epoch: {}/{}, Step: {}, D Loss: {}, G Loss: {} time: '.format(epoch, max_epoch, step, D_loss.item(), G_loss.item(), time.time() - start_time))
            # done to view teh loss
        step += 1

    if epoch+1 in [1, 5, 10, 15, 20, 25, 30, 50]:
      # if in the 1st (done for making sure everything is good)
      # or the 10th or 30th or 50th epoch then display what the
      # generator has so far
      model_save_name = 'discriminator.pt'
      path = F"/content/gdrive/My Drive/{model_save_name}"
      torch.save(D.state_dict(), path)

      model_save_name = 'generator.pt'
      path = F"/content/gdrive/My Drive/{model_save_name}"
      torch.save(G.state_dict(), path)

      print(f"on epoch {epoch + 1}")
      noise = torch.randn(1, 100).to(DEVICE)
      G.eval()  # eval mode
      sequences = generate_sequence(generator, noise)
      print(sequences)
      # show the plot from get sample images
      G.train()
      # back to trianing the genrator



Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
on epoch 1
['llllll                                                      ']
on epoch 5
['rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr']
on epoch 10
['eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee']
on epoch 15
['mmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm']
on epoch 20
["''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''"]
on epoch 25
['sssssssssssaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa']
on epoch 30
['ææææææææææææææææææææææææææææææææææææææææææææææææææææææææææææ']
on epoch 50
['cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc']
