# HMM for decryption


In [1]:
from src.CipherUtils import CipherGenerator
from src.CipherUtils import TextEncoder
from src.ProbabilityMatrix import ProbabilityMatrix
from src.CipherUtils import TextPreProcessor

from src.HMM_utils import map_alphabet_to_numbers
from src.HMM_utils import find_mapping, numbers_to_string, invert_mapping
from src.HMM_utils import convert_numbers_to_letters

# from src.HMM_functions import Baum_Welch
# from src.HMM_functions import compute_f_log, Viterbi_log, reconstruct

import numpy as np
import random 


In [2]:
def similar(a, b):
    """
    Given two strings a, b it returns a percentage of matching characters among the two
    """

    return np.mean(
        np.array(list(a.replace(" ", ""))) == np.array(list(b.replace(" ", "")))
    )


def Homophonic_Cipher_Generator(extended_alphabet = list("abcdefghijklmnopqrstuvwxyz1234567890")):
    """
    Generates a permutation assigning to each letter one or potentially two of the symbols in the extended alphabet provided.
    In this simple example it assigns letters as before + it assigns to some letters the numbers as well (can be generalized) 
    The output is kept as a dictionary.
    """
    letters = extended_alphabet.copy()
    letters = letters[0:26]
    random.shuffle(letters)

    letters_list = [letters[i:i+1] for i in range(0, len(letters), 1)]
    for i in range(len(extended_alphabet) - 26):
        index = random.randint(0, 25) # Select one of the existing letters
        while(len(letters_list[index]) > 1):
            index = random.randint(0, 25) # Select one of the existing letters
        letters_list[index].append(i)

    # print(letters_list)
    d = {k:v for k,v in zip(extended_alphabet[0:26], letters_list)}
    return(d)
    
def Encode_using_homophonic(text, cipher_dict):
    """
    Encodes using the homophonic dictionary provided
    We assume text has already been preprocessed to remove all puntctuation ...
    """
    encoded_text = []
    for char in text:
        if char == ' ':
            encoded_text.append(char)
            continue
            
        if len(cipher_dict[char]) == 1:
            encoded_char = cipher_dict[char]
        else:
            encoded_char = str(cipher_dict[char][random.randint(0,len(cipher_dict[char])-1)])  
        
        if isinstance(encoded_char, list):
            encoded_char = str(encoded_char[0])
        encoded_text.append(encoded_char)
    
    
    return "".join(encoded_text)

def forward_HMM(A, B, pi, observed):
    """
    A: transition
    B: emission
    pi: initial
    n_nodes: number of nodes in the chain
    observed: list containing observed ones.
    """
    n_nodes = len(observed)
    n_states = A.shape[0]
    alpha = np.zeros((n_nodes, n_states))
    c = np.zeros(n_nodes)
    alpha_hat = np.zeros((n_nodes, n_states))

    for j in range(n_states):
        alpha[0, j] = pi[j] * B[j, observed[0]]

    c[0] = np.sum(alpha[0])
    alpha_hat[0] = alpha[0] / np.sum(alpha[0])
    # print("alpa[0]", alpha[0])

    for i in range(1, n_nodes):
        for j in range(n_states):
            for k in range(n_states):
                alpha[i, j] += A[k, j] * B[j, observed[i]] * alpha_hat[i - 1, k]
        c[i] = np.sum(alpha[i])
        alpha_hat[i] = alpha[i] / c[i]
    return alpha_hat, c


def backward_HMM(A, B, observed, c):
    """
    A: transition
    B: emission
    n_nodes: number of nodes in the chain
    observed: list containing observed ones.
    """
    n_nodes = len(observed)
    n_states = A.shape[0]
    beta = np.zeros((n_nodes - 1, n_states))
    beta_hat = np.zeros((n_nodes - 1, n_states))

    for j in range(n_states):
        for k in range(n_states):
            beta[-1, j] += A[j, k] * B[k, observed[n_nodes - 1]]

    beta_hat[-1] = beta[-1] / c[-1]

    for i in range(n_nodes - 3, -1, -1):
        for j in range(n_states):
            for k in range(n_states):
                beta[i, j] += A[j, k] * B[k, observed[i + 1]] * beta_hat[i + 1, k]
        beta_hat[i] = beta[i] / c[i + 1]

    return beta_hat


def compute_all_conditional(alpha, beta):
    """
    alpha: list containing forward messages
    beta: list containing backward messages
    """
    n_nodes = alpha.shape[0]
    n_states = alpha.shape[1]

    gamma = np.zeros((n_nodes, n_states))

    gamma[n_nodes - 1] = alpha[n_nodes - 1] / np.sum(alpha[n_nodes - 1])

    for i in range(n_nodes - 1):
        tmp = alpha[i] * beta[i]
        gamma[i] = tmp / np.sum(tmp)

    return gamma


def divide_row_by_sum(matrix):
    row_sums = np.sum(matrix, axis=1)  # Calculate the sum of each row
    divided_matrix = (
        matrix / row_sums[:, np.newaxis]
    )  # Divide each element by the corresponding row sum
    return divided_matrix


def update_B(gamma, observed):
    # n_nodes = gamma.shape[0]
    n_states = gamma.shape[1]

    B = np.zeros((n_states, 37))

    for i in range(n_states):
        for j in range(37):
            for k in range(len(observed)):
                if observed[k] == j:
                    B[i, j] += gamma[k, i]

    return divide_row_by_sum(B)

def Baum_Welch(A, B_start, pi, observed, maxIter=100, tol = 1e-4):
    B = np.copy(B_start)
    changed = 0 # change is set to 1 whenever at least one coordinate increases by more than tol
    for it in range(maxIter):
        #print("computing alpha")
        alpha_hat, c = forward_HMM(A, B, pi, observed)
        #print("alpha_hat", alpha_hat.sum(axis = 1))
        #print("computing beta")
        beta_hat = backward_HMM(A, B, observed, c)
        #print("beta_hat", beta_hat.sum(axis = 1))
        #print("computing gamma")
        gamma = compute_all_conditional(alpha_hat, beta_hat)
        #print("gamma", gamma.sum(axis = 1))
        B_old = B

        #print("updating B")
        #print(B.sum(axis = 1))
        B = update_B(gamma, observed)
        #print("computing B", B.sum(axis = 1))
        # Check if conerged or still changing
        change = np.abs(B - B_old)
        max_change = np.max(change)

        if(max_change < tol):
            print("Not updating anymore after iteration", it)
            break


        # following lines only for encryption
        B[:, -1] = np.zeros(27)
        B[-1, :] = np.zeros(37)
        B[-1, -1] = 1
    return B


def compute_f_log(A, B, observed):
    """
    It constructs the factors of the HMM which are needed to perform the forward pass of the message passing algorithm.
    Input:
        - A : the transition matrix
        - B : the emission matrix
        - observed: an array containing the observed values
    Output:
        - f0: the factor corresponding to the initial factor to first latent variable message
        - f: an array containig the all the other factors (n_states - 1)
    """
    pi = A[-1]
    n_nodes = len(observed)
    n_states = A.shape[0]
    f = np.zeros((n_nodes - 1, n_states, n_states))

    tmp = np.zeros((n_states, 1))
    for k in range(n_states):
        tmp[k] = np.log(pi[k]) + np.log(B[k, observed[0]])

    f0 = tmp

    for i in range(1, n_nodes):
        tmp = np.zeros((n_states, n_states))

        for j in range(n_states):  # over z1
            for k in range(n_states):  # over z2
                tmp[j, k] = np.log(A[j, k]) + np.log(B[k, observed[i]])

        f[i - 1] = tmp

    return f0, f


def Viterbi_log(f0, f):
    """
    Performs the forward pass of the max plus algorithm (known as Viterbi algorithm for Hidden-Markov models).
    Input: 
        - f0: the factor corresponding to the initial factor to first latent variable message
        - f: an array containig the all the other factors (n_states - 1)
    Output:
        - pmax: the array containing the messages of the forward pass
        - phi: the array storing the most probable preceding state stored during the forward pass
    """
    n_nodes = f.shape[0] + 1
    n_states = f.shape[1]

    pmax = np.zeros((n_nodes, n_states))  # Need one for every node
    phi = np.zeros(
        (n_nodes - 1, n_states)
    )  # Need one for every node other than the first one (no need to reconstruct it)

    pmax[0] = f0.flatten()

    for i in range(1, n_nodes):
        tmp = ((f[i - 1]).T + pmax[i - 1]).T

        pmax[i] = np.max(tmp, axis=0)  # by column

        phi[i - 1] = np.argmax(
            tmp, axis=0
        )  # i-1 cause this contains the reconstruction about the (i-1)th element

    return pmax, phi


def reconstruct(pmax, phi):
    """
    Given the output of a max-plus forward pass it returns the most probable hidden states.
    Input:
        - pmax: the array containing the messages of the forward pass
        - phi: the array storing the most probable preceding state stored during the forward pass
    Output:
        - An array of int that coincides with the most probable latent states
    """
    reconstruction = np.empty(len(phi) + 1)

    curr = np.argmax(pmax[-1])
    reconstruction[-1] = curr

    for i in range(len(phi) - 1, -1, -1):
        curr = int(phi[i, curr])
        reconstruction[i] = curr

    return reconstruction

In [3]:
# List of text file paths to build our corpus (where we learn the transitions probs)
file_paths = [
    "texts/moby_dick.txt",
    "texts/shakespeare.txt",
    "texts/james-joyce-a-portrait-of-the-artist-as-a-young-man.txt",
    "texts/james-joyce-dubliners.txt",
    "texts/james-joyce-ulysses.txt",
]

texts = []
for file_path in file_paths:
    with open(file_path, "r") as file:
        texts.append(file.read())

corpus = "".join(texts)
alphabet = list("abcdefghijklmnopqrstuvwxyz ")

preprocessor = TextPreProcessor()
corpus = preprocessor.lower(text=corpus)
corpus = preprocessor.remove_unknown_chars(
    text=corpus, unknown_chars=preprocessor.unknown_chars(corpus)
)
corpus = preprocessor.remove_additional_spaces(text=corpus)

# compute probabilities
p = ProbabilityMatrix(corpus)
p.compute_probability_matrix()
p.compute_normalized_matrix()


In [4]:
def string_to_numbers_updated(text, mapping):
    """
    We convert a->0, b->1, ..., z->25, 0->26, 1->27, 2->28, ..., 9->35, ' '-> 36
    """
    converted_string = [mapping[char] for char in text]
    return converted_string

In [5]:
preprocess = TextPreProcessor()
def prepare_subtexts(path_to_file, lengths):
    """
    This function returns the subtext to use to assess accuracy in decryption

    Input:
        - path_to_file (str) giving the path to the text file
        - lengths (list of int) giving the text lengths to consider
    """

    with open(path_to_file, "r") as input_file:
        full_text = input_file.read()

    full_text = preprocess.lower(full_text)
    unknown_chars = preprocess.unknown_chars(full_text)
    full_text = preprocess.remove_unknown_chars(full_text, unknown_chars=unknown_chars)
    full_text = preprocess.remove_additional_spaces(full_text)

    subtexts = [
        " ".join(full_text.split()[: lengths[i]]) for i in range(len(lengths))
    ]  # Contains the ones for the varying lengths

    return subtexts

In [15]:
def accuracy_varying_text_length_homophonic(
    subtexts,
    probability_matrix,
    n_iterations = 3,
    maxIter_BM=100,
    tol_BM=10e-3,
):
    """
    This function assesses the accuracy of the MCMC decryption + that of the HMM decryption process
    Input:
        - subtexts (list of str): a vector of text of different length
        - cipher_generator (CipherGenerator instance)
        - probability_matrix
        - extract_top(int) : After cipher breaking how many of the top ones we want to retain
        - n_iterations (int) : number of times we want to repeat the encryption-decryption for each text
        - max_iterations (int) : number of iterations in the cipher breaking procedure
        - nstart (int) : number of starting points in the cipher breaking procedure
        - maxIter_BM (int) = number of maximum number of iterations that can be performed by the BM algorithm
        - tol_BM (float) = after no updates larger than this we stop BM algorithm

    For each subtext it encrypts it and decrypts it n_iteration times.
        Everytime it finishes decrypting it compares the best extract_top (in terms of log likelihood) with the original string,
        and computes the accuracy as the proportion of characters matching for the string which matches best.
        Then for each subtext these are averaged over all n_iterations runs and are stored.
    """
    mean_accuracy_HMM_viterbi = []

    for subtext in subtexts:
        total_iterations_HMM_viterbi = 0

        # We run n_iterations ciphers and measure the averge accuracy on those.
        for _ in range(n_iterations):
            cipher = Homophonic_Cipher_Generator()
            encoded_text = Encode_using_homophonic(subtext, cipher)

            mapping = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4, 'f': 5, 'g': 6, 'h': 7, 'i': 8, 'j': 9,
           'k': 10, 'l': 11, 'm': 12, 'n': 13, 'o': 14, 'p': 15, 'q': 16, 'r': 17, 's': 18,
           't': 19, 'u': 20, 'v': 21, 'w': 22, 'x': 23, 'y': 24, 'z': 25,
           '0': 26, '1': 27, '2': 28, '3': 29, '4': 30, '5': 31, '6': 32, '7': 33, '8': 34,
           '9': 35, ' ': 36}
            
            hidden_ = string_to_numbers_updated(encoded_text, mapping = mapping)
            #print(hidden_)

            observed_ = string_to_numbers_updated(subtext, mapping)
            #print(observed_)


            ######################################################### HMM #########################################################
            B_start = np.zeros((27, 37)) + 1 / 36
            B_start[:, -1] = np.zeros(27) # last column
            B_start[-1, :] = np.zeros(37) # last row
            B_start[-1, -1] = 1           # last entr
            emission = Baum_Welch(
                A=probability_matrix.normalized_matrix,
                B_start=B_start,
                pi=probability_matrix.normalized_matrix[-1, :],
                observed=observed_,
                maxIter=maxIter_BM,
                tol=tol_BM,
            )

            # Now we use Viterbi to obtain the most likely one
            f0, f = compute_f_log(
                A=probability_matrix.normalized_matrix, B=emission, observed=observed_
            )
            pmax, phi = Viterbi_log(f0, f)
            reconstruction = reconstruct(pmax, phi)
            reconstruction = reconstruction.astype(int)
            viterbi_reconstruction = convert_numbers_to_letters(reconstruction)
            total_iterations_HMM_viterbi = total_iterations_HMM_viterbi + similar(
                subtext, viterbi_reconstruction
            )

        mean_accuracy_HMM_viterbi.append(total_iterations_HMM_viterbi / n_iterations)

    return mean_accuracy_HMM_viterbi

In [16]:
# Use it on the war message text
lengths = [10, 25, 50, 100, 250, 500, 1000]
subtexts = prepare_subtexts("texts/eisenhower_speech.txt", lengths)

mean_accuracies = speech_accuracy_HMM_viterbi = accuracy_varying_text_length_homophonic(subtexts, probability_matrix=p)

Not updating anymore after iteration 56


  tmp[k] = np.log(pi[k]) + np.log(B[k, observed[0]])
  tmp[j, k] = np.log(A[j, k]) + np.log(B[k, observed[i]])


Not updating anymore after iteration 56
Not updating anymore after iteration 56
Not updating anymore after iteration 69
Not updating anymore after iteration 69
Not updating anymore after iteration 69
Not updating anymore after iteration 45
Not updating anymore after iteration 45
Not updating anymore after iteration 45
Not updating anymore after iteration 29
Not updating anymore after iteration 29
Not updating anymore after iteration 29
Not updating anymore after iteration 30
Not updating anymore after iteration 30
Not updating anymore after iteration 30
Not updating anymore after iteration 34
Not updating anymore after iteration 34
Not updating anymore after iteration 34
Not updating anymore after iteration 34
Not updating anymore after iteration 34
Not updating anymore after iteration 34


In [18]:
mean_accuracies

[0.425531914893617,
 0.7043478260869566,
 0.7914893617021277,
 0.9473684210526315,
 0.9617723718505647,
 0.9706770159551531,
 0.9709153122326774]