In [3]:
import numpy as np
import EvalScript.evalResult as evalResult
import heapq
import math

BASE = './Data/'
START = 'START'
STOP = 'STOP'

In [4]:
class UnlabelledData:
    def __init__(self, path):
        self.sentences = []
        with open(path, 'r', encoding='utf8') as f:
            current_sentence = []
            for line in f:
                line = line.strip()
                if line == '':
                    self.sentences.append(current_sentence)
                    current_sentence = []
                else:
                    current_sentence.append(line)
            if len(current_sentence):
                self.sentences.append(current_sentence)

In [5]:
class LabelledData:
    def __init__(self, path = None):
        self.sentences = []
        if path == None:
            return
        with open(path, 'r', encoding='utf8') as f:
            current_sentence = []
            for line in f:
                line = line.strip()
                if line == '':
                    self.sentences.append(current_sentence)
                    current_sentence = []
                else:
                    current_sentence.append(tuple(line.rsplit(maxsplit=1)))
            if len(current_sentence):
                self.sentences.append(current_sentence)
    
    def write_to_file(self, path):
        with open(path, 'w', encoding='utf8') as f:
            for sentence in self.sentences:
                for data in sentence:
                    print(*data, file=f)
                print(file=f)

# Part 1

In [6]:
class HMModel1:
    labels = ['O', 'B-negative', 'I-negative', 'B-neutral', 'I-neutral', 'B-positive', 'I-positive']
    
    def __init__(self, k, smoothing_label):
        self.emit_counts = {}
        self.label_counts = dict.fromkeys(self.labels, 0)
        self.k = k
        self.smoothing_label = smoothing_label
    
    def get_emission_prob(self, x, y):
        if self.emit_counts.get((y, x), 0) != 0:
            probability = np.log(float(self.emit_counts.get((y, x),0) - self.smoothing_label)) - np.log(float(self.label_counts[y])) 
        else:
            probability = np.log(self.smoothing_label ) - np.log(float(self.label_counts[y])) 
        return probability
    
    def learn(self, data: LabelledData):
        for sentence in data.sentences:
            for x, y in sentence:
                for label in self.labels:
                    self.emit_counts.setdefault((label, x), 0)
                self.emit_counts[y, x] += 1
                self.label_counts[y] += 1
    
    def label(self, data: UnlabelledData):
        labelled = LabelledData()
        for unlabelled_sentence in data.sentences:
            sentence = []
            for token in unlabelled_sentence:
                y_max = None; y_max_prob = float('-inf')
                for y in self.labels:
                    y_prob = self.get_emission_prob(token, y)
                    if y_prob > y_max_prob:
                        y_max_prob = y_prob
                        y_max = y

                sentence.append((token, y_max))
            labelled.sentences.append(sentence)
        return labelled
            

In [7]:
%%time

for dataset in ['RU', 'ES']:
    train = LabelledData(BASE + dataset + '/train')
    dev_in = UnlabelledData(BASE + dataset + '/dev.in')
    model = HMModel1(k=1, smoothing_label= 0.004)
    model.learn(train)
    predicted = model.label(dev_in)
    predicted.write_to_file(BASE + dataset + '/dev.p1.out')
    
    print(f'{f" {dataset} ":=^30}')
    evalResult.evaluate(BASE + dataset + '/dev.out', BASE + dataset + '/dev.p1.out')
    print('='*30)

#Entity in gold data: 389
#Entity in prediction: 2086

#Correct Entity : 269
Entity  precision: 0.1290
Entity  recall: 0.6915
Entity  F: 0.2174

#Correct Sentiment : 129
Sentiment  precision: 0.0618
Sentiment  recall: 0.3316
Sentiment  F: 0.1042
#Entity in gold data: 229
#Entity in prediction: 1620

#Correct Entity : 175
Entity  precision: 0.1080
Entity  recall: 0.7642
Entity  F: 0.1893

#Correct Sentiment : 97
Sentiment  precision: 0.0599
Sentiment  recall: 0.4236
Sentiment  F: 0.1049
CPU times: total: 359 ms
Wall time: 350 ms


# Part 2

In [8]:
class HMModel2:
    labels = ['O', 'B-negative', 'I-negative', 'B-neutral', 'I-neutral', 'B-positive', 'I-positive', START, STOP]
    
    def __init__(self, k):
        self.emit_counts = {}
        self.label_counts = dict.fromkeys(self.labels, 0)
        self.transition_counts = {}
        self.k = k
        self.label_to_index = dict(map(lambda x: (x[1], x[0]), enumerate(self.labels)))
    
    def get_emission_prob2(self, x, y):
        return np.log(float(self.emit_counts.get((y, x), self.k))) - np.log(float((self.label_counts[y] + self.k)))

    def get_emission_prob(self, x, y):
        if (y, x) in self.emit_counts:
            return np.log(self.emit_counts.get((y, x)) / (self.label_counts[y] + self.k))
        else:
            for tag in self.labels:
                if (tag, x) in self.emit_counts:
                    return np.log(0)
            return np.log(self.k / (self.label_counts[y] + self.k))
    
    def get_transition_prob(self, y1, y2):
        return np.log(float(self.transition_counts.get((y1, y2), 0))) - np.log(float((self.label_counts.get(y1, 1))))
    
    def learn(self, data: LabelledData):
        for sentence in data.sentences:
            prev_y = START
            for x, y in sentence:
                self.emit_counts.setdefault((y, x), 0)

                
                self.emit_counts[y, x] += 1
                self.label_counts[y] += 1
                self.transition_counts[prev_y, y] = self.transition_counts.get((prev_y, y), 0) + 1
                prev_y = y
            self.transition_counts[prev_y, STOP] = self.transition_counts.get((prev_y, STOP), 0) + 1

            self.label_counts[START] += 1
            self.label_counts[STOP] += 1
    
    def viterbi(self, sentence, transition_prob):
        pi = np.full((len(sentence) + 1, len(self.labels)), -np.inf, dtype=np.double)
        pi[0, self.label_to_index[START]] = 0.0 # = log(1)

        for k in range(1, len(sentence) + 1):
            token = sentence[k-1]
            for vi, v in enumerate(self.labels):
                possible_next = pi[k-1, :] + transition_prob[:, vi].T + self.get_emission_prob(token, v)
                pi[k, vi] = np.max(possible_next)

        return pi

    def label(self, data):
        labelled = LabelledData()
        transition_prob = np.ndarray(shape=(len(self.labels), len(self.labels)), dtype=np.double)
        
        for ui, u in enumerate(self.labels):
            for vi, v in enumerate(self.labels):
                transition_prob[ui, vi] = self.get_transition_prob(u, v)
        
        for unlabelled_sentence in data.sentences:
            pi = self.viterbi(unlabelled_sentence, transition_prob)
            sentence = list(unlabelled_sentence)
            next_yi = self.label_to_index[STOP]
            for i in reversed(range(len(unlabelled_sentence))):
                next_yi = np.argmax(pi[i+1,:] + transition_prob[:, next_yi].T)
                sentence[i] = (sentence[i], self.labels[next_yi])
            labelled.sentences.append(sentence)
        return labelled

In [9]:
%%time

for dataset in ['RU', 'ES']:
    train = LabelledData(BASE + dataset + '/train')
    dev_in = UnlabelledData(BASE + dataset + '/dev.in')
    model = HMModel2(k=1)
    model.learn(train)
    predicted = model.label(dev_in)
    predicted.write_to_file(BASE + dataset + '/dev.p2.out')
    
    print(f'{f" {dataset} ":=^30}')
    evalResult.evaluate(BASE + dataset + '/dev.out', BASE + dataset + '/dev.p2.out')
    print('='*30)

  return np.log(float(self.transition_counts.get((y1, y2), 0))) - np.log(float((self.label_counts.get(y1, 1))))
  return np.log(0)


#Entity in gold data: 389
#Entity in prediction: 488

#Correct Entity : 189
Entity  precision: 0.3873
Entity  recall: 0.4859
Entity  F: 0.4310

#Correct Sentiment : 129
Sentiment  precision: 0.2643
Sentiment  recall: 0.3316
Sentiment  F: 0.2942
#Entity in gold data: 229
#Entity in prediction: 542

#Correct Entity : 134
Entity  precision: 0.2472
Entity  recall: 0.5852
Entity  F: 0.3476

#Correct Sentiment : 97
Sentiment  precision: 0.1790
Sentiment  recall: 0.4236
Sentiment  F: 0.2516
CPU times: total: 891 ms
Wall time: 899 ms


# Part 3

Note here that $k$ has been renamed to $t$ to remove the conflict between the $k$ variable in Viterbi's algorithm.

In [10]:
class HMModel3:
    labels = ['O', 'B-negative', 'I-negative', 'B-neutral', 'I-neutral', 'B-positive', 'I-positive', START, STOP]
    
    def __init__(self, k):
        self.emit_counts = {}
        self.label_counts = dict.fromkeys(self.labels, 0)
        self.transition_counts = {}
        self.k = k
        self.label_to_index = dict(map(lambda x: (x[1], x[0]), enumerate(self.labels)))
    
    def get_emission_prob(self, x, y):
        return np.log(float(self.emit_counts.get((y, x), self.k))) - np.log(float((self.label_counts[y] + self.k)))
    
    def get_transition_prob(self, y1, y2):
        return np.log(float(self.transition_counts.get((y1, y2), 0))) - np.log(float((self.label_counts.get(y1, 1))))
    
    def learn(self, data: LabelledData):
        for sentence in data.sentences:
            prev_y = START
            for x, y in sentence:
                for label in self.labels:
                    self.emit_counts.setdefault((label, x), 0)

                self.emit_counts[y, x] += 1
                self.label_counts[y] += 1
                self.transition_counts[prev_y, y] = self.transition_counts.get((prev_y, y), 0) + 1
                prev_y = y
            self.transition_counts[prev_y, STOP] = self.transition_counts.get((prev_y, STOP), 0) + 1

            self.label_counts[START] += 1
            self.label_counts[STOP] += 1
    
    def kbest(self, pi, prev, transition_prob, t, k, vi, token=None):
        heap = []
        for i in range(t):
            emission_prob = self.get_emission_prob(token, self.labels[vi]) if token != None else 0.0
            possible_next = pi[k-1, :, i] + transition_prob[:, vi].T + emission_prob

            for j, p in enumerate(possible_next):
                if not any(math.isclose(x[0], p) for x in heap):
                    if len(heap) == t:
                        heapq.heappushpop(heap, (p, i, j))
                    else:
                        heapq.heappush(heap, (p, i, j))

        for besti, (p, i, j) in enumerate(heapq.nlargest(t, heap)):
            pi[k, vi, besti] = p
            prev[k-1, vi, besti, 0] = i
            prev[k-1, vi, besti, 1] = j

    
    def viterbi(self, sentence, transition_prob, t):
        pi = np.full((len(sentence) + 2, len(self.labels), t), -np.inf, dtype=np.double)
        pi[0, self.label_to_index[START], 0] = 0.0 # = log(1)
        prev = np.full((len(sentence) + 1, len(self.labels), t, 2), -1, dtype=np.int64)

        for k in range(1, len(sentence) + 1):
            for vi, v in enumerate(self.labels):
                self.kbest(pi, prev, transition_prob, t, k, vi, sentence[k-1])
        
        self.kbest(pi, prev, transition_prob, t, len(sentence) + 1, self.label_to_index[STOP])

        return prev

    def label(self, data, t):
        labelled = LabelledData()
        transition_prob = np.ndarray(shape=(len(self.labels), len(self.labels)), dtype=np.double)
        
        for ui, u in enumerate(self.labels):
            for vi, v in enumerate(self.labels):
                transition_prob[ui, vi] = self.get_transition_prob(u, v)
        
        for unlabelled_sentence in data.sentences:
            prev = self.viterbi(unlabelled_sentence, transition_prob, t)

            sentence = list(unlabelled_sentence)
            next_yi = self.label_to_index[STOP]
            j = t-1
            
            for i in reversed(range(len(unlabelled_sentence))):
                p = (j, self.labels[next_yi])
                j, next_yi = prev[i+1, next_yi, j, :]
                sentence[i] = (sentence[i], self.labels[next_yi])
            labelled.sentences.append(sentence)
        return labelled

In [11]:
%%time

# Verify the algorithm gives the same result as non-modified viterbi for k=1
for dataset in ['RU', 'ES']:
    train = LabelledData(BASE + dataset + '/train')
    dev_in = UnlabelledData(BASE + dataset + '/dev.in')

    model = HMModel3(k=1)
    model.learn(train)

    predicted2 = model.label(dev_in, 1)
    predicted2.write_to_file(BASE + dataset + '/dev.p3.1st.out')    
    print(f'{f" {dataset} k=1 ":=^30}')
    evalResult.evaluate(BASE + dataset + '/dev.out', BASE + dataset + '/dev.p3.1st.out')
    print('='*30)

  return np.log(float(self.transition_counts.get((y1, y2), 0))) - np.log(float((self.label_counts.get(y1, 1))))
  return np.log(float(self.emit_counts.get((y, x), self.k))) - np.log(float((self.label_counts[y] + self.k)))


#Entity in gold data: 389
#Entity in prediction: 488

#Correct Entity : 189
Entity  precision: 0.3873
Entity  recall: 0.4859
Entity  F: 0.4310

#Correct Sentiment : 129
Sentiment  precision: 0.2643
Sentiment  recall: 0.3316
Sentiment  F: 0.2942
#Entity in gold data: 229
#Entity in prediction: 542

#Correct Entity : 134
Entity  precision: 0.2472
Entity  recall: 0.5852
Entity  F: 0.3476

#Correct Sentiment : 97
Sentiment  precision: 0.1790
Sentiment  recall: 0.4236
Sentiment  F: 0.2516
CPU times: total: 1.36 s
Wall time: 1.36 s


In [12]:
%%time

for dataset in ['RU', 'ES']:
    train = LabelledData(BASE + dataset + '/train')
    dev_in = UnlabelledData(BASE + dataset + '/dev.in')

    model = HMModel3(k=1)
    model.learn(train)

    predicted2 = model.label(dev_in, 2)
    predicted2.write_to_file(BASE + dataset + '/dev.p3.2nd.out')    
    print(f'{f" {dataset} k=2 ":=^30}')
    evalResult.evaluate(BASE + dataset + '/dev.out', BASE + dataset + '/dev.p3.2nd.out')
    print('='*30)
    
    predicted8 = model.label(dev_in, 8)
    predicted8.write_to_file(BASE + dataset + '/dev.p3.8th.out')
    print(f'{f" {dataset}#  k=8 ":=^30}')
    evalResult.evaluate(BASE + dataset + '/dev.out', BASE + dataset + '/dev.p3.8th.out')
    print('='*30)

  return np.log(float(self.transition_counts.get((y1, y2), 0))) - np.log(float((self.label_counts.get(y1, 1))))
  return np.log(float(self.emit_counts.get((y, x), self.k))) - np.log(float((self.label_counts[y] + self.k)))


#Entity in gold data: 389
#Entity in prediction: 749

#Correct Entity : 202
Entity  precision: 0.2697
Entity  recall: 0.5193
Entity  F: 0.3550

#Correct Sentiment : 126
Sentiment  precision: 0.1682
Sentiment  recall: 0.3239
Sentiment  F: 0.2214
#Entity in gold data: 389
#Entity in prediction: 716

#Correct Entity : 170
Entity  precision: 0.2374
Entity  recall: 0.4370
Entity  F: 0.3077

#Correct Sentiment : 94
Sentiment  precision: 0.1313
Sentiment  recall: 0.2416
Sentiment  F: 0.1701
#Entity in gold data: 229
#Entity in prediction: 464

#Correct Entity : 117
Entity  precision: 0.2522
Entity  recall: 0.5109
Entity  F: 0.3377

#Correct Sentiment : 66
Sentiment  precision: 0.1422
Sentiment  recall: 0.2882
Sentiment  F: 0.1905
#Entity in gold data: 229
#Entity in prediction: 464

#Correct Entity : 106
Entity  precision: 0.2284
Entity  recall: 0.4629
Entity  F: 0.3059

#Correct Sentiment : 59
Sentiment  precision: 0.1272
Sentiment  recall: 0.2576
Sentiment  F: 0.1703
CPU times: total: 11.1 

### Part 4

Used Absolute Discounting for smoothing in place of Laplace Smoothing. In addition, applied smoothing to both emission and transition probabilities.

In [13]:
class HMModel4:
    labels = ['O', 'B-negative', 'I-negative', 'B-neutral', 'I-neutral', 'B-positive', 'I-positive', START, STOP]
    
    def __init__(self, k, smoothing_label):
        self.emit_counts = {}
        self.label_counts = dict.fromkeys(self.labels, 0)
        self.transition_counts = {}
        self.k = k
        self.label_to_index = dict(map(lambda x: (x[1], x[0]), enumerate(self.labels)))
        self.smoothing_label = smoothing_label
    
    def get_emission_prob(self, x, y):
        if self.emit_counts.get((y, x), 0) != 0:
            probability = np.log(float(self.emit_counts.get((y, x),0) - self.smoothing_label)) - np.log(float(self.label_counts[y])) 
        else:
            probability = np.log(self.smoothing_label) - np.log(float(self.label_counts[y])) 
        return probability
    
    def get_transition_prob(self, y1, y2):
        if self.transition_counts.get((y1, y2), 0) != 0:
            probability = np.log(float(self.transition_counts.get((y1, y2), 0) - self.smoothing_label)) - np.log(float(self.label_counts.get(y1, 1))) 
        else:
            probability = np.log(self.smoothing_label) - np.log(float(self.label_counts.get(y1, 1))) 
        return probability

    
    def learn(self, data: LabelledData):
        for sentence in data.sentences:
            prev_y = START
            for x, y in sentence:
                for label in self.labels:
                    self.emit_counts.setdefault((label, x), 0)

                self.emit_counts[y, x] += 1
                self.label_counts[y] += 1
                self.transition_counts[prev_y, y] = self.transition_counts.get((prev_y, y), 0) + 1
                prev_y = y
            self.transition_counts[prev_y, STOP] = self.transition_counts.get((prev_y, STOP), 0) + 1

            self.label_counts[START] += 1
            self.label_counts[STOP] += 1
    
    def viterbi(self, sentence, transition_prob):
        pi = np.full((len(sentence) + 1, len(self.labels)), -np.inf, dtype=np.double)
        pi[0, self.label_to_index[START]] = 0.0 # = log(1)

        for k in range(1, len(sentence) + 1):
            token = sentence[k-1]
            for vi, v in enumerate(self.labels):
                possible_next = pi[k-1, :] + transition_prob[:, vi].T + self.get_emission_prob(token, v)
                pi[k, vi] = np.max(possible_next)

        return pi

    def label(self, data):
        labelled = LabelledData()
        transition_prob = np.ndarray(shape=(len(self.labels), len(self.labels)), dtype=np.double)
        
        for ui, u in enumerate(self.labels):
            for vi, v in enumerate(self.labels):
                transition_prob[ui, vi] = self.get_transition_prob(u, v)
        
        for unlabelled_sentence in data.sentences:
            pi = self.viterbi(unlabelled_sentence, transition_prob)
            sentence = list(unlabelled_sentence)
            next_yi = self.label_to_index[STOP]
            for i in reversed(range(len(unlabelled_sentence))):
                next_yi = np.argmax(pi[i+1,:] + transition_prob[:, next_yi].T)
                sentence[i] = (sentence[i], self.labels[next_yi])
            labelled.sentences.append(sentence)
        return labelled

In [15]:
%%time

for dataset in ['RU', 'ES']:
    train = LabelledData(BASE + dataset + '/train')
    dev_in = UnlabelledData(BASE + dataset + '/dev.in')
    model = HMModel4(k=1, smoothing_label=0.003)
    model.learn(train)
    predicted = model.label(dev_in)
    predicted.write_to_file(BASE + dataset + '/dev.p2.out')
    
    print(f'{f" {dataset} ":=^30}')
    evalResult.evaluate(BASE + dataset + '/dev.out', BASE + dataset + '/dev.p2.out')
    print('='*30)

#Entity in gold data: 389
#Entity in prediction: 486

#Correct Entity : 176
Entity  precision: 0.3621
Entity  recall: 0.4524
Entity  F: 0.4023

#Correct Sentiment : 123
Sentiment  precision: 0.2531
Sentiment  recall: 0.3162
Sentiment  F: 0.2811
#Entity in gold data: 229
#Entity in prediction: 512

#Correct Entity : 129
Entity  precision: 0.2520
Entity  recall: 0.5633
Entity  F: 0.3482

#Correct Sentiment : 96
Sentiment  precision: 0.1875
Sentiment  recall: 0.4192
Sentiment  F: 0.2591
CPU times: total: 906 ms
Wall time: 899 ms
