In [2]:
# Downloading Corpus from NLTK
import nltk
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from nltk.corpus import treebank

# Fetch the NLTK resources this cell reads: the treebank corpus and the
# 'universal_tagset' resource requested via tagset='universal' below.
nltk.download('treebank')
nltk.download('universal_tagset')


# Tagged sentences, materialised as a list of lists of (word, tag) pairs
nltk_data = list(treebank.tagged_sents(tagset='universal'))

# Flatten the sentences into one long sequence of (word, tag) pairs
tagged_words = [pair for sentence in nltk_data for pair in sentence]

# Distinct tags and distinct word forms seen in the corpus
tags = set(tag for _, tag in tagged_words)
vocab = set(word for word, _ in tagged_words)

# Emission counts: (times `word` is tagged `tag`, times `tag` occurs); divide them to get P(word | tag)
def word_given_tag(word, tag, train_bag = tagged_words):
    """Count how often ``tag`` occurs and how often it labels ``word``.

    ``train_bag`` is iterated as (word, tag) pairs. The result is the tuple
    ``(count of pairs equal to (word, tag), count of pairs carrying tag)``;
    the caller divides the two to obtain an emission probability.
    """
    tag_total = 0
    word_with_tag_total = 0
    for pair in train_bag:
        if pair[1] != tag:
            continue
        tag_total += 1
        if pair[0] == word:
            word_with_tag_total += 1

    return (word_with_tag_total, tag_total)

# Transition counts: (times t1 is immediately followed by t2, times t1 occurs); divide them to get P(t2 | t1)
def t2_given_t1(t2, t1, train_bag = tagged_words):
    """Count occurrences of tag ``t1`` and of the tag bigram ``t1 -> t2``.

    The tag sequence is read off ``train_bag`` in order (second element of
    each pair), so adjacent pairs are treated as adjacent tags. Returns
    ``(count of t1 immediately followed by t2, count of t1)``.
    """
    tag_sequence = [pair[1] for pair in train_bag]
    t1_total = sum(1 for current in tag_sequence if current == t1)
    # Pair every tag with its successor; the last tag has none.
    bigram_total = sum(
        1
        for current, following in zip(tag_sequence, tag_sequence[1:])
        if current == t1 and following == t2
    )
    return (bigram_total, t1_total)

# creating t x t transition matrix of tags, t= no of tags
# Matrix(i, j) represents P(jth tag after the ith tag)
 
# Freeze the tag order once so the matrix rows, its columns and the DataFrame
# labels are all built from the very same sequence.
tag_order = list(tags)
tags_matrix = np.zeros((len(tag_order), len(tag_order)), dtype='float32')
for i, t1 in enumerate(tag_order):
    for j, t2 in enumerate(tag_order):
        # t2_given_t1 scans the whole corpus, so call it once per cell and
        # unpack both counts instead of calling it twice.
        count_t2_t1, count_t1 = t2_given_t1(t2, t1)
        # count_t1 is never 0 here: every tag in `tags` was collected from tagged_words.
        tags_matrix[i, j] = count_t2_t1 / count_t1

# Labelled view of the matrix: tags_df.loc[t1, t2] is P(t2 follows t1)
tags_df = pd.DataFrame(tags_matrix, columns=tag_order, index=tag_order)
 
def Viterbi(words, train_bag = tagged_words):
    """Tag ``words`` left to right using emission and transition estimates.

    Despite the name this is a greedy decoder, not the dynamic-programming
    Viterbi algorithm: for each word it scores every candidate tag as
    ``P(word | tag) * P(tag | previous tag)``, commits to the best one and
    moves on. The first word uses '.' as its previous tag.

    Parameters
    ----------
    words : sequence of str
        Tokens to tag, in order.
    train_bag : iterable of (word, tag) pairs
        Source of the emission counts and of the candidate tag set.
        Transition probabilities are always read from the module-level
        ``tags_df``, so every tag in ``train_bag`` (and '.') must be one of
        its labels, otherwise the lookup raises ``KeyError``.

    Returns
    -------
    list of (word, tag) tuples
        One entry per input word, in input order.

    Notes
    -----
    When every candidate scores 0 (for example a word that never occurs in
    ``train_bag``), the alphabetically first tag is chosen.
    """
    # Count tags and (word, tag) pairs in a single pass. Rescanning the
    # training data for every (word, tag) candidate made tagging a long
    # document impractically slow.
    tag_totals = {}
    word_tag_totals = {}
    for pair in train_bag:
        token, label = pair[0], pair[1]
        tag_totals[label] = tag_totals.get(label, 0) + 1
        word_tag_totals[(token, label)] = word_tag_totals.get((token, label), 0) + 1

    # Sorted so that ties break the same way on every run; the iteration
    # order of a set is not something to rely on.
    T = sorted(tag_totals)

    state = []
    for key, word in enumerate(words):
        previous_tag = '.' if key == 0 else state[-1]
        # The transition row only depends on the previous tag: look it up once per word.
        transition_row = tags_df.loc[previous_tag]

        # probability column for this observation, aligned with T
        p = []
        for tag in T:
            emission_p = word_tag_totals.get((word, tag), 0) / tag_totals[tag]
            p.append(emission_p * transition_row[tag])

        # first tag (in T order) reaching the maximum probability
        state.append(T[p.index(max(p))])
    return list(zip(words, state))

[nltk_data] Downloading package treebank to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [3]:
import nltk
import numpy as np
import random
from nltk.corpus import movie_reviews
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from nltk.tokenize import word_tokenize

# Fetch the movie_reviews corpus imported above. As the last expression of
# the cell, the call's return value is what the notebook displays.
nltk.download("movie_reviews")

[nltk_data] Downloading package movie_reviews to
[nltk_data]     C:\Users\user\AppData\Roaming\nltk_data...
[nltk_data]   Package movie_reviews is already up-to-date!


True

In [4]:
# Load the movie_reviews corpus
documents = [(list(movie_reviews.words(fileid)), category)
             for category in movie_reviews.categories()
             for fileid in movie_reviews.fileids(category)]

# One seed for every stochastic step in this cell, so a re-run reproduces
# the same shuffle and the same splits.
SEED = 42

# Shuffle with a dedicated seeded generator: an unseeded random.shuffle made
# the "fixed" random_state splits below differ on every run anyway.
random.Random(SEED).shuffle(documents)

# Extract the text and labels (tokens are re-joined with single spaces)
reviews = [" ".join(document) for document, category in documents]
labels = [category for _, category in documents]

# Split the data into train, validation, and test sets (70-15-15 split):
# 30% is held out first, then halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(reviews, labels, test_size=0.3, random_state=SEED)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=SEED)

# TF-IDF Vectorizer: fitted on the training texts only, then applied
# unchanged to the validation and test texts.
tfidf_vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_val_tfidf = tfidf_vectorizer.transform(X_val)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

# Combine POS tag features with TF-IDF embeddings
def combine_features(tfidf_vectors, sentences, tag_order=None):
    """Append per-POS-tag features to each document's TF-IDF vector.

    Each document is whitespace-split and tagged with ``Viterbi``. Its
    feature row is laid out as::

        [tfidf weights..., count(tag) for tag in tag_order...,
         mean_tfidf(tag) for tag in tag_order...]

    where ``mean_tfidf(tag)`` averages the document's TF-IDF weight over the
    words given that tag which are present in ``tfidf_vectorizer.vocabulary_``
    (0.0 when there are none).

    Parameters
    ----------
    tfidf_vectors : matrix indexable by row, whose rows provide ``.toarray()``
        Row ``i`` must be the TF-IDF vector of ``sentences[i]``.
    sentences : sequence of str
        The raw documents.
    tag_order : sequence of str, optional
        Tags that get a feature column, in column order. Defaults to the
        column labels of the module-level ``tags_df``. Tags produced by the
        tagger that are not listed are ignored.

    Returns
    -------
    numpy.ndarray
        2-D array with one row per document and
        ``n_tfidf + 2 * len(tag_order)`` columns.
    """
    if tag_order is None:
        tag_order = list(tags_df.columns)
    # A fixed tag -> column mapping keeps every document's features aligned
    # and of equal length, whichever tags happen to occur in it.
    tag_position = {tag: position for position, tag in enumerate(tag_order)}
    vocabulary = tfidf_vectorizer.vocabulary_

    combined_features = []
    for i in range(len(sentences)):
        # Dense 1-D TF-IDF row for the document
        tfidf_row = tfidf_vectors[i].toarray().ravel()

        # (word, tag) pairs for the document
        pos_tags = Viterbi(sentences[i].split())

        tag_counts = [0] * len(tag_order)
        tfidf_sums = [0.0] * len(tag_order)
        tfidf_hits = [0] * len(tag_order)
        for word, tag in pos_tags:
            position = tag_position.get(tag)
            if position is None:
                continue
            # Count the tag itself; counting `tag` inside the list of
            # (word, tag) tuples never matches and always yields 0.
            tag_counts[position] += 1
            word_index = vocabulary.get(word, -1)
            if word_index != -1:
                tfidf_sums[position] += tfidf_row[word_index]
                tfidf_hits[position] += 1

        mean_tfidf_per_tag = [
            total / hits if hits else 0.0
            for total, hits in zip(tfidf_sums, tfidf_hits)
        ]

        # Join three 1-D pieces end to end so the row stays 1-D and the
        # stacked result is the 2-D matrix an estimator expects.
        combined_feature = np.concatenate([
            tfidf_row,
            np.array(tag_counts, dtype=float),
            np.array(mean_tfidf_per_tag, dtype=float),
        ])
        combined_features.append(combined_feature)

    return np.array(combined_features)

# Build the combined feature matrix for each split, in train/val/test order
X_train_combined, X_val_combined, X_test_combined = [
    combine_features(tfidf_matrix, texts)
    for tfidf_matrix, texts in (
        (X_train_tfidf, X_train),
        (X_val_tfidf, X_val),
        (X_test_tfidf, X_test),
    )
]

# Fit the Naive Bayes classifier on the combined training features
naive_bayes_classifier = MultinomialNB().fit(X_train_combined, y_train)

# Score the held-out test split
test_predictions = naive_bayes_classifier.predict(X_test_combined)
test_accuracy = accuracy_score(y_test, test_predictions)

print("Test Accuracy:", test_accuracy)
print(classification_report(y_test, test_predictions))


KeyboardInterrupt: 