# Cipher Alignment and Key Recreation Project

This notebook demonstrates the process of aligning historical ciphertexts with their plaintexts and evaluating the recreation of cipher keys using statistical models.


In [None]:
# Imports dependencies for the project
import os
import random
import time
from datetime import datetime
from dataclasses import dataclass
from typing import Union
import pandas as pd
from cipher import Cipher, CipherKey, ErrorSequence
from cipher_alignment_model import CipherAlignmentModel, NLTKIBMModelAdapter, GenericIBMModelAdapter
from alignment import Alignment

In [None]:
#DATACLASSES
@dataclass
class KeyRecreationResult:
    """Pair a cipher with the key-recreation scores obtained for it.

    Every entry of ``evaluations`` couples the model that produced a key with
    a ``(precision, recall, f1_score, recreated_key)`` tuple.
    """

    # The cipher whose key was recreated.
    cipher: Cipher
    # One (model, (precision, recall, f1_score, recreated_key)) entry per evaluated model.
    evaluations: list[tuple[CipherAlignmentModel, tuple[float, float, float, CipherKey]]]

    def __str__(self):
        # Cipher on the first line, the raw evaluation list on the second.
        cipher_text = f"{self.cipher}"
        evaluations_text = f"{self.evaluations}"
        return cipher_text + "\n" + evaluations_text
    
@dataclass
class AlignmentResult:
    """Pair a cipher with the alignment-evaluation records produced for it."""

    # The cipher the alignments were computed for.
    cipher: Cipher
    # One dictionary of evaluation details per evaluated alignment.
    evaluations: list[dict]

    def __str__(self):
        # Cipher first, then every evaluation record on its own line.
        rendered = [str(record) for record in self.evaluations]
        return f"{self.cipher}\n" + "\n".join(rendered)

In [None]:
# HELPER FUNCTIONS
def sample_lines_from_large_file(file_path, sample_size):
    """Pick up to ``sample_size`` random lines from a UTF-8 text file.

    The file is streamed once, line by line, so only the sampled lines are
    kept in memory (reservoir sampling). Lines are returned exactly as read,
    including their trailing newline. When the file has no more than
    ``sample_size`` lines, all of them are returned in file order.
    """
    reservoir = []
    with open(file_path, 'r', encoding='utf-8') as handle:
        for index, text in enumerate(handle):
            if index >= sample_size:
                # Reservoir is full: the new line replaces a random slot with
                # probability sample_size / (index + 1).
                slot = random.randint(0, index)
                if slot < sample_size:
                    reservoir[slot] = text
                continue
            # Still filling the reservoir with the first sample_size lines.
            reservoir.append(text)
    return reservoir

def create_row(cipher, result, model_name, n_iter)-> dict[str, Union[str, float, int, None]]:
    """Build one table row describing a key-recreation evaluation.

    Args:
        cipher: Object providing ``csv_filename``, ``homophonicity``, ``error``,
            ``filename``, ``seq_size``, ``id_tag`` and ``original_reference_key``.
        result: Indexable holding precision, recall, F1-score and the recreated
            key at positions 0 to 3; any of them may be ``None``.
        model_name: Value stored in the ``model`` column.
        n_iter: Value stored in the ``iterations`` column.

    Returns:
        A dictionary whose insertion order defines the column order of the table.
    """

    def _rounded(score):
        # Scores are stored with four decimals; a missing score stays None.
        return None if score is None else round(score, 4)

    precision, recall, f1_score, recreated_key = result[0], result[1], result[2], result[3]

    # Without an error the two error columns hold the literal string "None".
    if cipher.error is None:
        error_type = str(None)
        error_rate = str(None)
    else:
        error_type = str(cipher.error.error_type)
        error_rate = str(cipher.error.error_rate)

    # The reference key is only consulted when a key was actually recreated.
    if recreated_key is None:
        missing_keys = None
    else:
        missing_keys = recreated_key.check_missing_keys(cipher.original_reference_key)

    row = {}
    row['csv_filename'] = cipher.csv_filename
    row['homophonicity'] = cipher.homophonicity
    row['error'] = error_type
    row['error rate'] = error_rate
    row['precision'] = _rounded(precision)
    row['recall'] = _rounded(recall)
    row['f1_score'] = _rounded(f1_score)
    row['filename'] = cipher.filename
    row['model'] = model_name
    row['segment_size'] = cipher.seq_size
    row['iterations'] = n_iter
    row['id_tag'] = cipher.id_tag
    row['missing_keys'] = missing_keys
    row['recreated_key'] = recreated_key
    return row
def write_to_file(ciphers_and_results: list[KeyRecreationResult], filename: str) -> None:
    """Dump key-recreation results to a CSV file, one row per (cipher, model) pair.

    Args:
        ciphers_and_results: Results whose ``evaluations`` hold
            ``(model, evaluation)`` pairs; each pair becomes one row.
        filename: Target name; every ``.xlsx`` occurrence in it is replaced
            by ``.csv`` before writing.
    """
    rows = [
        create_row(entry.cipher, scores, model.NAME, model.n_iter)
        for entry in ciphers_and_results
        for model, scores in entry.evaluations
    ]

    # Output is always CSV, whatever extension the caller supplied.
    csv_filename = filename.replace('.xlsx', '.csv')
    pd.DataFrame(rows).to_csv(csv_filename, index=False)
    print(f"Results for Cipher key written to {csv_filename}")


def write_alignment_to_file(ciphers_and_results: list[AlignmentResult], filename: str) -> None:
    """Dump alignment evaluations to a CSV file, one row per evaluation record.

    Args:
        ciphers_and_results: Results whose ``evaluations`` are dictionaries
            with the keys read below; a missing key raises ``KeyError``.
        filename: Target name; every ``.xlsx`` occurrence in it is replaced
            by ``.csv`` before writing.
    """
    # Evaluation keys copied verbatim, split around the cipher's segment size
    # so the resulting column order stays fixed.
    leading_keys = ('model', 'error', 'error rate', 'length', 'checklength', 'iterations')
    score_keys = ('precision', 'recall', 'f1_score')

    rows = []
    for entry in ciphers_and_results:
        for details in entry.evaluations:
            cipher = entry.cipher
            row = {
                'cipher_id': cipher.id_tag,
                'csv_filename': cipher.csv_filename,
                'homophonicity': cipher.homophonicity,
            }
            for key in leading_keys:
                row[key] = details[key]
            row['segment_size'] = cipher.seq_size
            for key in score_keys:
                row[key] = details[key]
            # The record stores this under 'alignments'; the column is singular.
            row['alignment'] = details['alignments']
            rows.append(row)

    # Output is always CSV, whatever extension the caller supplied.
    csv_filename = filename.replace('.xlsx', '.csv')
    pd.DataFrame(rows).to_csv(csv_filename, index=False)
    print(f"Alignment results written to {csv_filename}")

def generate_xlsx_file_name(name:str) -> str:
    """Return ``<name>_<YYYY-MM-DD_HH-MM>.xlsx`` stamped with the current time."""
    # datetime supports strftime directives directly in a format spec.
    return f"{name}_{datetime.now():%Y-%m-%d_%H-%M}.xlsx"

In [None]:
# Model Training and Evaluation Functions
def train_and_evaluate_model(model: CipherAlignmentModel, cipher: Cipher) -> tuple[float, float, float, CipherKey]:
    """Score the key recreated from a model against the cipher's reference key.

    A ``CipherKey`` is built from ``model.translation_probs`` and compared
    with ``cipher.original_reference_key``.

    NOTE(review): despite the name, no training call is made here; the model
    is presumably already trained when it is passed in - confirm against the
    adapter implementation.

    Returns:
        ``(precision, recall, f1_score, recreated_key)``.
    """
    expected_key = cipher.original_reference_key
    recreated_key = CipherKey(model.translation_probs)
    scores = recreated_key.compare_keys(expected_key)
    precision, recall, f1_score = scores
    return precision, recall, f1_score, recreated_key

def evaluate_alignments(model: CipherAlignmentModel, cipher: Cipher, continuous=False) -> dict:
    """Merge the model's alignments and score them against the cipher's reference.

    Args:
        model: Aligner providing ``align_sentences``, ``NAME`` and ``n_iter``.
        cipher: Cipher providing the reference alignment data and the metadata
            copied into the result.
        continuous: Forwarded unchanged to ``model.align_sentences``.

    Returns:
        A dictionary with cipher metadata, model settings, precision, recall
        and F1-score, plus the merged alignment itself (replaced by a
        placeholder string once it holds 1000 or more entries).
    """
    # Flatten the alignment objects returned by the model into one sequence.
    merged_pairs = []
    for partial in model.align_sentences(continuous=continuous):
        merged_pairs += partial.alignment
    merged = Alignment(merged_pairs, model.NAME)

    print(f"LEN: {len(merged)}")

    reference = cipher.reference_alignment_data
    # NOTE(review): AER and Levenshtein distance are computed but not reported;
    # the calls are kept so the evaluation does exactly the same work as before.
    aer = merged.calculate_aer(reference)
    ld = merged.calculate_levenshtein(reference)
    precision, recall, fscore = merged.get_precision_recall_fscore(reference)

    # Without an error the two error fields hold the literal string "None".
    if cipher.error is None:
        error_type, error_rate = str(None), str(None)
    else:
        error_type = str(cipher.error.error_type)
        error_rate = str(cipher.error.error_rate)

    merged_length = len(merged)
    # Very long alignments are not embedded in the result.
    shown_alignment = merged if merged_length < 1000 else 'Too many to display'

    return {
        "csv_filename": cipher.csv_filename,
        'homophonicity': cipher.homophonicity,
        'error': error_type,
        'error rate': error_rate,
        'length': len(cipher.plaintext),
        "checklength": merged_length,
        "model": model.NAME,
        "iterations": model.n_iter,
        "segment_size": cipher.seq_size,
        "precision": precision,
        "recall": recall,
        "f1_score": fscore,
        "alignments": shown_alignment,
    }

In [None]:
# Main Execution Block: DEFINE PARAMETERS  

# Experiment grid.
seq_sizes = [5]    # segment sizes handed to each Cipher as seq_size
n_iters = [10]     # iteration counts handed to each model adapter
sample_size = 10   # lines sampled from the input file (alternative value: 100)

# Input data and the accumulators filled by the later cells.
path_to_file = 'test_data100.csv'
key_results, alignment_results = [], []

# Timestamped output names; the writers swap the .xlsx suffix for .csv.
cipherkey_out_filename = generate_xlsx_file_name("cipherkey_evaluation")
alignment_out_filename = generate_xlsx_file_name("alignment_evaluation")

# Reference point for the elapsed-time report printed after the results are written.
start = time.time()

In [None]:
# Main Execution Block:  LOAD AND PROCESS DATA. (Continued)
# Error kinds; one erroneous variant of every sampled cipher is built per kind.
error_types = ["addition", "deletion", "substitution", "duplication", "all"]

# Created once, before the loop: resetting the list per segment size would
# discard the ciphers built for every segment size except the last one.
list_of_random_ciphers = []

for seq_size in seq_sizes:
    # NOTE(review): a fresh random sample is drawn for every segment size, so
    # different segment sizes are evaluated on different lines - confirm this
    # is intended.
    sampled_lines = sample_lines_from_large_file(path_to_file, sample_size)

    for random_line in sampled_lines:
        # Expected layout: id;plaintext;key;ciphertext;filename;homophonicity
        line = random_line.strip().split(';')

        # Constructor arguments shared by the clean cipher and its erroneous variants.
        cipher_fields = dict(
            id_tag=line[0],
            plaintext=line[1],
            str_key=line[2],
            ciphertext=line[3],
            filename=line[4],
            homophonicity=line[5],
            csv_filename=path_to_file,
            seq_size=seq_size,
        )

        cipher_correct = Cipher(**cipher_fields)
        list_of_random_ciphers.append(cipher_correct)

        for error_type in error_types:
            # The error sequence is derived from the original ciphertext.
            error = ErrorSequence(line[3], error_type)
            cipher_erroneous = Cipher(**cipher_fields, error=error)
            list_of_random_ciphers.append(cipher_erroneous)


In [None]:
# TRAIN AND EVALUATE MODELS

# Evaluate every cipher once per iteration count, collecting key-recreation
# and alignment results in the shared accumulator lists.
for n_iter in n_iters:
    for cipher in list_of_random_ciphers:
        # A fresh adapter is built per cipher from that cipher's bitext.
        models = [
            NLTKIBMModelAdapter(cipher.bitext, n_iter, "model2", use_null=False),
        ]
        for model in models:
            # The key result is stored before alignments are evaluated.
            key_evaluation = train_and_evaluate_model(model, cipher)
            key_record = KeyRecreationResult(cipher, [(model, key_evaluation)])
            key_results.append(key_record)

            alignment_evaluation = evaluate_alignments(model, cipher, continuous=True)
            alignment_record = AlignmentResult(cipher, [alignment_evaluation])
            alignment_results.append(alignment_record)


In [None]:
# Persist both result sets, then report the wall-clock time elapsed since `start`.
write_to_file(key_results, cipherkey_out_filename)
write_alignment_to_file(alignment_results, alignment_out_filename)

end = time.time()
elapsed = end - start
print("Evaluation complete")
print(f"Time taken: {elapsed} seconds")


## Results and Analysis
The evaluation of the cipher alignment and key recreation models is complete. The results have been saved as CSV files (the generated `.xlsx` file names are rewritten to `.csv` before writing). Below is a brief summary of the findings.


In [None]:
import pandas as pd

# The results were written under a .csv name derived from the generated .xlsx name.
ck_filename = cipherkey_out_filename.replace('.xlsx', '.csv')

# Load the CSV file into a pandas DataFrame
df_cipher = pd.read_csv(ck_filename)

# A notebook cell renders only its final expression, so a bare head() call
# placed before tail() was silently discarded. Combine the first and last rows
# into one preview frame and make that the final expression instead.
preview_rows = 5
if len(df_cipher) > 2 * preview_rows:
    df_preview = pd.concat([df_cipher.head(preview_rows), df_cipher.tail(preview_rows)])
else:
    # Short tables are shown whole, so no row appears twice.
    df_preview = df_cipher
df_preview


In [None]:
import matplotlib.pyplot as plt

# Example: bar chart of precision, recall and f1_score, one group of bars per
# row of the results table, labelled with that row's 'model' value.
metric_columns = ['precision', 'recall', 'f1_score']
axes = df_cipher.plot(x='model', y=metric_columns, kind='bar')
axes.set_title('Model Evaluation Metrics')
axes.set_xlabel('Model')
axes.set_ylabel('Scores')
plt.show()
