In [3]:
import os

def read_poems(file_path, label):
    with open(file_path, 'r') as file:
        lines = file.readlines()
        
    lines = [line.strip() for line in lines if line.strip()]
    return [(line, label) for line in lines]

poe_lines = read_poems("./../data/edgar_allan_poe.txt", 0)
frost_lines = read_poems("./../data/robert_frost.txt", 1)

In [2]:
print(poe_lines[0])
print(frost_lines[1])

("LO! Death hath rear'd himself a throne", 0)
('And sorry I could not travel both', 1)


In [4]:
data_poe, labels_poe = zip(*poe_lines)
data_frost, labels_frost = zip(*frost_lines)

In [6]:
# Split
from sklearn.model_selection import train_test_split

data_train_poe, data_test_poe, labels_train_poe, labels_test_poe = train_test_split(data_poe, labels_poe, test_size=0.2, random_state=42)
data_train_frost, data_test_frost, labels_train_frost, label_test_frost = train_test_split(data_frost, labels_frost, test_size=0.2, random_state=42)

In [10]:
from collections import defaultdict

def create_mapping(data):
    word_to_index = defaultdict(lambda: len(word_to_index))
    word_to_index["<UNK>"] = 0 # special token for unknown words
    
    for line in data:
        words = line.split()
        [word_to_index[word] for word in words]
        
    return dict(word_to_index)

word_to_index_poe = create_mapping(data_train_poe)
word_to_index_frost = create_mapping(data_train_frost)

In [9]:
word_to_index_poe

{'<UNK>': 0,
 'It': 1,
 'was': 2,
 'night': 3,
 'in': 4,
 'the': 5,
 'lonesome': 6,
 'October': 7,
 'In': 8,
 'seraphic': 9,
 'glancing': 10,
 'of': 11,
 'thine': 12,
 'eyes-': 13,
 'And': 14,
 'now,': 15,
 'as': 16,
 'if': 17,
 'mockery': 18,
 'that': 19,
 'boast,': 20,
 'we': 21,
 'marked': 22,
 'not': 23,
 'year-': 24,
 'The': 25,
 'mask': 26,
 'viol': 27,
 'and': 28,
 'vine.': 29,
 'But': 30,
 'you': 31,
 'meddle': 32,
 'with': 33,
 'my': 34,
 'fate': 35,
 'That': 36,
 'a': 37,
 'quickening': 38,
 'spell': 39,
 'doth': 40,
 "o'er": 41,
 'us': 42,
 'pass': 43,
 'Drawn': 44,
 'by': 45,
 'their': 46,
 "heart's": 47,
 'passion,': 48,
 'tone,': 49,
 'She': 50,
 'rolls': 51,
 'through': 52,
 'an': 53,
 'ether': 54,
 'sighs-': 55,
 'agony': 56,
 'sobbed,': 57,
 'letting': 58,
 'sink': 59,
 'her': 60,
 'senescent,': 61,
 'Death': 62,
 'to': 63,
 'some': 64,
 'more': 65,
 'happy': 66,
 'clime': 67,
 'Perhaps': 68,
 'it': 69,
 'may': 70,
 'be': 71,
 'mind': 72,
 'is': 73,
 'wrought': 74,
 'O

In [16]:
def tokenize_and_convert(data, word_to_index):
    tokenized_data = []
    for line in data:
        words = line.split()
        tokenized_line = [word_to_index[word] if word in word_to_index else word_to_index["<UNK>"] for word in words]
        tokenized_data.append(tokenized_line)
    return tokenized_data

data_train_tokenized_poe = tokenize_and_convert(data_train_poe, word_to_index_poe)
data_test_tokenized_poe = tokenize_and_convert(data_test_poe, word_to_index_poe)

data_train_tokenized_frost = tokenize_and_convert(data_train_frost, word_to_index_frost)
data_test_tokenized_frost = tokenize_and_convert(data_test_frost, word_to_index_frost)

In [18]:
print(data_train_poe[0])
print(data_train_tokenized_poe[0])

It was night in the lonesome October
[1, 2, 3, 4, 5, 6, 7]


In [41]:
# Utils functions for implementing transition matrix and initial distribution for starting words
def get_transition_probability(word_i, word_j, transition_counts, total_counts, vocabulary_size):
    """
    Calculate the transition probability from word_i to word_j using one-add smoothing
    """
    count_i_j = transition_counts.get((word_i, word_j), 0)
    total_count_i = total_counts.get(word_i, 0)
    
    return (count_i_j + 1) / (total_count_i + vocabulary_size)

def calculate_initial_state_distribution(word, distribution):
    count_word = distribution.get(word, 0)
    distribution[word] = count_word + 1
    
def get_initial_state_distribution(word, distribution, N, vocabulary_size):
    count_word = distribution.get(word, 0)
    return (count_word + 1) / (N + vocabulary_size)

In [37]:
# Define state transition matrix, calculate transition_counts, total_counts, vocab size and initial_state
# distribution and pi
def calculate_transitions_and_initial_distr(data, initial_state_distrib, transitions_counts_A, total_counts_A, vocab_size):
    for line in data:
        calculate_initial_state_distribution(line[0], initial_state_distrib)
        for i in range(len(line) - 1):
            total_counts_A[line[i]] =  total_counts_A.get(line[i], 0) + 1
            transitions_counts_A[(line[i], line[i+1])] = transitions_counts_A.get((line[i], line[i+1]), 0) + 1
            
transitions_counts_poe = {}
total_counts_poe = {}
vocab_size_poe = len(word_to_index_poe)
initial_state_distrib_poe = {}

calculate_transitions_and_initial_distr(data_train_tokenized_poe, initial_state_distrib_poe, transitions_counts_poe,
                                        total_counts_poe, vocab_size_poe)

transitions_counts_frost = {}
total_counts_frost = {}
vocab_size_frost = len(word_to_index_frost)
initial_state_distrib_frost = {}

N_poe = len(data_train_poe)
N_frost = len(data_train_frost)

calculate_transitions_and_initial_distr(data_train_tokenized_frost, initial_state_distrib_frost, transitions_counts_frost,
                                        total_counts_frost, vocab_size_frost)

In [46]:
import random

# pick a random test line
test_data_ex = data_test_tokenized_frost[random.randrange(len(data_test_tokenized_frost))]

def calculate_sequence_probability(test_line, transition_counts, total_counts, initial_state_distrib, vocab_size, N):
    pi_calc = get_initial_state_distribution(test_line[0], initial_state_distrib, N, vocab_size)
    prod = 1
    for i in range(1, len(test_line)):
        prod = prod * get_transition_probability(test_line[i-1], test_line[i], transition_counts, total_counts, vocab_size)

    return pi_calc * prod

In [47]:
prob_poe = calculate_sequence_probability(test_data_ex, transitions_counts_poe, total_counts_poe, initial_state_distrib_poe, vocab_size_poe, N_poe)
prob_frost = calculate_sequence_probability(test_data_ex, transitions_counts_frost, total_counts_frost, initial_state_distrib_frost, vocab_size_frost, N_frost)

print(prob_poe)
print(prob_frost)

1.2578599953666534e-21
1.1825799383031862e-20


In [48]:
# Compute argmax of log of probabilities for each class
classes_of_prob = [
    {'label': 'Edgar Allan Poe', 'probability': max(prob_poe, 1e-300)},
    {'label': 'Robert Frost', 'probability': max(prob_frost, 1e-300)}
]

for class_prob in classes_of_prob:
    class_prob['log_probability'] = math.log(class_prob['probability'])

argmax_class = max(classes_of_prob, key=lambda x: x['log_probability'])

print(f"The class with the maximum log probability is: {argmax_class['label']}")

The class with the maximum log probability is: Robert Frost
