In [1]:
import time
import math
import gmpy2
import random
import numpy as np

In [2]:
class Helper:
    """
    Shared utilities for the glyph-based encoder and decoder: bit-string
    conversion, 7-bit ASCII (de)serialisation, selection of the per-block
    coprime moduli and Hamming distance.
    """

    def __init__(self, codebook, n=5, k=3):
        """
        codebook: dictionary mapping each character to the list of its glyphs
        n: number of characters in each block
        k: number of (smallest) coprimes of a block whose product is maximised
        """

        self.codebook = codebook
        self.n = n
        self.k = k
        # Largest modulus usable for a character: one less than its glyph count.
        self.codebook_glyph_count = {c: len(glyphs) - 1 for c, glyphs in self.codebook.items()}
        # Sorted list of the characters that may carry data: 0-9, A-Z, a-z.
        self.PERTURBLE_CHARACTERS = sorted(
            [str(digit) for digit in range(10)]
            + [chr(offset + ord("A")) for offset in range(26)]
            + [chr(offset + ord("a")) for offset in range(26)]
        )
        # 7-bit marker appended after the message (same bits as chr(3)).
        self.END_OF_MESSAGE_BITS = "0000011"

    def check_if_coprime(self, a, b):
        """
        Return True when a and b are coprime (their gcd is 1), False otherwise.

        Uses math.gcd, so a zero argument is handled (gcd(0, b) == b) instead of
        looping forever as a subtraction-based gcd would.
        """
        return math.gcd(a, b) == 1

    def binary_to_decimal(self, s):
        """
        Convert a binary string to an integer (most significant bit first).

        Every character other than '0' is treated as a set bit; the empty
        string yields 0.
        """
        decimal_value = 0
        for bit in s:
            decimal_value = decimal_value * 2 + (0 if bit == '0' else 1)
        return decimal_value

    def decimal_to_binary(self, n, total_length=None):
        """
        Convert a non-negative integer to its binary string.

        n: integer to convert; 0 (or any value <= 0) yields the empty string
           before padding
        total_length: when given, the result is left-padded with '0' to exactly
           this many characters

        Raises AssertionError when n needs more than total_length bits.
        """
        bits = []
        while n > 0:
            # Integer division keeps the conversion exact for arbitrarily large n
            # (float division silently drops low bits above 2**53).
            n, bit = divmod(n, 2)
            bits.append(str(bit))
        binary_string = "".join(reversed(bits))

        if total_length is not None:
            assert len(binary_string) <= total_length, f"The minimum required bits for binary string({len(binary_string)})" +\
                                                        f"is more than the total length({total_length})"
            binary_string = binary_string.zfill(total_length)

        return binary_string

    def generate_ASCII_encoding(self, text):
        """
        Return the concatenation of the 7-bit binary codes of every character
        in text. A character whose code point needs more than 7 bits trips the
        assertion in decimal_to_binary.
        """
        return "".join(self.decimal_to_binary(ord(ch), 7) for ch in text)

    def decode_ASCII_encoding(self, bit_stream):
        """
        Decode a stream of 7-bit groups back into a string.

        Trailing zeros are stripped and the stream is re-padded with zeros to a
        multiple of 7; decoding stops at the first group equal to
        END_OF_MESSAGE_BITS (or at the end of the stream).
        """
        bit_stream = bit_stream.rstrip('0')
        remainder = len(bit_stream) % 7
        if remainder:
            bit_stream += "0" * (7 - remainder)

        decoded_characters = []
        for start in range(0, len(bit_stream), 7):
            ascii_bit_stream = bit_stream[start : start + 7]
            if ascii_bit_stream == self.END_OF_MESSAGE_BITS:
                break
            decoded_characters.append(chr(self.binary_to_decimal(ascii_bit_stream)))

        return "".join(decoded_characters)

    def generate_all_possible_coprimes(self, counts, coprimes):
        """
        Enumerate every list of pairwise-coprime numbers for the given counts.

        counts: upper bounds; the i-th generated number lies in 1..counts[i]
        coprimes: numbers already chosen for earlier positions; every generated
            number must be coprime to all of them

        Returns a list of lists, each of length len(counts), in ascending order
        of the first element (then recursively of the following ones).
        """
        generated_coprimes = []
        remaining_counts = counts[1:]

        for number in range(1, counts[0] + 1):
            if not all(self.check_if_coprime(number, chosen) for chosen in coprimes):
                continue

            if not remaining_counts:
                generated_coprimes.append([number])
                continue

            # Fix `number` for this position and extend with every valid tail.
            for tail in self.generate_all_possible_coprimes(remaining_counts, coprimes + [number]):
                generated_coprimes.append([number] + tail)

        return generated_coprimes

    def generate_coprimes(self, block_characters):
        """
        Pick the coprime moduli for one block of characters.

        block_characters: the characters of the block (at most n of them)

        Among all pairwise-coprime assignments bounded by each character's
        glyph count, selects the one whose k smallest members have the largest
        product; on ties the last enumerated candidate wins.

        Returns (coprimes, product) where coprimes is in block order and
        product is the product of its k smallest members. Returns ([], 0) when
        no assignment exists.
        """
        assert len(block_characters) <= self.n

        block_character_glyph_counts = [self.codebook_glyph_count[letter] for letter in block_characters]
        all_coprimes = self.generate_all_possible_coprimes(block_character_glyph_counts, [])

        max_product_coprime = []
        max_product = 0
        for coprimes in all_coprimes:
            product = 1
            for number in sorted(coprimes)[:self.k]:
                product *= number

            if product >= max_product:
                max_product = product
                max_product_coprime = coprimes

        return max_product_coprime, max_product

    def calculate_hamming_distance(self, vector1, vector2):
        """
        Return the number of positions at which vector1 and vector2 differ.

        Raises AssertionError when the two vectors have different lengths.
        """
        assert len(vector1)==len(vector2), f"Size mismatch. Size of vector1({len(vector1)}) does not match vector2({len(vector2)})"
        return sum(1 for v1, v2 in zip(vector1, vector2) if v1 != v2)

    def get_codebook_glyph_counts(self):
        """ Return the character -> (glyph count - 1) mapping built in __init__ """
        return self.codebook_glyph_count

In [3]:
class Encoding(Helper):
    """
    Embeds a secret message into a cover text by choosing, for every
    perturbable character, which glyph of the codebook represents it.
    """

    def __init__(self, codebook, n=5, k=3):
        super().__init__(codebook, n, k)
        # 1-based block number -> coprimes selected for that block by encode().
        self.encoding_coprimes = {}

    def encode(self, secret_message, text):
        """
            Encodes a bit stream in the text
            secret_message: string to embed in the text
            text: input string. This will be perturbed inorder to embed the secret message

            The method uses ASCII encoding to convert the secret message into numerical format.
            All characters in the secret message are converted to 7-bit ASCII representations,
            followed by the end-of-message marker.

            Returns (encoded_text, encoded_blocks): the text with every perturbable
            character replaced by the selected glyph, and the flat list of glyph
            indices (one per perturbable character). Returns None, after printing
            a diagnostic, when the text cannot hold the whole bit stream.
        """

        all_text_characters = [*text]

        # Keep only the characters that can be perturbed
        perturble_text_characters = [ch for ch in all_text_characters if ch in self.PERTURBLE_CHARACTERS]

        # Split them into consecutive blocks of (at most) n characters
        text_blocks = [perturble_text_characters[start : start + self.n]
                       for start in range(0, len(perturble_text_characters), self.n)]

        bit_stream = self.generate_ASCII_encoding(secret_message) + self.END_OF_MESSAGE_BITS

        encoded_blocks = []
        prev_end = 0  # number of bit-stream positions already assigned to blocks
        for index, block in enumerate(text_blocks):
            block_coprimes, block_product = self.generate_coprimes(block)
            self.encoding_coprimes[index + 1] = block_coprimes

            # Whole bits that fit below the block product.
            block_capacity = math.floor(math.log(block_product, 2))

            if prev_end < len(bit_stream):
                # Right-pad the final, shorter chunk with zeros: the decoder
                # re-expands every block to block_capacity bits, so an unpadded
                # chunk would come back with leading zeros and shift the
                # end-of-message marker out of alignment.
                block_bit_stream = bit_stream[prev_end : prev_end + block_capacity].ljust(block_capacity, "0")
                prev_end += block_capacity

                block_integer_to_encode = self.binary_to_decimal(block_bit_stream)
                encoded_blocks.extend(block_integer_to_encode % coprime for coprime in block_coprimes)
            else:
                # Nothing left to embed: glyph 0 for every character of the block
                encoded_blocks.extend([0] * len(block_coprimes))

        # Every bit, including the last one of the marker, must have been placed.
        if prev_end < len(bit_stream):
            print("!!!!The secret message is too long and cannot be embedded in the given text!!!!")
            print(f"Can embed strings of atmost {prev_end // 7} character length.")
            return None
        else:
            print(f"Maximum length of secret message: {prev_end // 7}")

        encoded_pieces = []
        glyph_position = 0
        for ch in all_text_characters:
            if ch in self.PERTURBLE_CHARACTERS:
                encoded_pieces.append(self.codebook[ch][encoded_blocks[glyph_position]])
                glyph_position += 1
            else:
                encoded_pieces.append(ch)
        encoded_text = "".join(encoded_pieces)

        return encoded_text, encoded_blocks

    def get_block_coprimes(self):
        """ Return the block number -> coprimes mapping filled by encode() """
        return self.encoding_coprimes

In [4]:
def generate_sample_codebook(random_seed=42):

    """
        For testing purposes. Builds a random codebook.

        Seeds the global `random` generator with random_seed and gives every
        lowercase letter, uppercase letter and digit between 5 and 30 glyphs
        named "<character>_<index>". Returns the character -> glyph list dict.
    """

    random.seed(random_seed)

    codebook = {}
    for offset in range(26):
        # Draw order per step is lowercase, uppercase, then the digit (if any).
        symbols = [chr(offset + ord('a')), chr(offset + ord('A'))]
        if offset <= 9:
            symbols.append(str(offset))

        for symbol in symbols:
            glyph_total = random.randint(5, 30)
            codebook[symbol] = [symbol + "_" + str(glyph_index) for glyph_index in range(glyph_total)]

    return codebook

In [5]:
class Decoding(Helper):
    """
    Recovers the embedded message from the recognised glyph indices of a text,
    with optional per-block error correction.
    """

    def __init__(self, codebook, n=5, k=3):
        super().__init__(codebook, n, k)
        # 1-based block number -> coprimes selected for that block by decode().
        self.decoding_coprimes = {}

    def decode_block(self, block_coprimes, block_product, code_vector):
        """
            Decodes the given code vector as per the coprimes of the block.

            block_coprimes: moduli of the block, in block order
            block_product: product of the smallest k moduli
            code_vector: recognised glyph indices (residues) of the block

            Reconstructs the block integer from the residues of the k smallest
            moduli with the Chinese Remainder Theorem, then re-encodes it and
            compares against every residue.

            Returns (m_, error_detected) where error_detected is True when the
            re-encoded code word differs from code_vector.
        """

        sorted_index = np.argsort(block_coprimes)
        # A trailing block can hold fewer than k characters; block_product is
        # then the product of all of its moduli.
        num_moduli = min(self.k, len(block_coprimes))

        m_ = 0
        for i in range(num_moduli):
            position = sorted_index[i]
            p_i = int(block_coprimes[position])
            # Exact integer quotient: p_i is one of the factors of block_product.
            P_p_i = block_product // p_i
            b_i = int(gmpy2.invert(gmpy2.mpz(P_p_i), gmpy2.mpz(p_i)))
            m_ += int(code_vector[position]) * b_i * P_p_i
        m_ = m_ % block_product

        code_vector_ = [m_ % coprime for coprime in block_coprimes]
        hamming_distance_ = self.calculate_hamming_distance(code_vector, code_vector_)
        error_detected = hamming_distance_ != 0

        return m_, error_detected

    def minimal_hamming_distance_error_correction(self, block_coprimes, block_product, code_vector):
        """
            Perform error correction for the given block and code vector based on the principle
            of minimizing the hamming distance between the code vector and the possible code words.

            Every message value that fits in the block's bit capacity (0 included)
            is tried; the first one at minimal distance wins.

            Returns (best_m_, closest_code_word).
        """

        min_hamming_distance = self.n + 1
        best_m_ = -1
        closest_code_word = []

        for candidate in range(2 ** math.floor(math.log(block_product, 2))):
            code_word = [candidate % coprime for coprime in block_coprimes]

            hamming_distance = self.calculate_hamming_distance(code_word, code_vector)
            if hamming_distance < min_hamming_distance:
                min_hamming_distance = hamming_distance
                best_m_ = candidate
                closest_code_word = code_word

        return best_m_, closest_code_word

    def maximum_likelihood_error_correction(self, block_coprimes, block_product, code_vector, softmax_outputs):
        """
            Perform error correction for the given block and code vector based on the principle
            of maximizing the probability of misrecognition of a possible code word as the given code vector.

            softmax_outputs: indexed as softmax_outputs[j][g] for position j of the
                block and glyph index g -- presumably the recogniser's probability
                for glyph g at that position; confirm against the caller.

            All candidates at minimal hamming distance are collected; the one whose
            differing positions have the largest probability product is returned.

            Returns (true_m_, true_code_word).
        """

        min_hamming_distance = self.n + 1
        closest_code_words = []
        closest_m_ = []

        for candidate in range(2 ** math.floor(math.log(block_product, 2))):
            code_word = [candidate % coprime for coprime in block_coprimes]

            hamming_distance = self.calculate_hamming_distance(code_word, code_vector)
            if hamming_distance < min_hamming_distance:
                # Strictly better: restart the list of tied candidates
                min_hamming_distance = hamming_distance
                closest_code_words = [code_word]
                closest_m_ = [candidate]
            elif hamming_distance == min_hamming_distance:
                closest_code_words.append(code_word)
                closest_m_.append(candidate)

        true_value_probab = [] # probability that the i-th code word is the true embedded code vector
        for code_word in closest_code_words:
            probab_ri_r = 1
            for j, (rij, rj) in enumerate(zip(code_word, code_vector)):
                if rij != rj:
                    probab_ri_r *= softmax_outputs[j][rij]
            true_value_probab.append(probab_ri_r)

        max_probab_index = np.argmax(true_value_probab, axis=0)
        true_code_word = closest_code_words[max_probab_index]
        true_m_ = closest_m_[max_probab_index]

        return true_m_, true_code_word

    def decode(self, code_vector, text, perform_error_correction=True, error_correction_method=None, softmax_output=None):
        """
            Decodes the encrypted bit stream given the code vector generated by the CNN network and the text
            Also performs error correction based on hamming distance

            code_vector: Recognised glyph sequence from the CNN network
            text: Original text in which the bit stream was encrypted
            perform_error_correction: bool: whether to perform error correction
            error_correction_method: string:
                option:
                    hamming_distance
                    maximum_likelihood
            softmax_output: required for maximum_likelihood; assumed to hold one
                entry per element of code_vector so it can be sliced per block --
                confirm against the recogniser's output format

            Returns (decoded_string, decoding_error_blocks) where the second item
            lists the 0-based indices of the blocks in which an error was detected
            and corrected. Returns None, after printing a diagnostic, when the
            correction method is invalid or softmax_output is missing.
        """

        all_text_characters = [*text]

        # Keep only the characters that can be perturbed
        perturble_text_characters = [ch for ch in all_text_characters if ch in self.PERTURBLE_CHARACTERS]

        # Split them into consecutive blocks of (at most) n characters
        text_blocks = [perturble_text_characters[start : start + self.n]
                       for start in range(0, len(perturble_text_characters), self.n)]

        # The last block may be partial, so round the block count up.
        code_vector_blocks = math.ceil(len(code_vector) / self.n)
        assert len(text_blocks)==code_vector_blocks, f"Number of blocks in code vector({code_vector_blocks}) " + \
                                                        f"does not match the text({len(text_blocks)})."

        decoded_bit_stream = ""
        decoding_error_blocks = []
        for index, block in enumerate(text_blocks):
            block_coprimes, block_product = self.generate_coprimes(block)
            self.decoding_coprimes[index + 1] = block_coprimes
            block_capacity = math.floor(math.log(block_product, 2))

            block_start = index * self.n
            block_end = (index + 1) * self.n
            code_vector_ = code_vector[block_start : block_end]

            m_, error_detected = self.decode_block(block_coprimes, block_product, code_vector_)
            if error_detected and perform_error_correction:
                assert error_correction_method is not None, "Error Correction method not specified"
                decoding_error_blocks.append(index)
                if error_correction_method == "hamming_distance":
                    m_, true_code_word_ = self.minimal_hamming_distance_error_correction(block_coprimes, block_product, code_vector_)
                elif error_correction_method == "maximum_likelihood":
                    if softmax_output is not None:
                        # Correct this block only: pass its own residues and the
                        # matching slice of the softmax outputs.
                        m_, true_code_word_ = self.maximum_likelihood_error_correction(
                            block_coprimes, block_product, code_vector_, softmax_output[block_start : block_end])
                    else:
                        print("Maximum likelihood error correction chosen but softmax outputs not provided")
                        return None
                else:
                    print(f"{error_correction_method} is not a valid option.")
                    return None

            decoded_bit_stream += self.decimal_to_binary(m_, block_capacity)

        decoded_string = self.decode_ASCII_encoding(decoded_bit_stream)

        return decoded_string, decoding_error_blocks

In [6]:
def generate_errors(n, codebook_glyph_counts, text, encoded_vector, num_errors, random_seed=42):

    """
        For testing purposes. Injects one random error into each of num_errors
        randomly chosen blocks of the given encoding vector.

        The vector is modified in place and also returned. A corrupted entry is
        decremented when positive and incremented otherwise.
        codebook_glyph_counts and text are accepted but not used.
    """

    random.seed(random_seed)

    last_position = len(encoded_vector) - 1
    block_total = math.ceil(len(encoded_vector) / n)

    for block_index in random.sample(range(block_total), num_errors):
        first = block_index * n
        # The final block may be shorter than n entries.
        last = min(first + n - 1, last_position)

        target = random.randint(first, last)
        encoded_vector[target] += -1 if encoded_vector[target] > 0 else 1

    return encoded_vector

In [7]:
""" Encoding """

# Demo configuration: codebook seed, block size n and number of coprimes k
# whose product is maximised per block.
random_seed = 420
codebook = generate_sample_codebook(random_seed=random_seed)
n = 5
k = 3

encoder = Encoding(codebook=codebook,
                  n=n,
                  k=k)

text = "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged."
secret_message = "This is a secret message in dummy text"

# perf_counter is a monotonic clock meant for measuring elapsed time.
encoding_start_time = time.perf_counter()
encoding_result = encoder.encode(secret_message, text)
encoding_end_time = time.perf_counter()

# encode() returns None (after printing why) when the message does not fit;
# fail here with a clear error instead of an unpacking TypeError.
if encoding_result is None:
    raise ValueError("The secret message could not be embedded in the given text")
encoded_text, encoded_blocks = encoding_result

print("Length of text:", len(text))
print("Length of secret message:", len(secret_message))
print("Time taken to encode text:", encoding_end_time - encoding_start_time)

Maximum length of secret message: 39
Length of text: 366
Length of secret message: 38
Time taken to encode text: 3.7673206329345703


In [8]:
""" Decoding without errors """

# Decode the untouched glyph indices produced by the encoding cell; no error
# correction method is needed because no block should report an error.
decoder = Decoding(codebook, n, k)

# perf_counter is a monotonic clock meant for measuring elapsed time.
decoding_start_time = time.perf_counter()

decoded_message, erroneous_blocks = decoder.decode(encoded_blocks, text)

decoding_end_time = time.perf_counter()

print("Time taken to decode text: ", decoding_end_time - decoding_start_time)
print("Encrypted Message:", decoded_message)
print(f"{len(erroneous_blocks)} errors detected in the following blocks:{erroneous_blocks}")

Time taken to decode text:  3.8058276176452637
Encrypted Message: This is a secret message in dummy text
0 errors detected in the following blocks:[]


In [9]:
""" Decoding with errors """

# Corrupt one glyph index in each of num_errors random blocks, then decode
# with hamming-distance error correction.
codebook_glyph_counts = encoder.get_codebook_glyph_counts()
num_errors = 8
# generate_errors modifies its input in place, so work on a copy and keep
# encoded_blocks intact for re-runs.
encoded_blocks_ = encoded_blocks[ : ]
random_seed = 690
erroneous_encoded_blocks = generate_errors(n, codebook_glyph_counts, text, encoded_blocks_, num_errors, random_seed)

perform_error_correction = True
error_correction_method = "hamming_distance"

# perf_counter is a monotonic clock meant for measuring elapsed time.
decoding_start_time = time.perf_counter()

decoded_message, erroneous_blocks = decoder.decode(erroneous_encoded_blocks, text, perform_error_correction, error_correction_method)

decoding_end_time = time.perf_counter()

print("Time taken to decode text: ", decoding_end_time - decoding_start_time)
print("Encrypted Message:", decoded_message)
print(f"{len(erroneous_blocks)} errors detected in the following blocks:{erroneous_blocks}")

Time taken to decode text:  3.806405544281006
Encrypted Message: This is a secret message in dummy text
8 errors detected in the following blocks:[4, 11, 20, 29, 30, 46, 50, 58]
