# Helper functions

In [1]:
import numpy as np
import string
import random
import io

In [2]:
# From Xavier
def strToBits(string):
    """
    Converts a string into the list of its bits.
    The string is encoded as utf-8 and every byte contributes its eight
    binary digits, most significant bit first, as the characters '0' / '1'.
    """
    bit_chars = []
    for byte in string.encode('utf-8'):
        # '08b' zero-pads the binary representation to a full byte
        bit_chars.extend(format(byte, '08b'))
    return bit_chars

def stringToChannelInput(string):
    """
    Converts a string into channel symbols.
    Every bit of the utf-8 encoded string is mapped to -1 (bit 0) or +1 (bit 1)
    and the symbols are returned as an int64 numpy array.
    """
    bit_values = np.array(strToBits(string), dtype='int64')
    # 0 -> -1 and 1 -> +1
    return 2 * bit_values - 1

def channelOutputToString(channel_output):
    """
    Converts an array of -1 / +1 symbols back into a string.
    Symbols are turned into bits (-1 -> 0, +1 -> 1), grouped eight at a time
    (most significant bit first) and every group becomes the character with
    that code point. Trailing symbols that do not fill a group are ignored.
    """
    bit_values = ((channel_output + 1) / 2).astype('int64').tolist()
    whole_bytes = len(bit_values) // 8
    characters = []
    for start in range(0, 8 * whole_bytes, 8):
        octet = bit_values[start:start + 8]
        characters.append(chr(int(''.join(map(str, octet)), 2)))
    return ''.join(characters)

In [36]:
# From handout
def channel(chan_input):
    """
    Emulates the noisy communication channel.
    The input is clipped to [-1, 1], then every third sample starting at a
    random offset in {0, 1, 2} is set to zero, and finally Gaussian noise of
    variance 10 is added to every sample. The caller's array is not modified.
    """
    clipped = np.clip(chan_input, -1, 1)
    erased_offset = np.random.randint(3)
    clipped[erased_offset::3] = 0
    noise = np.random.randn(len(clipped))
    return clipped + np.sqrt(10) * noise

In [3]:
def channelWoNoise(chan_input, erasedIndex):
    """
    Emulates the communication channel without the additive noise.
    The input is clipped to [-1, 1] and every third sample, starting at
    erasedIndex, is set to zero. The caller's array is not modified.
    """
    noiseless = np.clip(chan_input, -1, 1)
    noiseless[erasedIndex::3] = 0
    return noiseless

In [5]:
def generateTestString(characters=80):
    """
    Function to generate a random test string.
    The characters are drawn with replacement from the ASCII letters,
    digits and punctuation; `characters` is the length of the result.
    """
    alphabet = string.ascii_letters + string.digits + string.punctuation
    picked = random.choices(alphabet, k=characters)
    return ''.join(picked)

In [6]:
def generateTestFile(characters=80, filename="scratch"):
    """
    Function to write a random test string to a utf-8 encoded text file.
    The file is named filename + ".txt" and holds `characters` random characters.
    Returns the generated text.
    """
    generated = generateTestString(characters)
    target = filename + ".txt"
    with io.open(target, "w", encoding='utf8') as handle:
        handle.write(generated)
    return generated

In [10]:
def readTestFile(filename="scratch"):
    """
    Function to load a text file as channel input.
    Reads the utf-8 encoded file filename + ".txt" and returns its content
    converted by stringToChannelInput.
    """
    source = filename + ".txt"
    with io.open(source, encoding='utf8') as handle:
        content = handle.read()
    return stringToChannelInput(content)

In [7]:
def encodeChannelInput(chan_input, repetitions, signals=None):
    """
    Makes channel input ready for transmission.

    Consecutive symbol pairs of chan_input are mapped to one codeword each:
    (1, 1) -> codeword 0, (1, -1) -> codeword 1, (-1, 1) -> codeword 2 and
    (-1, -1) -> codeword 3. Every codeword is then sent `repetitions` times
    in a row before the codeword of the next pair starts.

    chan_input: 1-D sequence of +1 / -1 symbols with an even number of entries.
    repetitions: number of consecutive copies of each codeword.
    signals: optional codebook with one codeword per row; when omitted the
        module-level signal_set is used.

    Returns a flat float array holding, pair after pair, the repeated codewords.
    An empty chan_input gives an empty array.

    Raises ValueError if the number of symbols is odd or if a symbol is
    neither +1 nor -1.
    """
    codebook = np.asarray(signal_set if signals is None else signals)
    symbols = np.asarray(chan_input)

    if symbols.size % 2:
        raise ValueError("chan_input must hold an even number of symbols")
    pairs = symbols.reshape(-1, 2)
    if not np.isin(pairs, (-1, 1)).all():
        raise ValueError("chan_input symbols must all be +1 or -1")

    # a negative first symbol selects the upper half of the codebook,
    # a negative second symbol the odd entry within that half
    codeword_index = 2 * (pairs[:, 0] < 0) + (pairs[:, 1] < 0)

    # repeat each codeword for redundancy, keeping the copies adjacent
    repeated = np.repeat(codebook[codeword_index], repetitions, axis=0)

    # flatten result
    return repeated.astype(float).ravel()

In [37]:
def decodeChannelOutput(chan_output, repetitions, signals=None):
    """
    Decodes channel output back into text.

    The output is read as blocks of `repetitions` received codewords. The
    erased component is estimated first: every received codeword votes for
    its component with the smallest squared value and the most frequent vote
    is kept. The repetitions of each block are then added up to reduce the
    impact of the noise, and the signs of the remaining components select the
    codeword. The first two components of the selected codewords are the
    recovered +1 / -1 symbols, which channelOutputToString turns into text.

    chan_output: 1-D sequence whose length is a multiple of
        repetitions * (codeword length).
    repetitions: number of consecutive copies of each codeword in chan_output.
    signals: optional codebook with one codeword per row; when omitted the
        module-level signal_set is used.

    Returns the decoded string (empty for an empty chan_output).

    Raises ValueError if chan_output does not split into whole blocks, or if
    the signs of the non-erased components do not single out one codeword.
    """
    codebook = np.asarray(signal_set if signals is None else signals)
    signal_length = codebook.shape[1]
    received = np.asarray(chan_output, dtype=float)

    n_blocks, leftover = divmod(received.size, repetitions * signal_length)
    if leftover:
        raise ValueError("chan_output length must be a multiple of repetitions * codeword length")
    if n_blocks == 0:
        return ""

    # axis 0: blocks, axis 1: repetitions of one codeword, axis 2: codeword components
    received = received.reshape(n_blocks, repetitions, signal_length)

    # decide on erasure index by majority vote over all received codewords
    votes = np.argmin(received ** 2, axis=2).ravel()
    erasure_index = np.bincount(votes, minlength=signal_length).argmax()

    # add the repetitions of each block to reduce the impact of the noise
    block_sums = received.sum(axis=1)

    kept = np.delete(np.arange(signal_length), erasure_index)
    weights = 2 ** np.arange(kept.size)[::-1]

    def sign_pattern(values):
        # integer code of the signs of the non-erased components;
        # an exact zero counts as positive so every block gets a decision
        return (values[:, kept] < 0).astype(int) @ weights

    # derive the decision table from the codebook instead of hard-coding it
    codeword_patterns = sign_pattern(codebook)
    if not np.array_equal(np.sort(codeword_patterns), np.arange(2 ** kept.size)):
        raise ValueError("signs of the non-erased components do not identify the codewords")
    # the patterns are a permutation of 0..n-1, so argsort inverts the mapping
    codeword_by_pattern = np.argsort(codeword_patterns)

    # decide on index of codeword
    codeword_index = codeword_by_pattern[sign_pattern(block_sums)]

    # reconstitute channel input
    decoded_symbols = codebook[codeword_index, :2].ravel()

    return channelOutputToString(decoded_symbols)

In [48]:
# codebook: row k is the length-3 signal sent for the k-th input bit pair
signal_set = np.array([[1, 1, 1], [1, -1, -1], [-1, 1, -1], [-1, -1, 1]])
n_repetitions = 10

# send one random message through the noisy channel and decode it again
text_in = generateTestString(characters=80)
print(f"input: \t\t{text_in}")
chan_input = encodeChannelInput(stringToChannelInput(text_in), n_repetitions)
print(f"input length: \t{len(chan_input)}")
chan_output = channel(chan_input)
text_out = decodeChannelOutput(chan_output, n_repetitions)
print(f"output: \t{text_out}")

# number of positions where the decoded text differs from the original
diff = sum(sent != received for sent, received in zip(text_in, text_out))
print(f"diff: \t\t{diff}")

input: 		`|\zh@s`=h)oo:23Zb[4#8:U""0i/kk*b~V:~Jk|HU#/\5lVUvERbL~-=g}B[7C{\|j6QYR!*`8Ku(8%
input length: 	9600
erase index is: 2
2
output: 	a=\rÈpj	oz+21ßcz(.E¢b4a/ik
p>V{^{NQU"'\ìRYrGbkM~m8gtæYVÇ[\^f?EIÞ)*`8ë=­9$
diff: 		66


# Find the best number of repetitions

In [89]:
def tryFor(repetitions_candidates, iterations):
    """
    Prints the average number of wrongly decoded characters per candidate.
    For every number of repetitions in repetitions_candidates, `iterations`
    random 80-character messages are encoded, sent through the noisy channel
    and decoded. The search stops at the first candidate that decodes all of
    its messages without a single mistake.
    """
    separator = "-----------------------------------------"
    print(separator)
    for candidate in repetitions_candidates:
        total_mistakes = 0
        for _ in np.arange(iterations):
            sent_text = generateTestString(characters=80)
            encoded = encodeChannelInput(stringToChannelInput(sent_text), candidate)
            received = channel(encoded)
            decoded_text = decodeChannelOutput(received, candidate)
            total_mistakes += sum(a != b for a, b in zip(sent_text, decoded_text))
        if total_mistakes == 0:
            print(f"No mistakes made with {candidate} repetitions!")
            break
        mean_mistakes = total_mistakes / iterations
        print(f"Average {mean_mistakes}\t mistakes with \t{candidate} repetitions")
    print(separator)

In [92]:
tryFor(np.arange(10, 500, 10), 50)

-----------------------------------------
Average 62.16	 mistakes with 	10 repetitions
Average 40.62	 mistakes with 	20 repetitions
Average 23.2	 mistakes with 	30 repetitions
Average 15.18	 mistakes with 	40 repetitions
Average 7.68	 mistakes with 	50 repetitions
Average 4.46	 mistakes with 	60 repetitions
Average 2.76	 mistakes with 	70 repetitions
Average 1.6	 mistakes with 	80 repetitions
Average 0.84	 mistakes with 	90 repetitions
Average 0.6	 mistakes with 	100 repetitions
Average 0.28	 mistakes with 	110 repetitions
Average 0.18	 mistakes with 	120 repetitions
Average 0.14	 mistakes with 	130 repetitions
Average 0.02	 mistakes with 	140 repetitions
Average 0.04	 mistakes with 	150 repetitions
Average 0.02	 mistakes with 	160 repetitions
Average 0.02	 mistakes with 	170 repetitions
No mistakes made with 180 repetitions!
-----------------------------------------


In [110]:
def computeAccuracy(tests, repetitions, tolerance):
    """
    Prints how often a random message is decoded with few enough mistakes.

    `tests` random 80-character messages are encoded with `repetitions`
    repetitions, sent through the noisy channel and decoded. A transmission
    counts as a success when at most `tolerance` characters differ from the
    original. The channel input length is printed once, followed by the
    percentage of successful transmissions.

    Raises ValueError if tests is not positive.
    """
    if tests <= 0:
        raise ValueError("tests must be a positive number of transmissions")

    failures = 0
    for attempt in np.arange(tests):
        text_in = generateTestString(characters=80)
        chan_input = encodeChannelInput(stringToChannelInput(text_in), repetitions)
        if attempt == 0:
            # the input length is the same for every attempt, report it once
            print("Input length is n=" + str(len(chan_input)))
        chan_output = channel(chan_input)
        text_out = decodeChannelOutput(chan_output, repetitions)
        mistakes = sum(sent != received for sent, received in zip(text_in, text_out))
        if mistakes > tolerance:
            failures += 1
    percentage = (tests - failures) / tests * 100
    # "at most": a decoding with exactly `tolerance` mistakes is counted as a success
    print("Percentage of decodings with at most " + str(tolerance) + " mistakes in " + str(tests) + " attempts is " + str(percentage) + "%")

In [150]:
computeAccuracy(tests=100, repetitions=62, tolerance=10)

Input length is n=59520
Percentage of decodings with less than 10 mistakes in 100 attempts is 100.0%
