In [None]:
import sys
# Install the third-party packages used by this notebook.
# Running pip as `{sys.executable} -m pip` invokes pip through the same Python
# interpreter that is executing this kernel, so the packages are installed
# for that interpreter rather than for whichever `pip` happens to be on PATH.
# NOTE(review): no versions are pinned, so each run installs whatever release is current.
!{sys.executable} -m pip install torch
!{sys.executable} -m pip install transformers
!{sys.executable} -m pip install datasets
!{sys.executable} -m pip install nltk
!{sys.executable} -m pip install rouge
!{sys.executable} -m pip install deep_translator

In [None]:
# Importing necessary libraries
import torch
import numpy as np
import pandas as pd
from datasets import load_dataset
import nltk
from nltk.translate.bleu_score import sentence_bleu
from nltk.tokenize import word_tokenize
from transformers import MarianMTModel, MarianTokenizer
from rouge import Rouge
from sklearn.metrics.pairwise import cosine_similarity
import copy
import torch.nn.functional as F
from deep_translator import GoogleTranslator
# word_tokenize needs the Punkt tokenizer data. Older NLTK releases load it from the
# 'punkt' resource, while current releases load it from 'punkt_tab'; nltk is installed
# unpinned above, so fetch both to keep word_tokenize working on either.
nltk.download('punkt')
nltk.download('punkt_tab')

# Use the GPU when one is available, otherwise fall back to the CPU so that DEVICE
# never names a device that does not exist on this machine.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: ", DEVICE)

In [None]:
# Pretrained Marian checkpoint (German -> English) and its matching tokenizer
model_name = "Helsinki-NLP/opus-mt-de-en"
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# German-to-English OPUS dataset, fetched through the `datasets` library
dataset = load_dataset("kaitchup/opus-German-to-English")

In [None]:
#Functions

#Add noise from N(mean, std_dev) distribution
def add_noise(encoder_outputs, std_dev=0.1, mean=0.0):
    """Return a copy of ``encoder_outputs`` with Gaussian noise added to its last hidden state.

    Args:
        encoder_outputs: Object exposing a ``last_hidden_state`` tensor attribute
            (here, the output of the model's encoder).
        std_dev: Standard deviation of the noise.
        mean: Mean of the noise; defaults to 0.

    Returns:
        A shallow copy of ``encoder_outputs`` whose ``last_hidden_state`` is the
        original hidden state plus noise drawn from N(mean, std_dev). The object
        passed in is left unchanged, so it can still be used as the clean baseline.
    """
    clean_hidden_states = encoder_outputs.last_hidden_state
    # randn_like creates the noise with the same shape, dtype and device as the hidden states
    noise = torch.randn_like(clean_hidden_states) * std_dev + mean
    # Copy the container before assigning so the caller's object keeps its clean tensor;
    # the addition below builds a new tensor, so the clean tensor itself is never modified.
    noisy_outputs = copy.copy(encoder_outputs)
    noisy_outputs.last_hidden_state = clean_hidden_states + noise
    return noisy_outputs

#Determine Bleu Score
def bleu_score_calc(reference, candidate):
    """Sentence-level BLEU of ``candidate`` against ``reference``.

    Both strings are lower-cased and tokenized with ``word_tokenize``. The highest
    n-gram order is chosen from the reference length so that short sentences are
    not scored on n-grams they can barely contain:

        <= 3 tokens -> unigrams only
        <= 8 tokens -> unigrams and bigrams
        otherwise   -> unigrams, bigrams and trigrams

    The n-gram orders are weighted uniformly and the weights sum to exactly 1.

    Args:
        reference: Reference sentence.
        candidate: Sentence to score.

    Returns:
        The value returned by ``sentence_bleu`` for the tokenized pair.
    """
    reference_tokens = word_tokenize(reference.lower())
    candidate_tokens = word_tokenize(candidate.lower())
    length = len(reference_tokens)
    if length <= 3:
        max_order = 1
    elif length <= 8:
        max_order = 2
    else:
        max_order = 3
    # Exact uniform weights (1/3 each for trigrams) instead of a rounded 0.33 that sums to 0.99
    weights = [1.0 / max_order] * max_order
    return sentence_bleu([reference_tokens], candidate_tokens, weights=weights)

#Determine Rouge Score
def _zero_rouge_scores():
    """Build an all-zero result in the same layout the experiments read from ``rouge_score_calc``."""
    return [{
        metric: {'r': 0.0, 'p': 0.0, 'f': 0.0}
        for metric in ('rouge-1', 'rouge-2', 'rouge-l')
    }]


def rouge_score_calc(reference, candidate):
    """ROUGE-1, ROUGE-2 and ROUGE-L scores of ``candidate`` against ``reference``.

    Args:
        reference: Reference sentence.
        candidate: Sentence to score.

    Returns:
        A one-element list holding a dict keyed by 'rouge-1', 'rouge-2' and
        'rouge-l', each mapping to a dict with recall ('r'), precision ('p')
        and F-score ('f'). When either text is blank, or the rouge package
        rejects the pair as empty, every score is 0.0.
    """
    # A heavily corrupted translation can decode to an empty string; the rouge package
    # raises ValueError for that, which would abort a whole experiment run. Empty text
    # has no overlap with anything, so report zeros instead.
    if not candidate.strip() or not reference.strip():
        return _zero_rouge_scores()
    try:
        return Rouge().get_scores(candidate, reference)
    except ValueError:
        # Raised for text that is empty once split into sentences (e.g. just ".")
        return _zero_rouge_scores()


In [None]:
# Experiment 1: Evaluate the effect of noise on translation accuracy using a range of noise levels.
# Each German input is translated twice: once from the clean encoder outputs and once from encoder
# outputs with Gaussian noise added. The noisy translation is then scored against the clean one
# with BLEU and ROUGE.

file_path = '/content/NLP Class Project Data - OfficialInputData.csv'
df = pd.read_csv(file_path)

# Seed the noise generator so that re-running this cell reproduces the same noisy translations
NOISE_SEED = 0
torch.manual_seed(NOISE_SEED)

# List to store results from each noise level trial
big_data = []

# Define the standard deviations for noise to be tested
std_dev_values = [0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.5]

# (result-column prefix, key in the rouge result) for each ROUGE variant that is recorded
rouge_metrics = [('r1', 'rouge-1'), ('r2', 'rouge-2'), ('rl', 'rouge-l')]

encoder = model.get_encoder()

# Loop through each standard deviation value to test its impact
for std_dev in std_dev_values:
    print("TRIAL: ", std_dev)
    print("_________________________________________________________________________")
    # Initialize a dictionary to store results for this trial
    data = {
        'r1_r': [],
        'r1_p': [],
        'r1_f': [],
        'r2_r': [],
        'r2_p': [],
        'r2_f': [],
        'rl_r': [],
        'rl_p': [],
        'rl_f': [],
        'bleu_scores': [],
        'normal_text': [],
        'noise_text': []
    }
    # Process each sentence in the dataset
    for i, input_text in enumerate(df['German Google Translate (input)']):
        print(i)
        # Tokenize the input text and place it on the same device as the model
        inputs = tokenizer(input_text, return_tensors="pt").to(model.device)

        # Inference only: no_grad avoids building an autograd graph for every sentence
        with torch.no_grad():
            # Get encoder outputs
            encoder_outputs = encoder(input_ids=inputs['input_ids'])

            # Generate the clean translation first, before any noise is applied
            generated_ids_normal = model.generate(
                input_ids=inputs['input_ids'],
                encoder_outputs=encoder_outputs
            )

            # Add noise to the encoder outputs and generate the translation from them
            noisy_encoder_outputs = add_noise(encoder_outputs, std_dev)

            generated_ids_noise = model.generate(
                input_ids=inputs['input_ids'],
                encoder_outputs=noisy_encoder_outputs
            )

        # Decode translations to text
        normal_decoded_text = tokenizer.decode(generated_ids_normal[0], skip_special_tokens=True)
        noise_decoded_text = tokenizer.decode(generated_ids_noise[0], skip_special_tokens=True)
        # Calculate BLEU and ROUGE scores of the noisy translation against the clean one
        bleu_score = bleu_score_calc(normal_decoded_text, noise_decoded_text)
        rouge_score = rouge_score_calc(normal_decoded_text, noise_decoded_text)

        # Append results to data dictionary
        data['normal_text'].append(normal_decoded_text)
        data['noise_text'].append(noise_decoded_text)
        data['bleu_scores'].append(bleu_score)
        for prefix, metric in rouge_metrics:
            for stat in ('r', 'p', 'f'):
                data[f'{prefix}_{stat}'].append(rouge_score[0][metric][stat])
    # Store results for this trial in the main list
    big_data.append(data)

In [None]:
# Export Data to Excel File For Plotting and Analysis
# One DataFrame per noise level, keyed by the trial's position in big_data
dfs = {key: pd.DataFrame(value) for key, value in enumerate(big_data)}

with pd.ExcelWriter('output_experiment1_trial#.xlsx') as writer:
    # The loop variable is deliberately not called `df`: that name holds the input
    # DataFrame read from the CSV, and rebinding it here would silently replace it.
    for sheet_name, trial_df in dfs.items():
        trial_df.to_excel(writer, sheet_name=str(sheet_name))

In [None]:
# Experiment 2: Analyze the preservation of semantic information by comparing noisy and clean embeddings in the embedding space.
# Take the encoder embeddings of the German input phrases, add noise to these embeddings,
# decode them into English, translate that English back to German, embed it again, and compute the
# cosine similarity between the clean embedding and the embedding of the back-translated noisy text.

file_path = '/content/NLP Class Project Data - OfficialInputData.csv'
df = pd.read_csv(file_path)

# Seed the noise generator so that re-running this cell reproduces the same noisy translations
NOISE_SEED = 0
torch.manual_seed(NOISE_SEED)

# List to store results from each noise level trial
big_data2 = []
std_dev_values = [0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.5]

encoder = model.get_encoder()
# One translator instance is enough; it is reused for every sentence
translator = GoogleTranslator(source='en', target='de')

# Iterate over each standard deviation value to test its impact
for std_dev in std_dev_values:
    print("TRIAL: ", std_dev)
    print("_________________________________________________________________________")
    # Dictionary to store results for this trial
    data = {
        'similarities': [],
        'similarities_squared': [],
        'original_english': [],
        'noisy_english': [],
        'noisy_german_translated': []
    }
    # Process each sentence in the dataset
    sentence_pairs = zip(df['German Google Translate (input)'], df['English (gold output)'])
    for i, (input_text, gold_english) in enumerate(sentence_pairs):
        print(i)
        # Tokenize the input text and place it on the same device as the model
        inputs = tokenizer(input_text, return_tensors="pt").to(model.device)

        # Inference only: no_grad avoids building an autograd graph for every sentence
        with torch.no_grad():
            # Get encoder outputs
            encoder_outputs = encoder(input_ids=inputs['input_ids'])
            # Keep a private copy of the clean embeddings before any noise is applied
            original_tensor = encoder_outputs.last_hidden_state.clone()

            # Add noise to encoder outputs
            noisy_encoder_outputs = add_noise(encoder_outputs, std_dev)

            # Generate translations from noisy encoder outputs
            generated_ids_noise = model.generate(
                input_ids=inputs['input_ids'],
                encoder_outputs=noisy_encoder_outputs
            )

        # Decode noisy translations to text
        noise_decoded_text = tokenizer.decode(generated_ids_noise[0], skip_special_tokens=True)
        # Translate noisy English text back to German using Google Translate
        translated_noisy = translator.translate(noise_decoded_text)

        # Tokenize and re-encode the translated noisy German text
        inputs_noise = tokenizer(
            translated_noisy, return_tensors="pt", max_length=512, truncation=True
        ).to(model.device)
        with torch.no_grad():
            noise_encoded = encoder(input_ids=inputs_noise['input_ids'])

        # Mean-pool over the token dimension to get one vector per sentence
        noise_tensor = noise_encoded.last_hidden_state
        original_vector = original_tensor.mean(dim=1)
        noise_vector = noise_tensor.mean(dim=1)

        # Cosine similarity between the clean and noisy sentence vectors. .item() turns the
        # one-element tensor into a Python float so the results are plain numbers when
        # they are later put in a DataFrame and written to Excel.
        similarity = F.cosine_similarity(original_vector, noise_vector, dim=1).item()
        similarity_squared = similarity ** 2

        # Store results in dictionary
        data['original_english'].append(gold_english)
        data['noisy_english'].append(noise_decoded_text)
        data['noisy_german_translated'].append(translated_noisy)
        data['similarities'].append(similarity)
        data['similarities_squared'].append(similarity_squared)
    # Append trial data to the main list
    big_data2.append(data)



In [None]:
# Export Data to Excel File For Plotting and Analysis
def _as_cell_value(value):
    """Convert a single-element tensor to a plain Python number; return anything else unchanged."""
    if isinstance(value, torch.Tensor) and value.numel() == 1:
        return value.item()
    return value


# One DataFrame per noise level, keyed by the trial's position in big_data2.
# Values are converted first so that no tensor objects end up in spreadsheet cells.
dfs = {
    key: pd.DataFrame({
        column: [_as_cell_value(entry) for entry in entries]
        for column, entries in value.items()
    })
    for key, value in enumerate(big_data2)
}

with pd.ExcelWriter('output_experiment2_trial#.xlsx') as writer:
    # The loop variable is deliberately not called `df`: that name holds the input
    # DataFrame read from the CSV, and rebinding it here would silently replace it.
    for sheet_name, trial_df in dfs.items():
        trial_df.to_excel(writer, sheet_name=str(sheet_name))