In [7]:
import ast

import numpy as np
import pandas as pd

In [12]:
import pandas as pd

# Read training data from CSV file
train_data = pd.read_csv('train.csv')

# Each 'tagged_sentence' cell holds the text of a Python list of (word, tag) tuples.
# It is parsed with ast.literal_eval rather than eval: literal_eval only accepts
# Python literals, so a malformed or malicious cell cannot execute arbitrary code.
tagged_sentences = train_data['tagged_sentence'].apply(ast.literal_eval).tolist()

def get_states_obs(data):
    """Collect the distinct words and tags that occur in tagged sentences.

    Parameters
    ----------
    data : iterable or DataFrame-like
        Either an iterable of sentences, each an iterable of ``(word, tag)``
        pairs, or an object with a ``columns`` attribute (e.g. a pandas
        DataFrame) whose ``'tagged_sentence'`` column holds such sentences.
        A sentence given as a string is parsed with ``ast.literal_eval``.

    Returns
    -------
    tuple of (list, list)
        ``(observations, states)``: the distinct words and the distinct tags,
        each sorted so that the index assigned to every word/tag is the same
        on every run (set iteration order is not stable across processes).
    """
    # A DataFrame is accepted for backward compatibility with callers that pass
    # the raw training frame; everything else is treated as the sentences.
    rows = data['tagged_sentence'] if hasattr(data, 'columns') else data

    observations = set()
    states = set()
    for row in rows:
        if isinstance(row, str):
            # Unparsed CSV cell: the text of a list of (word, tag) tuples.
            row = ast.literal_eval(row)
        for word, tag in row:
            observations.add(word)
            states.add(tag)

    return sorted(observations), sorted(states)


# Collect the vocabulary (obs) and the tag set (states) from the training data.
obs, states=get_states_obs(train_data)


In [9]:
class HMM():
    """First-order hidden Markov model tagger decoded with the Viterbi algorithm.

    Hidden states are tags and observations are words. Both are addressed
    internally by their position in the ``states`` / ``observations`` sequences
    passed to the constructor. A word that is not in ``observations`` is
    represented by the sentinel index ``num_observations`` and receives the
    flat emission probability ``unknown_word_prob`` under every state.
    """

    def __init__(self, states, observations):
        """Create an untrained model with all-zero probability tables.

        Parameters
        ----------
        states : sequence
            Hashable state labels (tags); position defines the state index.
        observations : sequence
            Hashable observation symbols (words); position defines the index.
        """
        self.states = states
        self.state_to_index = {state: i for i, state in enumerate(states)}
        self.observations = observations
        self.observation_to_index = {obs: i for i, obs in enumerate(observations)}
        self.num_states = len(states)
        self.num_observations = len(observations)
        # transition_probability[i, j] = P(next state j | current state i)
        self.transition_probability = np.zeros((self.num_states, self.num_states))
        # emission_probability[i, k] = P(observation k | state i)
        self.emission_probability = np.zeros((self.num_states, self.num_observations))
        self.initial_state_probabilities = np.zeros(self.num_states)
        # Emission probability used for any word outside the vocabulary.
        self.unknown_word_prob = 1e-5

    @staticmethod
    def _normalize(counts):
        """Scale the last axis of ``counts`` to sum to 1; all-zero rows stay zero."""
        totals = counts.sum(axis=-1, keepdims=True)
        # A plain division would turn rows without any counts (e.g. a tag that
        # only ever ends a sentence) into NaN, which then poisons every max /
        # argmax during decoding.
        return np.divide(counts, totals, out=np.zeros_like(counts, dtype=float), where=totals > 0)

    def train(self, training_data, smoothing=0.0):
        """Estimate initial, transition and emission probabilities from counts.

        Every call starts from fresh counts, so training again replaces the
        previous estimates instead of adding to them.

        Parameters
        ----------
        training_data : iterable
            Sentences, each a sequence of ``(word, tag)`` pairs. Empty
            sentences are skipped.
        smoothing : float, optional
            Additive (Laplace) constant added to every count before
            normalizing. The default ``0.0`` gives plain relative frequencies.

        Raises
        ------
        KeyError
            If a word or tag is not among the observations/states given to
            the constructor.
        """
        initial_counts = np.zeros(self.num_states)
        transition_counts = np.zeros((self.num_states, self.num_states))
        emission_counts = np.zeros((self.num_states, self.num_observations))

        for sentence in training_data:
            previous_state = None
            for word, tag in sentence:
                current_state = self.state_to_index[tag]
                if previous_state is None:
                    # First token of the sentence feeds the initial distribution.
                    initial_counts[current_state] += 1
                else:
                    transition_counts[previous_state, current_state] += 1
                emission_counts[current_state, self.observation_to_index[word]] += 1
                previous_state = current_state

        self.initial_state_probabilities = self._normalize(initial_counts + smoothing)
        self.transition_probability = self._normalize(transition_counts + smoothing)
        self.emission_probability = self._normalize(emission_counts + smoothing)

    def viterbi_algorithm(self, observation_sequence):
        """Find the most probable state sequence for a sequence of observations.

        Scores are accumulated as log-probabilities so that long sequences do
        not underflow to zero and collapse the decoded path.

        Parameters
        ----------
        observation_sequence : sequence of int
            Observation indices; ``num_observations`` stands for an unknown word.

        Returns
        -------
        tuple of (list of int, float)
            The best state-index path and its probability. The probability
            itself may still underflow to ``0.0`` for long sequences; the path
            is unaffected. An empty sequence yields ``([], 1.0)``.
        """
        T = len(observation_sequence)
        if T == 0:
            return [], 1.0

        # log(0) = -inf is the intended score of an impossible event.
        with np.errstate(divide="ignore"):
            log_initial = np.log(self.initial_state_probabilities)
            log_transition = np.log(self.transition_probability)
            log_emission = [
                np.log(self.get_emission_probabilities(observation))
                for observation in observation_sequence
            ]

        log_v = np.full((self.num_states, T), -np.inf)
        backpointer = np.zeros((self.num_states, T), dtype=int)

        # Initialization step
        log_v[:, 0] = log_initial + log_emission[0]

        # Recursion step: scores[i, j] is the best score of being in state i at
        # t - 1 and moving to state j at t.
        for t in range(1, T):
            scores = log_v[:, t - 1][:, None] + log_transition
            backpointer[:, t] = np.argmax(scores, axis=0)
            log_v[:, t] = np.max(scores, axis=0) + log_emission[t]

        # Termination step
        best_last_state = int(np.argmax(log_v[:, -1]))
        best_path_prob = float(np.exp(log_v[best_last_state, -1]))

        # Backtrack from the last position, then put the path in reading order.
        best_path = [best_last_state]
        for t in range(T - 1, 0, -1):
            best_last_state = int(backpointer[best_last_state, t])
            best_path.append(best_last_state)
        best_path.reverse()

        return best_path, best_path_prob

    def get_emission_probabilities(self, observation, state=None):
        """Return the probability of emitting ``observation``.

        Parameters
        ----------
        observation : int
            Observation index; ``num_observations`` stands for an unknown word.
        state : int, optional
            State index. When omitted, probabilities for all states are
            returned.

        Returns
        -------
        float or numpy.ndarray
            A single probability when ``state`` is given, otherwise an array
            with one entry per state.
        """
        is_unknown = observation == self.num_observations
        if state is not None:
            if is_unknown:
                return self.unknown_word_prob
            return self.emission_probability[state, observation]
        if is_unknown:
            return np.full(self.num_states, self.unknown_word_prob)
        # Copy so callers cannot modify the model's table through the result.
        return self.emission_probability[:, observation].copy()

    def predict(self, sentence):
        """Tag a sentence.

        Parameters
        ----------
        sentence : sequence
            Words to tag; words outside the vocabulary are treated as unknown.

        Returns
        -------
        tuple of (list, float)
            The predicted state labels, one per word, and the probability of
            the best path.
        """
        unknown_index = self.num_observations
        # Dict lookup instead of scanning the vocabulary list for every word.
        sentence_indices = [
            self.observation_to_index.get(word, unknown_index) for word in sentence
        ]

        best_path, best_path_prob = self.viterbi_algorithm(sentence_indices)

        predicted_tags = [self.states[state] for state in best_path]
        return predicted_tags, best_path_prob
    

# # Example usage:
# # Define POS tags and vocabulary
# states = ['Noun', 'Verb', 'Adjective']
# observations = ['cat', 'dog', 'run', 'jump', 'quick', 'lazy']

# # Define initial, transition, and emission probabilities
# initial_prob = np.array([0.4, 0.4, 0.2])
# transition_prob = np.array([[0.3, 0.4, 0.3],
#                              [0.2, 0.5, 0.3],
#                              [0.4, 0.3, 0.3]])
# emission_prob = np.array([[0.5, 0.2, 0.1, 0.1, 0.05, 0.05],
#                             [0.1, 0.2, 0.3, 0.1, 0.2, 0.1],
#                             [0.1, 0.1, 0.2, 0.4, 0.1, 0.1]])

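# # Create HMM for POS tagging. The class is named `HMM` and its constructor takes
# # only states and observations, so the probability tables are assigned afterwards.
# hmm = HMM(states, observations)
# hmm.initial_state_probabilities = initial_prob
# hmm.transition_probability = transition_prob
# hmm.emission_probability = emission_prob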

# # Example sentence for prediction
# test_sentence = ['the', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog']

# # Predict POS tags using Viterbi algorithm
# predicted_tags, likelihood = hmm.predict(test_sentence)

# print("Predicted POS Tags:", predicted_tags)
# print("Likelihood of the sequence:", likelihood)


In [10]:
# Fit the tagger on the training sentences.
hmm = HMM(states, obs)
hmm.train(tagged_sentences)

# Load the sentences to tag. Each 'untagged_sentence' cell holds the text of a
# Python list of words; ast.literal_eval parses it without executing code
# (eval would run whatever expression the CSV contains).
test_data = pd.read_csv('test_small.csv')
test_sentences = test_data["untagged_sentence"].apply(ast.literal_eval).tolist()

# Pair every word with its predicted tag: one list of (word, tag) tuples per sentence.
output_list = []
for sentence in test_sentences:
    predicted_tags, _ = hmm.predict(sentence)
    output_list.append(list(zip(sentence, predicted_tags)))


In [13]:
# Sentence ids from the test file, stored in an object-dtype array.
id_values = test_data['id'].tolist()
ids = np.array(id_values, dtype=object)

# Show the first tagged sentence as a quick check.
print(output_list[0])
def save_tagged_sentences_to_csv(tagged_sentences, ids, filename):
    """Write ids and their tagged sentences to a two-column CSV file.

    Parameters
    ----------
    tagged_sentences : sequence
        One entry per row, written to the ``tagged_sentence`` column.
    ids : sequence
        One entry per row, written to the ``id`` column.
    filename : str or path-like
        Destination of the CSV file; the DataFrame index is not written.
    """
    columns = {'id': ids, 'tagged_sentence': tagged_sentences}
    pd.DataFrame(columns).to_csv(filename, index=False)

# Relative path, like the 'train.csv' / 'test_small.csv' inputs, so the notebook
# runs on any machine instead of depending on one user's Windows profile folder.
output_path = 'output.csv'
save_tagged_sentences_to_csv(output_list, ids, output_path)

# Read the file back to confirm it was written and parses as CSV.
df = pd.read_csv(output_path)

[('For', 'CS'), ('you', 'PP'), ('have', 'HV'), ('been', 'BE'), ('reborn', 'VB'), (',', ','), ('not', '*'), ('from', 'IN'), ('corruptible', 'JJ'), ('seed', 'NN'), ('but', 'CC'), ('from', 'IN'), ('incorruptible', 'JJ'), (',', ','), ('through', 'IN'), ('the', 'AT'), ('word', 'NN'), ('of', 'IN'), ('God', 'NP'), ('.', '.')]
