# Rekonstruktion von Morse-Codes mithilfe von LSTM-Netwerken

Ziel dieses Projektes ist es, die Leerzeichen beziehungsweise die Pausen zwischen den Darstellungen einzelner Buchstaben im Morsecode mithilfe eines rekurrenten LSTM-Netzwerkes zu rekonstruieren. Obwohl durch das Weglassen der Leerzeichen im Prinzip Information verloren geht, ergeben meistens nur bestimmte Positionen der Leerzeichen sinnvolle Wörter.

In [774]:
import tensorflow as tf
import numpy as np

## Morse-Kodierung

Zuerst schreiben wir eine Funktion `to_morse`, die ein Wort in Morse-Code übersetzt

In [775]:
morse_code = {
    'A': '.-',     'B': '-...',   'C': '-.-.',   'D': '-..',    'E': '.',      'F': '..-.',
    'G': '--.',    'H': '....',   'I': '..',     'J': '.---',   'K': '-.-',    'L': '.-..',
    'M': '--',     'N': '-.',     'O': '---',    'P': '.--.',   'Q': '--.-',   'R': '.-.',
    'S': '...',    'T': '-',      'U': '..-',    'V': '...-',   'W': '.--',    'X': '-..-',
    'Y': '-.--',   'Z': '--..' }

In [776]:
def to_morse_word(word):
    word = word.upper()
    if not all([x in morse_code for x in word]):
        raise Exception("Wort enthält Zeichen außerhalb des englischen Alphabets")
    return ' '.join([morse_code[x] for x in word])
def to_morse(phrase):
    return 'X'.join([to_morse_word(word) for word in phrase.split(" ")])

Test der Funktion:

In [777]:
to_morse("SOS V")

'... --- ...X...-'

## Eingabe und Ausgabe des neuronalen Netzwerkes

Wir müssen uns zuerst fragen, wie die Ein- und Ausgabe des Netzwerkes kodiert sein soll. Wie alle neuronalen Netze besteht die Eingabe eines LSTM-Modells nicht aus Zeichen, sondern aus Gleitkommazahlen.

Die Eingabe soll ein Morse-Code sein. Da die Leerzeichen weggelassen wurden, besteht dieser nur aus zwei Zeichen. Ein häufiges Verfahren für die Kodierung kategorischer Daten ist das *one-hot encoding*. Die folgende Funktion implementiert diese Codierung der Eingabe, wobei die Leerzeichen im Morsecode automatisch weggelassen werden.

**Look-ahead** Es ist schwierig für das neuronale Netz, zu entscheiden, wo die Morse-Codes unterbrochen werden sollen, ohne wenigstens ein paar Zeichen vorwärts schauen zu können. Daher verschiebe ich einfach Ein- und Ausgabe mithilfe eines einstellbaren look-aheads gegeneinander. Anders gesagt, das neuronale Netz muss erst einige Zeichen später signalisieren, dass ein Leerzeichen eingefügt werden soll.

In [778]:
lookahead = 7

Die Eingabe wird als `float`-Array kodiert, damit man sie direkt ins neuronale Netzwerk einspeisen kann.

In [779]:
def encode_input(morse):
    encodings = { '.': [1, 0, 0], '-': [0, 1, 0], 'X': [0, 0, 1] }
    return np.array([encodings[x] for x in morse + lookahead*"X" if x != ' ']).astype(float)

In [780]:
encode_input(to_morse("SOS"))

array([[1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [0., 0., 1.],
       [0., 0., 1.],
       [0., 0., 1.],
       [0., 0., 1.],
       [0., 0., 1.],
       [0., 0., 1.],
       [0., 0., 1.]])

Die Ausgabe des Netzwerkes möchte ich so machen, dass das Netz für jedes Eingabezeichen entscheiden soll, ob nach diesem Zeichen ein Leerzeichen wahrscheinlich ist. Die gewünschte Ausgabe ist also `1`, wenn auf ein Zeichen im ursprünglichen Morsecode ein Leerzeichen folgt, und sonst `0`. Für das letzte Zeichen macht es Sinn, `1` vorzuschreiben, da das Wortende ja auch ein Buchstabenende ist. Die folgende Funktion implementiert die gewünschte Ausgabe:

In [781]:
def compute_target(morse):
    return np.array(lookahead*[0] + [int((b == " ") | (b == "X")) for a,b in zip(morse[:-1], morse[1:]) if a != " "] + [1]).astype(float)

In [782]:
compute_target(to_morse("SOS V"))

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 1., 0., 0., 1., 0.,
       0., 0., 0., 1.])

Bei "SOS" haben alle Buchstaben drei Zeichen, deshalb besteht die gewünscht Ausgabe aus drei gleichen Teilen.

## Trainings-Daten generieren

In [783]:
with open("words.txt", "r") as f:
    words = f.read()

In [784]:
[x for x in words if x.upper() not in morse_code and x != "\n"]

[]

In [785]:
words = words.split("\n")[:-1]

In [786]:
words += 300 * ["the", "to", "of", "with", "and"]

In [787]:
def get_random_phrase(length):
    phrase = ""
    while len(phrase) < length:
        if phrase != "":
            phrase += " "
        phrase += words[np.random.randint(len(words))]
    return phrase
def make_training_data(length):
    phrase = get_random_phrase(length)
    morse = to_morse(phrase)
    inputs = encode_input(morse)
    targets = compute_target(morse)
    inputs = inputs[:length]
    targets = targets[:length]
    return inputs, targets

In [788]:
get_random_phrase(50)

'of northern less observe to to suffer of support of'

In [789]:
def make_batches(n_batches, length):
    inputs, targets = [np.zeros((n_batches, length, k)) for k in [3,1]]
    for i in range(n_batches):
        inputs[i], targets[i,:,0] = make_training_data(length)
    return inputs, targets

## Trainings-Daten aus Text

In [790]:
with open("dataset0.txt", "r") as f:
    text = f.read()
text = "".join([x for x in text if x == "\n" or (x.upper() in morse_code)])
text = text.replace("\n", " ")
text = " ".join(text.split())

In [791]:
text[:500]

'TheProjectGutenbergeBookofBedouinsbyJamesHuneker ThiseBookisfortheuseofanyoneanywhereintheUnitedStatesand mostotherpartsoftheworldatnocostandwithalmostnorestrictions whatsoeverYoumaycopyitgiveitawayorreuseitundertheterms oftheProjectGutenbergLicenseincludedwiththiseBookoronlineat wwwgutenbergorgIfyouarenotlocatedintheUnitedStatesyou willhavetocheckthelawsofthecountrywhereyouarelocatedbefore usingthiseBook TitleBedouins AuthorJamesHuneker ReleaseDateMarcheBook LanguageEnglish ProducedbyTimLindell'

In [792]:
def make_batches(length):
    pos = np.random.randint(len(text) - length + 50)
    subtext = text[pos : pos + length + 50]
    idx = subtext[:50].find(" ")
    if idx > 0:
        subtext = subtext[idx : length + idx]
    morse = to_morse(subtext)
    inputs = encode_input(morse)
    targets = compute_target(morse)
    inputs = inputs[:length]
    targets = targets[:length]
    return inputs, targets

def make_batches(n_batches, length):
    inputs, targets = [np.zeros((n_batches, length, k)) for k in [3,1]]
    for i in range(n_batches):
        inputs[i], targets[i,:,0] = make_batches(length)
    return inputs, targets

## Definition und Training des Modells

In [822]:
model = tf.keras.Sequential()
model.add(tf.keras.layers.LSTM(250, return_sequences=True))
#model.add(tf.keras.layers.Dense(150, activation='relu'))
model.add(tf.keras.layers.LSTM(250, return_sequences=True))
#model.add(tf.keras.layers.Dense(500, activation='relu'))
model.add(tf.keras.layers.Dense(500, activation='relu'))
model.add(tf.keras.layers.LSTM(60, return_sequences=True))
model.add(tf.keras.layers.Dense(1))

In [823]:
model.compile(optimizer=tf.optimizers.Adam(), loss = tf.losses.BinaryCrossentropy(from_logits=True))

In [830]:
example_while_training = "These were slim not her singing"

In [None]:
n_iter =1000
for i in range(n_iter):
    inputs, targets = make_batches(400, 150)
    print("STEP {} / {}".format(i, n_iter))
    model.fit(inputs, targets, epochs = 3)
    print(morse_decode(insert_spaces(remove_spaces(to_morse(example_while_training)))))

STEP 0 / 1000
Epoch 1/3
Epoch 2/3
Epoch 3/3
THIS WEL ?UT NOT HER SINGING
STEP 1 / 1000
Epoch 1/3
Epoch 2/3
Epoch 3/3
THIS WEL SLUT NOT HER SINGING
STEP 2 / 1000
Epoch 1/3
Epoch 2/3
Epoch 3/3
THIS WINE SLUT NOT HER SINGING
STEP 3 / 1000
Epoch 1/3
Epoch 2/3
Epoch 3/3
THIS WINE IIDUT NOT HER SINGING
STEP 4 / 1000
Epoch 1/3
Epoch 2/3
Epoch 3/3
DEEHE WERE HDEW NOT HER SINGING
STEP 5 / 1000
Epoch 1/3
Epoch 2/3
Epoch 3/3
DEEHE WINE SEDEAT N? HER SINGING
STEP 6 / 1000
Epoch 1/3
Epoch 2/3

## Morse-Dekodieren zum Testen

In [None]:
morse_inverse = { code: letter for letter, code in morse_code.items() }

In [None]:
def morse_decode_word(with_spaces):
    codes = with_spaces.split(" ")
    return "".join([morse_inverse[x] if x in morse_inverse else "?" for x in codes])
def morse_decode(s):
    return " ".join([morse_decode_word(x) for x in s.split("X")])

In [None]:
def remove_spaces(s):
    return "".join([x for x in s if x != " "])

In [None]:
def insert_spaces(s_no_spaces, output=None):
    inp_encode = encode_input(s_no_spaces)
    if output is None:
        output = model(inp_encode.reshape(1,-1,3)).numpy()[0,:,0][lookahead:]
    with_spaces = ""
    for i in range(len(s_no_spaces)):
        char = s_no_spaces[i]
        with_spaces += char
        if output[i] > 0 and i < len(s_no_spaces) - 1 and s_no_spaces[i+1] != "X":
            with_spaces += " "
    return with_spaces

In [None]:
morse_orig = to_morse("printer test")
print(morse_orig)

In [None]:
nospace = remove_spaces(morse_orig)
print(nospace)

In [None]:
reconstructed = insert_spaces(nospace)
print(reconstructed)

In [None]:
reconstructed2 = insert_spaces(nospace, compute_target(morse_orig)[lookahead:])
print(reconstructed2)
print(morse_decode(reconstructed2))

In [None]:
morse_decode(morse_orig)

In [None]:
morse_decode(reconstructed)