In [1]:
import nltk
from nltk.corpus import treebank
from sklearn.model_selection import train_test_split
import dlib
import numpy as np
import random
import time
import pickle
from gensim.models import Word2Vec

In [2]:
# The class for solving the SSVM problem
class POSTaggingProblem:
    C = 1
    
    #initializing the problem
    def __init__(self, samples, labels, L, K, d):
        self.L = L
        self.K = K
        self.d = d
        self.Niter = 10
        self.num_samples = len(samples)
        self.num_dimensions = 1 + (L - 1) * (K * d + 1) + (K - 1) * d + d
        self.samples = samples
        self.labels = labels
        self.loss_for_loop = True
    #returns a feature vector given an input sample and its label
    def make_psi(self, x, label):
        psi = np.zeros(self.num_dimensions)
        psi[0] = 1.0  # The bias
        for iL in range(self.L):
            i = label[iL] if iL < len(label) else self.K - 1  
            start = 1 + iL * (self.K * self.d + 1) + i * self.d
            if iL < len(x): 
                psi[start:start + self.d] = x[iL][:self.d]
        return dlib.vector(psi)
    
    #returns a feature vector for a given sample with its true label:
    def get_truth_joint_feature_vector(self, idx):
        return self.make_psi(self.samples[idx], self.labels[idx])
    
    #finds most violating classes and calculates loss:
    def separation_oracle(self, idx, current_solution):
        samp = self.samples[idx]
        psi = dlib.vector()
        psi.resize(self.num_dimensions)
        max1 = -1e10
        max_scoring_label = [0] * self.L
        for iL in range(self.L):
            for i in range(self.K):
                tmp_label = list(self.labels[idx])
                tmp_label[iL] = i
                tmp_psi = self.make_psi(samp, tmp_label)
                score1 = dlib.dot(current_solution, tmp_psi)
                loss1 = 0.0
                if self.loss_for_loop:
                    for j in range(self.L):
                        if self.labels[idx][j] != tmp_label[j]:
                            loss1 += 1.0
                if max1 < score1 + loss1:
                    max1 = score1 + loss1
                    loss = loss1
                    max_scoring_label[iL] = i
                    psi = tmp_psi
        return loss, psi


In [3]:
#Loading a subset of the data 
tagged_sentences = treebank.tagged_sents()[:1000]

#Function to simplify tags for decreased complexity
def simplify_tag(tag):
    if tag.startswith('N'): return "NOUN"
    elif tag.startswith('V'): return "VERB"
    elif tag.startswith('J'): return "ADJ"
    else: return "OTHER"
    
#Preprocessing the sentences
def preprocess_sentence(tagged_sentence):
    words, tags = zip(*tagged_sentence)
    words = [word.lower() for word in words]
    tags = [simplify_tag(tag) for tag in tags]
    return words, tags

#Padding the list 
def pad_to_length(lst, length, padding_element):
    if len(lst) < length:
        return lst + [padding_element] * (length - len(lst))
    else:
        return lst


sentences, labels = zip(*[preprocess_sentence(sentence) for sentence in tagged_sentences])


In [4]:
# Training the Word2Vec model
sentences = [list(sentence) for sentence in sentences]
model = Word2Vec(sentences, min_count=1, vector_size=100)

#Converting words to vectors using word2vec
def word_to_vec(word, model):
    return model.wv[word] if word in model.wv else np.zeros(model.vector_size)

#Building a vocab of all the words and list for labels
vocabulary = list(set(word for sentence in sentences for word in sentence))
vocabulary.append('PADDING')
all_tags = list(set(tag for tags in labels for tag in tags))
all_tags.append('PADDING')

K = len(all_tags)
L = 7  # Window size 
d = model.vector_size

print('Number of unique words (including padding):', len(vocabulary))
print('Number of unique tags (including padding):', len(all_tags))
print('Window size:', L)
print('Dimensions:', d)


Number of unique words (including padding): 5026
Number of unique tags (including padding): 5
Window size: 7
Dimensions: 100


In [5]:
sentences_padded = [pad_to_length(sentence, L, 'PADDING') for sentence in sentences]
labels_padded = [pad_to_length(tags, L, 'PADDING') for tags in labels]

features = [[word_to_vec(word, model) for word in sentence] for sentence in sentences_padded]
label_numbers = [[all_tags.index(tag) if tag != 'PADDING' else -1 for tag in label] for label in labels_padded]

split = int(len(features) * 0.8)
train_features = features[:split]
train_labels = label_numbers[:split]
test_features = features[split:]
test_labels = label_numbers[split:]

train_features_windows = [sentence[i:i + L] for sentence in train_features for i in range(len(sentence) - L + 1)]
train_labels_windows = [labels[i:i + L] for labels in train_labels for i in range(len(labels) - L + 1)]


In [6]:
#Solving the SSVM Problem
print('Creating and solving the SSVM problem...')
start_train_time = time.time()
problem = POSTaggingProblem(train_features_windows, train_labels_windows, L, K, d)
solution = dlib.solve_structural_svm_problem(problem)
end_train_time = time.time()
print('SSVM problem solved.')

print(f'Training time: {end_train_time - start_train_time} seconds')


Creating and solving the SSVM problem...
SSVM problem solved.
Training time: 906.6982996463776 seconds


In [7]:
#Model evaluation

print('Evaluating the model...')
start_test_time = time.time()
correct_predictions = 0
total_predictions = 0
total_tags = 0
predictions = []

for idx, sentence_features in enumerate(test_features):
    predicted_labels = []
    
    scores = []
    for tag_index in range(K):
        label_sequence = [tag_index]
        scores.append(dlib.dot(solution, problem.make_psi([sentence_features[0]], label_sequence)))
    predicted_labels.append(scores.index(max(scores)))

    for i in range(1, len(sentence_features) - 1):
        window_features = sentence_features[i-1:i+2]
        scores = []
        for tag_index in range(K):
            label_sequence = [0, tag_index, 0]  # Now the target label is in the middle of the label sequence
            scores.append(dlib.dot(solution, problem.make_psi(window_features, label_sequence)))
        predicted_labels.append(scores.index(max(scores)))

    scores = []
    for tag_index in range(K):
        label_sequence = [tag_index]
        scores.append(dlib.dot(solution, problem.make_psi([sentence_features[-1]], label_sequence)))
    predicted_labels.append(scores.index(max(scores)))

    predictions.append(predicted_labels)
    
    print('Progress: [{0}/{1}]'.format(idx + 1, len(test_features)), flush=True, end='\r')

    # Updating counts for accuracy calc
    actual_labels = test_labels[idx][:len(sentence_features)]  # Consider all labels in the sentence
    for predicted, actual in zip(predicted_labels, actual_labels):
        if actual != -1:
            total_predictions += 1
            if predicted == actual:
                correct_predictions += 1

end_test_time = time.time()
print(f'\nTest time: {end_test_time - start_test_time} seconds')



Evaluating the model...
Progress: [200/200]
Test time: 5.975387811660767 seconds


In [8]:
# Calculate and print accuracy
if total_predictions != 0:
    accuracy = correct_predictions / total_predictions
    print('\nAccuracy:', accuracy)
else:
    print('\nNo predictions')



Accuracy: 0.20567510950295181


In [9]:
# Random test example to print
example_index = random.randint(0, len(test_features) - 1)
example_sentence = sentences[split:][example_index]
example_ground_truth = test_labels[example_index]
example_prediction = predictions[example_index]

print('Example sentence:', ' '.join(example_sentence))
print('Ground truth:', ' '.join([all_tags[label] if label != -1 else 'PADDING' for label in example_ground_truth]))
print('Prediction:', ' '.join([all_tags[label] if label != -1 else 'PADDING' for label in example_prediction]))


Example sentence: terms were n't disclosed *-1 .
Ground truth: NOUN VERB OTHER VERB OTHER OTHER PADDING
Prediction: NOUN NOUN NOUN OTHER NOUN NOUN OTHER
