In [1]:
from collections import deque
from sklearn.feature_extraction import DictVectorizer
import tensorflow as tf
import numpy as np
import xml.etree.ElementTree as et

vowel_table = {"a" : ["á"], "e" : ["é"], "i" : ["í"], "o" : ["ó", "ö", "ő"], "u" : ["ú", "ü", "ű"]}
vectorizer = DictVectorizer()

In [2]:
def ispunct(c):
    punctuations = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
    for char in punctuations:
        if (c == char):
            return True
    return False


def isalpha(c):
    alphabet = "abcdefghijklmnopqrstuvwxyz"
    for char in alphabet:
        if (c == char):
            return True
    return False

# reduces the number of different characters to 30
def normalize_character(c):
    if (c.isspace()):
        return ' '
    if (c.isdigit()):
        return '0'
    if (ispunct(c)):
        return '_'
    if (isalpha(c)):
        return c
    return '*'


def normalize_text(text):
    normalized = ""
    for c in text:
        normalized += normalize_character(c)
    return normalized

In [245]:
def deaccentize(text):
    text = text.replace("á", "a")
    text = text.replace("é", "e")
    text = text.replace("í", "i")
    text = text.replace("ó", "o")
    text = text.replace("ö", "o")
    text = text.replace("ő", "o")
    text = text.replace("ú", "u")
    text = text.replace("ü", "u")
    text = text.replace("ű", "u")

    return text

def create_row(window, window_size):
    row = {}

    for i in range(-window_size, window_size + 1):
        row[i] = normalize_character(deaccentize(window.popleft()))

    del row[0]

    return row

def get_y(vowel):
    if vowel in "aei":
        return [1, 0]
    if vowel in "áéí":
        return [0, 1]
    if vowel in "ou":
        return [1, 0, 0, 0]
    if vowel in "óú":
        return [0, 1, 0, 0]
    if vowel in "öü":
        return [0, 0, 1, 0]
    if vowel in "őű":
        return [0, 0, 0, 1]

# returns
# x_e: list of windows with the given window_size
# y_e: one-hot encoded values
def prepare_text(text, window_size, vowel):
    x_e = []
    y_e = []
    lower_text = text.lower()

    window = deque((), window_size * 2 + 1)
    for i in range(window.maxlen):
        window.append("_")
        lower_text += "_"

    for character in lower_text:
        window.append(character)
        
        if (window[window_size] == vowel) or (window[window_size] in vowel_table[vowel]):
            x_e.append(create_row(window.copy(), window_size))
            y_e.append(get_y(window[window_size]))

    return x_e, y_e

def prepare_words(words, window_size, vowel):
    x_e = []
    y_e = []
    
    for word in words:
        skip = True
        if vowel in word:
            skip = False
        for c in vowel_table[vowel]:
            if c in word:
                skip = False
        if skip:
            continue
            
        x, y = prepare_text(word, window_size, vowel)

        if x_e == []:
            x_e = vectorizer.transform(x).toarray()
        else:
            tmp = vectorizer.transform(x)
            for tx in tmp:
                #x_e += tx.toarray()
                x_e = np.concatenate((x_e, tx.toarray()))
        y_e += y
        
    return x_e, y_e

In [246]:
# generates template windows for the alphabet
def generate_windows(window_size):
    windows = []
    alphabet = "abcdefghijklmnopqrstuvwxyz 0_*"
    alphabet_size = len(alphabet)

    for i in range(alphabet_size):
        new_window = {}

        end_of_slice = i + window_size * 2
        if end_of_slice <= alphabet_size:
            alphabet_slice = alphabet[i:end_of_slice]
        else:
            alphabet_slice = alphabet[i:alphabet_size]
            alphabet_slice += alphabet[0:end_of_slice - alphabet_size]

        for j in range(window_size):
            new_window[-1 * (j + 1)] = alphabet_slice[window_size - 1 - j]
            new_window[j + 1] = alphabet_slice[window_size + j]

        windows.append(new_window)

    return windows

In [247]:
def read_corpus(count):
    corpus = open("corpus")
    words = []
    
    for i in range(4):
        next(corpus)
        
    for line in corpus:
        splits = line.split()
        if splits != []:
            words.append(splits[0])
            if count < 0:
                break
            count -= 1
        
    return words

In [248]:
vectorizer.fit(generate_windows(4))

DictVectorizer(dtype=<class 'numpy.float64'>, separator='=', sort=True,
        sparse=True)

In [249]:
count = 10000
words = read_corpus(count)

vowel = "e"
p_x, p_y = prepare_words(words, 4, vowel)
np.savez("prepared_" + vowel, x = p_x, y = p_y)



In [250]:
import random

def next_batch(data_x, data_y, count):
    batch_x, batch_y = [], []
    indexes = random.sample(range(len(data_x)), count)
    
    for i in indexes:
        batch_x.append(data_x[i])
        batch_y.append(data_y[i])
        
    return batch_x, batch_y

In [251]:
def convert_time(time):
    m, s = divmod(time, 60)
    s, ms = divmod(s, 1)

    return "%02d:%02d:%04d" % (m, s, ms * 1000)

In [252]:
input_size = 240
output_size = 2

n_input = tf.placeholder(tf.float32, [None, input_size])
n_output = tf.placeholder(tf.float32, [None, output_size])

hidden_neurons = 10

b_hidden = tf.Variable(tf.random_normal([hidden_neurons]))
W_hidden = tf.Variable(tf.random_normal([input_size, hidden_neurons]))
hidden = tf.sigmoid(tf.matmul(n_input, W_hidden) + b_hidden)

W_output = tf.Variable(tf.random_normal([hidden_neurons, output_size]))
# output = tf.sigmoid(tf.matmul(hidden, W_output))
output = tf.nn.softmax(tf.matmul(hidden, W_output))

# cost = tf.reduce_mean(tf.square(n_output - output))
cost = tf.reduce_mean(-tf.reduce_sum(n_output * tf.log(output), reduction_indices=[1]))

# optimizer = tf.train.GradientDescentOptimizer(0.5)
optimizer = tf.train.AdamOptimizer()
train = optimizer.minimize(cost)

init = tf.initialize_all_variables()

In [264]:
import random
import time

from sklearn.model_selection import train_test_split

sess = tf.Session()
sess.run(init)
print("training started")

# data_train = np.load("train_e.npz")
# data_test = np.load("test_e.npz")

prepared_data = np.load("prepared_e.npz")
train_x, test_x, train_y, test_y = train_test_split(prepared_data["x"], prepared_data["y"], test_size = 0.3)
print(len(train_x))
print(len(test_x))
print(len(train_y))

saver = tf.train.Saver()

loss = 100

start_time = time.perf_counter()
for i in range(2001):
    batch_x, batch_y = next_batch(train_x, train_y, 100)
    cvalues = sess.run([train, cost, W_hidden, b_hidden, W_output], feed_dict={n_input: batch_x, n_output: batch_y})
#     cvalues = sess.run([train, cost, W_hidden, b_hidden, W_output], feed_dict={n_input: data_train["x"], n_output: data_train["y"]})

    # early stopping
    if i % 50 == 0 and cvalues[1] < loss:
        loss = cvalues[1]
        saver.save(sess, "session")
#         print("a best loss: {}".format(cvalues[1]))
        

#     if i < 101 and i % 100 == 0:
#         print("")
#         print("")
#         print("step: {:>3}".format(i))
#         print("loss: {}".format(cvalues[1]))
    if i % 1000 == 0:
        print("")
        print("step: {:>3}".format(i))
        print("loss: {}".format(cvalues[1]))
    elif i % 50 == 0:
        print('|', end="")

print("")
print("elapsed time: " + convert_time(time.perf_counter() - start_time))
print("best loss: " + str(loss))

result = sess.run(output, feed_dict={n_input: test_x})
saver.restore(sess, "session")
result_best = sess.run(output, feed_dict={n_input: test_x})

training started
5382
2307
5382

step:   0
loss: 0.9193103313446045
|||||||||||||||||||
step: 1000
loss: 0.4019467830657959
|||||||||||||||||||
step: 2000
loss: 0.31510597467422485

elapsed time: 03:07:0725
best loss: 0.22418


In [267]:
def decide(y):
#     tmp = [0, 0, 0, 0]
    tmp = [0, 0]
    tmp[list(y).index(max(y))] = 1
    return tmp
#     if y[0] > y[1]:
#         return np.array([1, 0])
#     else:
#         return np.array([0, 1])
    
success = 0

for i in range(len(result)):
    if np.array_equal(decide(result[i]), test_y[i]):
        success += 1
        
print("accuracy: " + str(success / len(result)))


success = 0

for i in range(len(result_best)):
    if np.array_equal(decide(result_best[i]), test_y[i]):
        success += 1
        
print("accuracy (best): " + str(success / len(result_best)))
print(len(train_x))
print(len(test_x))

accuracy: 0.857824013870828
accuracy (best): 0.8573905504984829
5382
2307


In [270]:
test_data = "Zsarnoki törvény és erkölcsi zsarnokság egyaránt nyomasztó."
tmp_x, test_y = prepare_text(test_data, 4, "e")
test_x = vectorizer.transform(tmp_x).toarray()

print(test_y)

[[0, 1], [0, 1], [1, 0], [1, 0]]


In [271]:
result = sess.run(output, feed_dict={n_input: test_x})

for i in result:
    print(decide(i))

[0, 1]
[0, 1]
[1, 0]
[1, 0]
