In [1]:
import pandas as pd
import ast
from collections import Counter
import math
import re

# Load Data

In [2]:
# Read the three dataset splits; every file is latin-1 encoded.
train_data, val_data, test_data = [
    pd.read_csv(csv_path, encoding='latin-1')
    for csv_path in (
        './data_release/train.csv',
        './data_release/val.csv',
        './data_release/test_no_label.csv',
    )
]

# Model 1: HMM

In [3]:
class HMM_Metaphor_Tagger():
    '''
    Bigram hidden Markov model that assigns one tag to every token of a sentence.

    Transition, emission and tag counts are collected in __init__; viterbi()
    decodes the highest-scoring tag sequence for a new sentence.
    '''

    def __init__(self, training_data, k=0.5, weights=None):
        '''
        training_data: dataframe whose first column is a sentence (tokens separated by spaces)
            and whose third column is a string of a list holding one tag per token
        k: pseudo-count used in place of a count that was never observed in training
        weights: dict mapping (previous_tag, tag) to the multiplier applied to that
            transition's log probability; None means a weight of 1 for the four 0/1 bigrams
        '''
        if weights is None:
            # built per instance so that instances never share one mutable default dict
            weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}

        self.tag_counts = Counter()
        self.tag_bigrams = {}
        self.emissions = {}

        #set k for add-k smoothing
        self.k = k

        #set weights for transition probabilities
        self.weights = weights

        #iterate through training examples
        for _, row in training_data.iterrows():
            #preprocess: add start characters and labels for computing initial probabilities
            # and convert strings to lists and downcase sentences
            # (.iloc keeps the access positional regardless of the column labels)
            tags = ['<START>'] + list(ast.literal_eval(row.iloc[2]))
            sentence = ['<s>'] + row.iloc[0].lower().split()

            #get label bigram counts -- (0,0), (0,1), (1,0), (1,1), ('<START>',0), ('<START>',1)
            for tag_bigram in zip(tags, tags[1:]):
                self.tag_bigrams[tag_bigram] = self.tag_bigrams.get(tag_bigram, 0) + 1

            #get individual tag counts
            self.tag_counts.update(tags)

            #get emission counts
            for i, word in enumerate(sentence):
                word_tag_counts = self.emissions.setdefault(word, {})
                word_tag_counts[tags[i]] = word_tag_counts.get(tags[i], 0) + 1

    def _emission_prob(self, word, tag):
        '''Count of (word, tag) over count of tag; k replaces a count that was never observed.'''
        return self.emissions.get(word, {}).get(tag, self.k) / self.tag_counts[tag]

    def _transition_prob(self, previous_tag, tag):
        '''Count of (previous_tag, tag) over count of previous_tag; k replaces an unseen bigram.'''
        return self.tag_bigrams.get((previous_tag, tag), self.k) / self.tag_counts[previous_tag]

    def viterbi(self, sentence):
        '''
        sentence: string where each token (punctuation included) is separated by a space

        Returns the highest-scoring list of tags, one per token ([] for a sentence
        without tokens).
        '''
        words = sentence.lower().split()
        if not words:
            return []

        #every tag except the artificial start tag, in the order first seen in training
        tags = [tag for tag in self.tag_counts if tag != '<START>']

        #initialization
        previous_log_scores = [
            math.log(self._transition_prob('<START>', tag)) + math.log(self._emission_prob(words[0], tag))
            for tag in tags
        ]

        #iteration
        #backpointers[w-1][t] is the index of the best previous tag when word w carries tags[t]
        backpointers = []
        for word in words[1:]:
            log_scores = []
            w_backpointers = []

            for tag in tags:
                #the emission term does not depend on the previous tag, so compute it once
                log_emission = math.log(self._emission_prob(word, tag))
                best_log_score = float('-inf')
                best_previous = None

                for j, previous_tag in enumerate(tags):
                    weight = self.weights[(previous_tag, tag)]
                    log_score = (previous_log_scores[j]
                                 + weight * math.log(self._transition_prob(previous_tag, tag))
                                 + log_emission)
                    if log_score > best_log_score:
                        best_log_score = log_score
                        best_previous = j

                log_scores.append(best_log_score)
                w_backpointers.append(best_previous)

            previous_log_scores = log_scores
            backpointers.append(w_backpointers)

        #backtracking: start from the best final tag and follow that tag's own backpointer
        # at every position, from the last word back to the second
        best_index = previous_log_scores.index(max(previous_log_scores))
        output = [tags[best_index]]
        for w_backpointers in reversed(backpointers):
            best_index = w_backpointers[best_index]
            output.append(tags[best_index])
        output.reverse()

        return output
            

## Weight Experiments

In [4]:
#1: k = 0.25, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, comparable accuracy, comparable F1
#2: k = 0.5, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, treating as baseline
#3: k = 1, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1
#4: k = 2, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1

#5: k = 10, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1
#6: k = 100, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1
#7: k = 1000, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1

#max F1 at k = 0.5

#8: k = 0.5, weights = {(0,0): 1, (0,1): 2, (1,0): 2, (1,1): 4}, comparable accuracy, worse F1
#9: k = 0.5, weights = {(0,0): 4, (0,1): 2, (1,0): 2, (1,1): 1}, comparable accuracy, worse F1
#10: k = 0.5, weights = {(0,0): 2, (0,1): 1, (1,0): 1, (1,1): 1}, slightly worse accuracy, slightly better F1
#11: k = 0.5, weights = {(0,0): 4, (0,1): 1, (1,0): 1, (1,1): 1}, slightly worse accuracy, slightly better F1
#12: k = 0.5, weights = {(0,0): 10, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1

#13: k = 0.5, weights = {(0,0): 5, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, highest F1

#14: k = 0.5, weights = {(0,0): 6, (0,1): 1, (1,0): 1, (1,1): 1}, worse accuracy, worse F1
#15: k = 0.5, weights = {(0,0): 5, (0,1): 2, (1,0): 2, (1,1): 1}, comparable accuracy, worse F1
#15: k = 0.5, weights = {(0,0): 5, (0,1): 2, (1,0): 2, (1,1): 1}, comparable accuracy, worse F1
#16: k = 0.5, weights = {(0,0): 5, (0,1): 0.5, (1,0): 0.5, (1,1): 0.5}, comparable accuracy, worse F1

#max F1 at k = 0.5 and weights = {(0,0): 5, (0,1): 1, (1,0): 1, (1,1): 1} (model 13)

#analysis: the max-F1 parameters above yield only a very slightly higher F1 score than the baseline,
# and they do so with low precision and high recall. I therefore believe the baseline (model 2,
# i.e. unweighted) is in fact the most robust model: its precision, recall, and F1 are roughly
# equal, and it has the highest accuracy across all tests.

#final model parameters: k = 0.5, weights = {(0,0): 1, (0,1): 1, (1,0): 1, (1,1): 1} (model 2)

## Validation

In [5]:
def validate_model_1(model, val_data):
    '''
    Tag every sentence of val_data with model and collect the predictions.

    model: object whose viterbi(sentence) returns a list with one label per token
    val_data: dataframe whose first column holds the sentences

    Returns a dataframe with columns 'idx' (running token index over all
    sentences, starting at 0) and 'label' (the predicted label of that token).
    '''
    labels = []
    # .iloc selects the first column by position; plain [] with an integer on a
    # string-labelled Series is deprecated in pandas
    for sentence in val_data.iloc[:, 0]:
        labels.extend(model.viterbi(sentence))
    ids = list(range(len(labels)))
    return pd.DataFrame({'idx': ids, 'label': labels}, columns=['idx', 'label'])

In [6]:
# Fit the HMM on the training split with its default hyperparameters, tag the
# validation split, and save the predictions.
hmm_model = HMM_Metaphor_Tagger(training_data=train_data)
df = validate_model_1(model=hmm_model, val_data=val_data)
df.to_csv('model_1_validation.csv', index=False)

## Testing

In [7]:
# Tag the unlabeled test split with the already-fitted HMM and save the predictions.
df = validate_model_1(model=hmm_model, val_data=test_data)
df.to_csv('model_1_test.csv', index=False)

# Model 2: MaxEnt

In [8]:
from sklearn.linear_model import LogisticRegression
from gensim.sklearn_api import W2VTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
from nltk.corpus import wordnet as wn

In [9]:
class MaxEnt_Metaphor_Tagger():
    '''
    Token-level logistic-regression (MaxEnt) metaphor classifier whose per-token
    log probabilities are combined with HMM-style emission counts in a Viterbi search.
    '''

    def __init__(self, train_data, size, k):
        '''
        train_data: dataframe of word features with a 'sentence' column (tokens separated by
            spaces), a 'pos_seq' column (string of a list of POS tags) and a 'label_seq'
            column (string of a list of metaphor labels; 0: not metaphor, 1: metaphor)
        size: dimensionality of the word embedding
        k: pseudo-count used in place of a count that was never observed in training
        '''

        self.size = size
        self.k = k
        self.emissions = {}

        # Transform POS_Seq (each POS tag is embedded with size=1)
        self.pos_seqs_list_of_lists = [ast.literal_eval(pos_seq) for pos_seq in train_data['pos_seq']]
        self.pos_seqs = [pos for pos_seq in self.pos_seqs_list_of_lists for pos in pos_seq]
        self.pos_model = W2VTransformer(size=1, min_count=1, seed=1)
        self.posvecs = np.array(self.pos_model.fit(self.pos_seqs_list_of_lists).transform(self.pos_seqs))

        # Transform Word Tokens
        self.word_token_list_of_lists = [sentence.lower().split() for sentence in train_data['sentence']]
        self.word_tokens = []
        # sample_indices[i] is the index of the sentence that token i belongs to,
        # so it must grow by one entry per token (not per character)
        self.sample_indices = []
        for i, tokens in enumerate(self.word_token_list_of_lists):
            self.word_tokens += tokens
            self.sample_indices += [i] * len(tokens)

        self.word_model = W2VTransformer(size=self.size, min_count=1, seed=1)
        self.wordvecs = np.array(self.word_model.fit(self.word_token_list_of_lists).transform(self.word_tokens))

        # Token frequencies; also serves as the O(1) vocabulary membership test
        self.word_counts = Counter(self.word_tokens)

        #TF-IDF
        self.tfidf_vectorizer = TfidfVectorizer(token_pattern=r"(?u)\S+|[0-9]|!|\'m|\.|\'ll|:|\'re|\'s|,|i\.e\.|\'ve|\'d|\(|\)|a|i|n\'t|;|\?|\"|\'|–")
        self.tfidf_vecs = self.tfidf_vectorizer.fit_transform([sentence.lower() for sentence in train_data['sentence']]).toarray()

        # Concatenate Feature Vectors
        self.vecs = np.concatenate((self.wordvecs, self.posvecs), axis=1)

        # Add each token's TF-IDF weight (within its own sentence) to every component of
        # its feature vector. vocabulary_ maps a term to its TF-IDF column directly.
        # NOTE(review): transform_sentence() does not apply this offset at prediction
        # time -- confirm whether that asymmetry is intended.
        vocabulary = self.tfidf_vectorizer.vocabulary_
        for i, word in enumerate(self.word_tokens):
            self.vecs[i] += self.tfidf_vecs[self.sample_indices[i]][vocabulary[word]]

        # Create Target Vector
        self.label_seqs = []
        for label_seq in train_data['label_seq']:
            self.label_seqs += ast.literal_eval(label_seq)

        # Train MaxEnt classifier
        self.classifier = LogisticRegression(random_state=0, solver='lbfgs', multi_class='ovr').\
            fit(self.vecs, self.label_seqs)

        # Get tag bigrams for transition probabilities
        self.tag_counts = Counter()
        self.tag_bigrams = {}
        for sentence_string, tags_string in zip(train_data['sentence'], train_data['label_seq']):

            #preprocess: add start characters and labels for computing initial probabilities
            # and convert strings to lists and downcase sentences
            tags = ['<START>'] + list(ast.literal_eval(tags_string))
            sentence = ['<s>'] + sentence_string.lower().split()

            #get label bigram counts -- (0,0), (0,1), (1,0), (1,1), ('<START>',0), ('<START>',1)
            for tag_bigram in zip(tags, tags[1:]):
                self.tag_bigrams[tag_bigram] = self.tag_bigrams.get(tag_bigram, 0) + 1

            #get individual tag counts
            self.tag_counts.update(tags)

            #get emission counts
            for i, word in enumerate(sentence):
                word_tag_counts = self.emissions.setdefault(word, {})
                word_tag_counts[tags[i]] = word_tag_counts.get(tags[i], 0) + 1

        # P(metaphor | word) for every training token, computed once all counts are complete:
        # how often the word was labelled 1 (k if never) over how often the word occurs
        self.metaphor_frequencies = np.array(
            [self.emissions.get(word, {}).get(1, self.k) / self.word_counts[word]
             for word in self.word_tokens],
            dtype=float,
        ).reshape(len(self.word_tokens), 1)

        # Appended after the classifier has been fit, so the classifier itself is trained
        # on the word + POS columns only (the columns transform_sentence() produces)
        self.vecs = np.concatenate((self.vecs, self.metaphor_frequencies), axis=1)

    def transform_sentence(self, sentence, pos_sequence):
        '''
        Build one feature row (word embedding followed by POS embedding) per token.

        sentence: string where each token is separated by a space
        pos_sequence: list with one POS tag per token

        A word outside the training vocabulary is swapped for the word chosen by
        replace_unknown_word(); when there is none, the word-embedding part of its
        row is NaN. Raises ValueError when the two inputs differ in length.
        '''
        words = sentence.lower().split()
        if len(words) != len(pos_sequence):
            raise ValueError('sentence has %d tokens but pos_sequence has %d tags'
                             % (len(words), len(pos_sequence)))

        wordvecs = np.zeros(shape=(len(words), self.size))
        posvecs = np.zeros(shape=(len(pos_sequence), 1))

        for i, word in enumerate(words):
            if word not in self.word_counts:
                word = self.replace_unknown_word(word)

            if word == '':
                # no usable embedding: mark the row so viterbi() can skip the classifier
                wordvecs[i] = np.nan
            else:
                wordvecs[i] = np.ravel(self.word_model.transform(word))

            posvecs[i] = np.ravel(self.pos_model.transform(pos_sequence[i]))

        return np.concatenate((wordvecs, posvecs), axis=1)

    def predict_log_proba(self, vectors):
        '''Log probability of every class for every feature row, straight from the classifier.'''
        return self.classifier.predict_log_proba(vectors)

    def predict(self, vectors):
        '''Predicted class for every feature row, straight from the classifier.'''
        return self.classifier.predict(vectors)

    def replace_unknown_word(self, unknown_word):
        '''
        Find a training-vocabulary word that can stand in for unknown_word.

        Candidates are the head lemmas of unknown_word's WordNet synsets; among those
        present in the training vocabulary, the one whose synset has the highest path
        similarity to unknown_word's first synset is returned. Returns '' when there
        is no candidate.
        '''
        synsets = wn.synsets(unknown_word)
        if not synsets:
            return ''

        reference_synset = synsets[0]
        best_word, best_similarity = '', 0
        for synset in synsets:
            # synset names look like 'dog.n.01'; drop the POS and sense number to get 'dog'
            candidate = synset.name().rsplit('.', 2)[0]
            if candidate in self.word_counts:
                similarity = synset.path_similarity(reference_synset)
                if similarity is not None and similarity > best_similarity:
                    best_word, best_similarity = candidate, similarity
        return best_word

    def _emission_prob(self, word, tag):
        '''Count of (word, tag) over count of tag; k replaces a count that was never observed.'''
        return self.emissions.get(word, {}).get(tag, self.k) / self.tag_counts[tag]

    def _transition_prob(self, previous_tag, tag):
        '''Count of (previous_tag, tag) over count of previous_tag; k replaces an unseen bigram.'''
        return self.tag_bigrams.get((previous_tag, tag), self.k) / self.tag_counts[previous_tag]

    def viterbi(self, sentence, pos_seq):
        '''
        sentence: string where each token (punctuation included) is separated by a space
        pos_seq: list with one POS tag per token

        Each token's score for a tag is log(emission) plus the classifier's log
        probability of that tag; a token without a usable feature row falls back to the
        log transition probability from the previous tag. Returns the highest-scoring
        list of tags, one per token ([] for a sentence without tokens).
        '''
        feature_vectors = self.transform_sentence(sentence, pos_seq)
        words = sentence.lower().split()
        if not words:
            return []

        #every tag except the artificial start tag, in the order first seen in training
        tags = [tag for tag in self.tag_counts if tag != '<START>']

        #TRAINING PROB: run the classifier once over all rows that have features and
        # pick each tag's column through classes_, since predict_log_proba orders its
        # columns by classes_ and not by the order in which tags were first seen
        usable_rows = np.flatnonzero(~np.isnan(feature_vectors).any(axis=1))
        maxent_log_probs = {}
        if len(usable_rows) > 0:
            classes = list(self.classifier.classes_)
            tag_columns = [classes.index(tag) for tag in tags]
            log_probas = self.classifier.predict_log_proba(feature_vectors[usable_rows])
            for row, w in enumerate(usable_rows):
                maxent_log_probs[int(w)] = [log_probas[row][column] for column in tag_columns]

        #initialization
        previous_log_scores = []
        for t, tag in enumerate(tags):
            if 0 in maxent_log_probs:
                initial_maxent_log_prob = maxent_log_probs[0][t]
            else:
                initial_maxent_log_prob = math.log(self._transition_prob('<START>', tag))
            previous_log_scores.append(math.log(self._emission_prob(words[0], tag)) + initial_maxent_log_prob)

        #iteration
        #backpointers[w-1][t] is the index of the best previous tag when word w carries tags[t]
        backpointers = []
        for w in range(1, len(words)):
            log_scores = []
            w_backpointers = []

            for t, tag in enumerate(tags):
                #EMISSION PROB does not depend on the previous tag, so compute it once
                log_emission = math.log(self._emission_prob(words[w], tag))
                best_log_score = float('-inf')
                best_previous = None

                for j, previous_tag in enumerate(tags):
                    if w in maxent_log_probs:
                        maxent_log_prob = maxent_log_probs[w][t]
                    else:
                        maxent_log_prob = math.log(self._transition_prob(previous_tag, tag))

                    log_score = previous_log_scores[j] + log_emission + maxent_log_prob
                    if log_score > best_log_score:
                        best_log_score = log_score
                        best_previous = j

                log_scores.append(best_log_score)
                w_backpointers.append(best_previous)

            previous_log_scores = log_scores
            backpointers.append(w_backpointers)

        #backtracking: start from the best final tag and follow that tag's own backpointer
        # at every position, from the last word back to the second
        best_index = previous_log_scores.index(max(previous_log_scores))
        output = [tags[best_index]]
        for w_backpointers in reversed(backpointers):
            best_index = w_backpointers[best_index]
            output.append(tags[best_index])
        output.reverse()

        return output

## Validation

In [10]:
def validate_model_2(model, data):
    '''
    Tag every sentence of data with model and collect the predictions.

    model: object whose viterbi(sentence, pos_seq) returns a list with one label per token
    data: dataframe whose first column holds the sentences and whose second column
        holds strings of lists of POS tags

    Returns a dataframe with columns 'idx' (running token index over all
    sentences, starting at 0) and 'label' (the predicted label of that token).
    '''
    labels = []
    # .iloc selects the columns by position; plain [] with an integer on a
    # string-labelled Series is deprecated in pandas
    for sentence, pos_seq_string in zip(data.iloc[:, 0], data.iloc[:, 1]):
        labels.extend(model.viterbi(sentence, ast.literal_eval(pos_seq_string)))
    ids = list(range(len(labels)))
    return pd.DataFrame({'idx': ids, 'label': labels}, columns=['idx', 'label'])

In [11]:
# Fit the MaxEnt tagger (15-dimensional word embeddings, pseudo-count 0.01), tag the
# validation split, and save the predictions.
maxent_model = MaxEnt_Metaphor_Tagger(train_data=train_data, size=15, k=0.01)
df = validate_model_2(model=maxent_model, data=val_data)
df.to_csv('model_2_validation.csv', index=False)

## Testing

In [12]:
# Tag the unlabeled test split (not the validation split) with the MaxEnt tagger that
# was fitted and validated in the previous cell, then save the predictions. Reusing
# that model avoids retraining and keeps the tested model identical to the validated one.
df = validate_model_2(maxent_model, test_data)
df.to_csv('model_2_test.csv', index=False)