In [1]:
# This is a first attempt at a rudimentary sentence grammaticality classifier 
# It is meant as a feasibility test for the larger experiment and as a learning
# Experience for future endeavours

# Standard pytorch imports 
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random

#spacy model
import spacy
from spacy.tokenizer import Tokenizer
#load Spacy spanish model to handle tokenization of toy data
nlp = spacy.load('es_core_news_sm')

#other utilities
import numpy as np
from numpy.random import choice
import random
from collections import defaultdict
import math
from string import capwords

In [2]:
# Handles importing data, default version takes mini toy, further will take full sentences.

#Manual seed for consistency
random.seed(42)
#This is the kid version
# real_text = ["Los acontecimientos tienen lugar en una galaxia ficticia de nombre desconocido y en un tiempo no especificado.", "Además de la raza humana, son descritos muchos tipos de especies extraterrestres procedentes de los numerosos planetas y satélites que forman dicha galaxia y pertenecen a la alianza de planetas de la República Galáctica.", "Otros personajes recurrentes son los robots y los androides, creados generalmente para servir a un propósito, observándose así droides astromecánicos, médicos, de protocolo, de combate, entre otros.",
# "Los viajes espaciales son comunes y la mayoría de los planetas que aparecen en la saga están afiliados a la República Galáctica, la unión democrática que rige la galaxia y cuyo gobierno, presidido por un Canciller Supremo, está formado por representantes elegidos o designados de toda ella agrupados en el llamado Senado Galáctico, ubicado en el planeta Coruscant.", "En oposición a la República se encuentra la Confederación de Sistemas Independientes, siendo el enfrentamiento de ambas uno de los temas más importantes en la trama de las tres primeras películas de Star Wars.", "Uno de los elementos principales en la saga es «la Fuerza», un campo de energía metafísico y omnipresente creado por las cosas que existen, que impregna el universo y todo lo que hay en él.", "La Orden Jedi es una organización de caballeros unidos por su creencia y percepción de la Fuerza, que luchan por la paz y la justicia en la República Galáctica.","Se entrenan en el uso del sable de luz o espada láser, un arma similar a una espada tradicional salvo por el hecho que su hoja es un haz de energía.",
# "Los Jedi son capaces de manejar la Fuerza y lograr así habilidades como la telequinesis, la clarividencia, el control mental o una amplificación de los reflejos, la velocidad y otras capacidades físicas.", "No obstante y aunque dicho grupo la utiliza con fines positivos, tiene un lado oscuro provocado por la ira, el miedo y el odio.", "Este lado es usado por los sith con el fin de exterminar a los jedi y tomar el control de la Galaxia."]

#More real version that reads in a corpus file
input_corpus_filename = "euro.mini"
in_file = open(input_corpus_filename, "r")
real_text = []
numlines = 0
inter_excl=0
for line in in_file.readlines():
    #Keep only sentences, those have a period at the end (is support for ? and ! needed??)
    if line.strip() !="":
        if line.strip()[-1] == ".":
            real_text.append(line.strip())
        elif line.strip()[-1] == "?" or line.strip()[-1] == "!":
            inter_excl +=1
    numlines+=1

print("Full corpus has {} sentences, {} were dumped, among which {} interogatives or exclamatives".format(
                            len(real_text),numlines-len(real_text),inter_excl))

proportion_train = 0.8
cutoff = math.floor(len(real_text)*proportion_train)
random.shuffle(real_text)

real_train , real_test = real_text[:cutoff], real_text[cutoff:]

# Process the input sentences (for tokenization, tokenizer sucks otherwise)
parsed_real_train = [nlp(sentence) for sentence in real_train]
parsed_real_test = [nlp(sentence) for sentence in real_test]

#Extract the statististical info needed to generate unigram word salad
#Calculate average sentence length
lengths= [len(sent) for sent in parsed_real_train]
avg_sent_length = np.mean(lengths)
length_sd = np.std(lengths)
counts = defaultdict(int)
total = 0.0
for sentence in parsed_real_train:
    for token in sentence:
        if token.text != ".":
            counts[token.text.lower()] +=1
            total += 1

# TODO: implement a version where low frequency words are replaced by their tag
#Switch happaxes for the UNK token
hapaxes = []
counts["#unk"]=0
for key in counts:
    if counts[key] == 1:
        counts["#unk"] += 1
        hapaxes.append(key)

for hapax in hapaxes:
    counts.pop(hapax)

vocabulary = []
probdist = []
for key in counts:
    vocabulary.append(key)
    probdist.append(counts[key])

# In tokenized we only retain the sentences as an array of words and we implement the replacements.
# We pre tokenize test data taking only the hapaxes from train out 


def token_replacement(parsed_sentences, hapaxes):
    # Takes a list of sentences that have gone through the spacy pipeline
    # (sentences have Doc type)
    # Returns a list of sentences, each of which is a list of words (str)
    # Words specified in hapaxes are replaced by UNK
    # TODO: implement a version that replaces words by their tag instead
    tokenized = []
    for sentence in parsed_sentences:
        this_sentence = []
        for token in sentence:
            if token.text.lower() in hapaxes:
                this_sentence.append("#UNK")
            else:
                this_sentence.append(token.text)
        tokenized.append(this_sentence)
    return tokenized

# Get the sentences represented as lists of words
tokenized_real_train = token_replacement(parsed_real_train, hapaxes)
tokenized_real_test = token_replacement(parsed_real_test, hapaxes)



def generateWS(vocab, probdist, avg_length,sd):
    # Method to generate one word salad sentence usin unigram distribution
    # Vocab is a list of vocabulary words
    # probdist contains the probabilities of vocabulary words in same order
    # avg_length is the average length of sentences
    # sd is the standar deviation for the legths of sentences
    
    #Draw the length
    length= math.floor(random.gauss(avg_length, sd))
    if length<6:
        length = 6
    #Draw the words
    draw= choice(vocab, length, probdist).tolist()
    #Assemble the sentence
    sentence = [capwords(draw.pop(0))]
    while draw:
        next_word = draw.pop(0)
        #special case for punctuation that needs to be closed
        if next_word in ["(","«"]:
            try:
                sentence.append(next_word) 
                sentence.append(draw.pop(0))
                closing = ""
                if next_word == "(":
                    closing = ")"
                elif next_word == "«":
                    closing = "»"
                draw.insert(random.randint(0,len(draw)),closing)
            except IndexError:
                break
        elif next_word not in [")","»"]:
            sentence.append(next_word)
    sentence.append(".")
    return sentence

# get a list of word salads the same length as the real test data    
word_salads_train = [generateWS(vocabulary, 
                          probdist,avg_sent_length,length_sd) for _ in range(len(tokenized_real_train))]
word_salads_test = [generateWS(vocabulary, 
                          probdist,avg_sent_length,length_sd) for _ in range(len(tokenized_real_test))]

#Consolidate training data
labeled_sentences_train = [[sentence, 1] for sentence in tokenized_real_train]
labeled_sentences_train += [[sentence, 0] for sentence in word_salads_train]
random.shuffle(labeled_sentences_train)

#Consolidate test data
labeled_sentences_test = [[sentence, 1] for sentence in tokenized_real_test]
labeled_sentences_test += [[sentence, 0] for sentence in word_salads_test]
random.shuffle(labeled_sentences_test)

#Define the vocabulary and word ids
vocabulary.append(".")

word_to_ix = {}
for word in vocabulary:
    word_to_ix[word] = len(word_to_ix)

print("Done, you now have {} train instances and {} test instancess:".format(len(labeled_sentences_train),len(labeled_sentences_test)))

Full corpus has 947 sentences, 53 were dumped, among which 33 interogatives or exclamatives
Done, you now have 1514 train instances and 380 test instancess:


In [5]:
# Now we define the Neural network


class Linguo(nn.Module):
    def __init__(self,embedding_dim, vocab_size, lstm_dim , hidden_dim):
        super(Linguo,self).__init__()
        # Store the hidden layer dimension
        self.hidden_dim = hidden_dim
        # Define word embeddings
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # Define LSTM
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        #Define hidden linear layer
        self.hidden2dec = nn.Linear(hidden_dim, 2)
        #Define the hidden state
        self.hstate = self.init_hstate()
        
    def forward(self, inputsentence):
        linguo.hstate = linguo.init_hstate()
        embeds = self.word_embeddings(inputsentence)
        lstm_out, self.hstate = self.lstm(embeds.view(len(inputsentence),1, -1), self.hstate)
        decision_lin = self.hidden2dec(lstm_out[-1])
        #print(decision_lin)
        decision_fin = F.log_softmax(decision_lin)
        return decision_fin
        
        
    def init_hstate(self):
        var1 = autograd.Variable(torch.zeros(1, 1, self.hidden_dim)) 
        var2 = autograd.Variable(torch.zeros(1, 1, self.hidden_dim))
        hidden_state = (var1, var2)
        return hidden_state
        
        
def prepare_input(word_to_ix, sentence):
    idxs = []
    for word in sentence:
        if word in word_to_ix:
            idxs.append(word_to_ix[word.lower()])
        else:
            idxs.append(word_to_ix["#unk"])
    tensor = torch.LongTensor(idxs)
    return autograd.Variable(tensor)
     


In [6]:
# Training time! Cue Eye of the Tiger
embed_dim = 32
lstm_dim =32
voc_size = len(word_to_ix)
hidden_dim = 64
epochs = 50
linguo = Linguo(embed_dim, voc_size, lstm_dim, hidden_dim) 
optimizer = optim.SGD(linguo.parameters(),lr=0.1)
loss_function = nn.NLLLoss()
learning_rate=0.1

for i in range(epochs):
    epoch_loss = 0
    random.shuffle(labeled_sentences_train)
    for data, label in labeled_sentences_train:
        # Restart gradient
        linguo.zero_grad()
        
        
        # Run model
        in_sentence = prepare_input(word_to_ix,data)
        target = autograd.Variable(torch.LongTensor([label]))
        prediction = linguo(in_sentence)
        #Calculate loss and backpropagate
        
        #Squared Loss
        #loss = torch.pow(target-prediction.view(1),2)
        loss = loss_function(prediction,target) 
        
        loss.backward()
        optimizer.step()
        #for parameter in linguo.parameters():
        #   parameter.data.sub_(parameter.grad.data*learning_rate)
        epoch_loss += loss.data[0]
    print("{}:{}".format(i,epoch_loss))
        

0:397.17325605871156
1:95.80154102374217
2:26.846176105420454
3:9.32045095074318
4:1.9972047887604276
5:1.0209774740824287
6:0.7122335123187895
7:0.5341757699685559
8:0.4260383539068471
9:0.352865827917185
10:0.3019765468444007
11:0.26151024713959714
12:0.22952918588214288
13:0.20460415455954717
14:0.1838120420393352
15:0.16593028157032563
16:0.1513145843937309
17:0.13924814194717783
18:0.128394508952141
19:0.11897931548620022
20:0.11087066511353783
21:0.10357774431140854
22:0.09704936164791889
23:0.09134159603726744
24:0.08617626974979942
25:0.08147209621515117
26:0.07720858383913765
27:0.07347065155070709
28:0.06993300837140026
29:0.06669293774456264
30:0.06373165940851777
31:0.060950940778710105
32:0.05848273513673519
33:0.05616594970850741
34:0.05399038397673195
35:0.05196967551303544
36:0.050079557427800125
37:0.04831565696375861
38:0.046660342335101745
39:0.04512212344327082
40:0.04365940363304688
41:0.0422987175322973
42:0.04101066939001896
43:0.03979242792393123
44:0.0386340629

In [7]:
correct = 0
salads =[]
for testcase in labeled_sentences_test:
    prepared_inputs = prepare_input(word_to_ix, testcase[0] )
    prediction_vec = linguo(prepared_inputs).view(2)
    if prediction_vec.data[0] > prediction_vec.data[1]:
        prediction = 0
    else:
        prediction = 1
    if prediction == testcase[1]:
        correct += 1
    
print("Accuracy:{}".format(correct/len(labeled_sentences_test)))

Accuracy:0.9842105263157894


In [None]:
correct

In [None]:
choices = ["yo","tu"]
probabilities = [5,1]
print(choice(choices,10,probabilities))