In [24]:
# Importing libraries
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time

import nltk
import sklearn_crfsuite
from sklearn_crfsuite import metrics

from nltk.corpus import treebank
from nltk.corpus import brown

from nltk.classify import MaxentClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
import pycrfsuite
from nltk.tag import hmm
from nltk.classify import megam
from sklearn.model_selection import GridSearchCV
from nltk.tag import BrillTaggerTrainer
from nltk.tag import UnigramTagger
from nltk.tag import UnigramTagger
from nltk.tag import DefaultTagger
from nltk.metrics import ConfusionMatrix

In [25]:
# Make sure the Brown corpus is available in the local NLTK data directory;
# when the package is already present NLTK reports it as up-to-date.
nltk.download('brown')


[nltk_data] Downloading package brown to
[nltk_data]     C:\Users\21947074\AppData\Roaming\nltk_data...
[nltk_data]   Package brown is already up-to-date!


True

In [26]:
def sample_sentences(category, num_words=5000):
    """
    Draw a random sample of tagged Brown sentences holding at most `num_words` tokens.

    Parameters:
    - category: Brown corpus category (or categories) to sample from.
    - num_words: upper bound on the total number of tokens in the sample.

    Returns:
    - list of tagged sentences, in shuffled order. Sampling stops at the first
      sentence that would push the running token count past `num_words`.
    """
    pool = list(brown.tagged_sents(categories=category))

    # Shuffling uses the global `random` state, so seed it beforehand for reproducibility
    random.shuffle(pool)

    chosen = []
    budget_left = num_words
    for candidate in pool:
        size = len(candidate)
        if size > budget_left:
            # The next sentence does not fit: stop rather than look for a shorter one
            break
        chosen.append(candidate)
        budget_left -= size

    return chosen

In [30]:
# Brown corpus categories used as both the training and the testing domains
categories_to_evaluate = [
    'news',
    'reviews',
    'religion',
    'government',
    'learned',
    'fiction',
    'humor',
    'romance',
]

Unigram

In [31]:

def cross_domain_f1_unigram(categories):
    """
    Cross-domain weighted F1 of a unigram tagger (with an 'NN' default backoff).

    Parameters:
    - categories: list of Brown corpus category names.

    Returns:
    - square numpy array where entry [i, j] is the weighted F1 of a tagger
      trained on a sample of categories[i] and tested on a sample of
      categories[j]. Entries with the same train and test category stay 0.
    """
    size = len(categories)
    scores = np.zeros((size, size))

    for row, source in enumerate(categories):
        # Fit the tagger on a fresh sample of the source domain
        training_sents = sample_sentences(source, num_words=5000)
        fallback = DefaultTagger('NN')
        tagger = UnigramTagger(training_sents, backoff=fallback)

        for col, target in enumerate(categories):
            if target == source:
                continue

            # A fresh sample of the target domain is drawn for every pair
            held_out = sample_sentences(target, num_words=5000)

            # Split the held-out sentences into parallel word / gold-tag streams
            words = []
            gold = []
            for sent in held_out:
                for pair in sent:
                    words.append(pair[0])
                    gold.append(pair[1])

            guessed = [tag for _, tag in tagger.tag(words)]

            scores[row, col] = f1_score(gold, guessed, average='weighted')

    return scores

# Add more categories to `categories_to_evaluate` if needed


# random.seed(1234)
# f1_matrix_unigram = cross_domain_f1_unigram(categories_to_evaluate)
# np.savetxt("insert file path here", f1_matrix_unigram, delimiter=',', fmt='%f')


Brill

In [33]:

def cross_domain_brill(categories, max_rules=200):
    """
    Cross-domain weighted F1 of a Brill tagger built on a unigram baseline.

    Parameters:
    - categories: list of Brown corpus category names.
    - max_rules: maximum number of transformation rules the Brill trainer may learn.

    Returns:
    - square numpy array where entry [i, j] is the weighted F1 of a tagger
      trained on a sample of categories[i] and tested on a sample of
      categories[j]. Entries with the same train and test category stay 0.
    """
    num_categories = len(categories)
    f1_matrix = np.zeros((num_categories, num_categories))

    for i, train_category in enumerate(categories):
        # Get the tagged sentences for the training category
        train_tagged_sents = sample_sentences(train_category, num_words=5000)

        # Baseline tagger whose output the Brill rules will correct
        default_tagger = DefaultTagger('NN')
        unigram_tagger = UnigramTagger(train_tagged_sents, backoff=default_tagger)

        templates = nltk.brill.nltkdemo18()
        trainer = BrillTaggerTrainer(templates=templates, initial_tagger=unigram_tagger)

        # Train the Brill Tagger using the templates
        brill_tagger = trainer.train(train_tagged_sents, max_rules=max_rules)

        for j, test_category in enumerate(categories):
            if test_category == train_category:
                continue

            # Get the tagged sentences for the testing category
            test_tagged_sents = sample_sentences(test_category, num_words=5000)

            # Keep the sentence structure: the tagger is trained on sentences,
            # so tagging one flattened word stream would let rule context
            # spill over sentence boundaries.
            test_word_sents = [[word for word, _ in sent] for sent in test_tagged_sents]
            predicted_sents = brill_tagger.tag_sents(test_word_sents)

            # Flatten the actual and predicted tags
            actual_tags = [tag for sent in test_tagged_sents for _, tag in sent]
            predicted_tags = [tag for sent in predicted_sents for _, tag in sent]

            # Compute the F1 score and store it in the matrix
            f1_matrix[i, j] = f1_score(actual_tags, predicted_tags, average='weighted')

    return f1_matrix

# random.seed(1234)
# f1_matrix_brill = cross_domain_brill(categories_to_evaluate)
# np.savetxt("insert file path here", f1_matrix_brill, delimiter=',', fmt='%f')

HMM

In [35]:

def cross_domain_f1_hmm(categories):
    """
    Cross-domain weighted F1 of NLTK's hidden Markov model tagger.

    Parameters:
    - categories: list of Brown corpus category names.

    Returns:
    - square numpy array where entry [i, j] is the weighted F1 of a tagger
      trained on a sample of categories[i] and tested on a sample of
      categories[j]. Entries with the same train and test category stay 0.
    """
    size = len(categories)
    scores = np.zeros((size, size))

    for row, source in enumerate(categories):
        # Fit the HMM on a fresh sample of the source domain
        training_sents = sample_sentences(source, num_words=5000)
        model = nltk.HiddenMarkovModelTagger.train(training_sents)

        for col, target in enumerate(categories):
            if target == source:
                continue

            held_out = sample_sentences(target, num_words=5000)

            guessed = []
            gold = []
            # Tag sentence by sentence so each sentence is decoded on its own
            for sent in held_out:
                tokens = [word for word, _ in sent]
                guessed.extend(tag for _, tag in model.tag(tokens))
                gold.extend(tag for _, tag in sent)

            scores[row, col] = f1_score(gold, guessed, average='weighted')

    return scores


# random.seed(1234)
# f1_matrix_hmm = cross_domain_f1_hmm(categories_to_evaluate)
# np.savetxt("insert file path here", f1_matrix_hmm, delimiter=',', fmt='%f')


In [37]:
def word_features(sentence, i):
    """
    Extract features for the token at a given index in a tagged sentence.

    Parameters:
    - sentence: list of (word, tag) pairs. Only the tags of the two
      positions before `i` are read; the tag at `i` itself is never used.
    - i: index of the token to describe.

    Returns:
    - features: a dictionary of features for the token at index `i`.

    Raises:
    - IndexError: if `i` is not a valid index into `sentence`.
    """
    word = sentence[i][0]
    last = len(sentence) - 1

    features = {
        'word': word,
        'is_first': i == 0,  # if the word is the first word
        'is_last': i == last,  # if the word is the last word
        # True when upper-casing leaves the first character unchanged, which
        # also holds for digits and punctuation. Slices (not indexing) are
        # used so an empty token does not raise.
        'is_capitalized': word[:1].upper() == word[:1],
        'is_all_caps': word.upper() == word,  # word is in uppercase
        'is_all_lower': word.lower() == word,  # word is in lowercase
        # prefix of the word
        'prefix-1': word[:1],
        'prefix-2': word[:2],
        'prefix-3': word[:3],
        # suffix of the word
        'suffix-1': word[-1:],
        'suffix-2': word[-2:],
        'suffix-3': word[-3:],
        # extracting previous word
        'prev_word': '' if i == 0 else sentence[i - 1][0],
        # extracting next word
        'next_word': '' if i == last else sentence[i + 1][0],
        'has_hyphen': '-' in word,  # if word has a hyphen
        'is_numeric': word.isdigit(),  # if word is numeric
        'capitals_inside': word[1:].lower() != word[1:]
    }

    # Tags of the two preceding tokens, joined into a single feature value
    prev_tag = '' if i == 0 else sentence[i - 1][1]
    prev_prev_tag = '' if i < 2 else sentence[i - 2][1]
    features['prev_prev_tag'] = f'{prev_prev_tag}_{prev_tag}'

    # Add word after the next word
    features['next_next_word'] = '' if i > len(sentence) - 3 else sentence[i + 2][0]

    # Add word before the previous word
    features['prev_prev_word'] = '' if i < 2 else sentence[i - 2][0]

    return features


MEMM

In [None]:

def _memm_training_pairs(tagged_sents):
    """Flatten tagged sentences into (feature dict, gold tag) pairs for training."""
    return [
        (word_features(sentence, idx), sentence[idx][1])
        for sentence in tagged_sents
        for idx in range(len(sentence))
    ]


def _memm_greedy_tags(classifier, tagged_sent):
    """Tag one sentence left to right, feeding back predicted (never gold) tags."""
    # Start with empty tags; word_features only reads tags of earlier positions,
    # which have been filled with predictions by the time they are needed.
    decoded = [(word, '') for word, _ in tagged_sent]
    for idx in range(len(decoded)):
        predicted = classifier.classify(word_features(decoded, idx))
        decoded[idx] = (decoded[idx][0], predicted)
    return [tag for _, tag in decoded]


def cross_domain_f1_memm(categories, maxit):
    """
    Cross-domain weighted F1 of a maximum-entropy Markov model tagger.

    Training uses the gold tag history as features. At test time the tag
    history comes from the model's own earlier predictions (greedy decoding),
    so the gold test tags are never exposed to the classifier.

    Parameters:
    - categories: list of Brown corpus category names.
    - maxit: maximum number of GIS training iterations.

    Returns:
    - square numpy array where entry [i, j] is the weighted F1 of a tagger
      trained on a sample of categories[i] and tested on a sample of
      categories[j]. Entries with the same train and test category stay 0.
    """
    num_categories = len(categories)
    f1_matrix = np.zeros((num_categories, num_categories))

    for i, train_category in enumerate(categories):
        # Get the tagged sentences for the training category
        train_tagged_sents = sample_sentences(train_category, num_words=5000)

        # Collect feature-label pairs for MEMM
        MEMM_train = _memm_training_pairs(train_tagged_sents)

        # Train a tagger on the training category
        maxent_classifier = MaxentClassifier.train(MEMM_train, algorithm='gis', max_iter=maxit)

        for j, test_category in enumerate(categories):
            if test_category == train_category:
                continue

            # Get the tagged sentences for the testing category
            test_tagged_sents = sample_sentences(test_category, num_words=5000)

            predicted_tags = []
            actual_tags = []
            for sentence in test_tagged_sents:
                predicted_tags.extend(_memm_greedy_tags(maxent_classifier, sentence))
                actual_tags.extend(tag for _, tag in sentence)

            # Compute the F1 score and store it in the matrix
            f1_matrix[i, j] = f1_score(actual_tags, predicted_tags, average='weighted')

    return f1_matrix

# random.seed(1234)
# f1_matrix_memm = cross_domain_f1_memm(categories_to_evaluate, maxit=30)
# np.savetxt("insert file path here", f1_matrix_memm, delimiter=',', fmt='%f')

CRF


In [39]:

def _crf_sequences(tagged_sents):
    """Build per-sentence feature sequences and label sequences for the CRF."""
    x_seqs = []
    y_seqs = []
    for sentence in tagged_sents:
        x_sentence = []
        for idx in range(len(sentence)):
            item = word_features(sentence, idx)
            # This feature is derived from the gold tags of the preceding
            # tokens; keeping it would hand the correct labels to the tagger
            # at test time, so it is dropped from both training and test data.
            item.pop('prev_prev_tag', None)
            x_sentence.append(item)
        x_seqs.append(x_sentence)
        y_seqs.append([tag for _, tag in sentence])
    return x_seqs, y_seqs


def cross_domain_f1_crf(categories, paramgrid, model_path='pos.crfsuite'):
    """
    Cross-domain weighted F1 of a linear-chain CRF tagger (python-crfsuite).

    Parameters:
    - categories: list of Brown corpus category names.
    - paramgrid: dictionary of trainer parameters passed to Trainer.set_params.
    - model_path: file the trained model is written to and read back from;
      it is overwritten for every training category.

    Returns:
    - square numpy array where entry [i, j] is the weighted F1 of a tagger
      trained on a sample of categories[i] and tested on a sample of
      categories[j]. Entries with the same train and test category stay 0.
    """
    num_categories = len(categories)
    f1_matrix = np.zeros((num_categories, num_categories))

    for i, train_category in enumerate(categories):

        # Get the tagged sentences for the training category
        train_tagged_sents = sample_sentences(train_category, num_words=5000)

        # feature extraction
        X_train, y_train = _crf_sequences(train_tagged_sents)

        # Train exactly once per training category, after all sentences have
        # been featurised (not once per sentence on a growing prefix).
        trainer = pycrfsuite.Trainer(verbose=False)

        # Add training data
        for x, y in zip(X_train, y_train):
            trainer.append(x, y)

        # Set trainer parameters
        trainer.set_params(paramgrid)

        # Train the CRF model
        trainer.train(model_path)

        # One tagger per trained model, released once every test category is scored
        tagger = pycrfsuite.Tagger()
        tagger.open(model_path)
        try:
            for j, test_category in enumerate(categories):
                if test_category == train_category:
                    continue

                # Get the tagged sentences for the testing category
                test_tagged_sents = sample_sentences(test_category, num_words=5000)
                X_test, y_test = _crf_sequences(test_tagged_sents)

                # predictions
                CRF_predictions = [tagger.tag(instance) for instance in X_test]

                predicted_tags = [tag for instance_tags in CRF_predictions for tag in instance_tags]
                actual_tags = [tag for instance_tags in y_test for tag in instance_tags]

                # Compute the F1 score and store it in the matrix
                f1_matrix[i, j] = f1_score(actual_tags, predicted_tags, average='weighted')
        finally:
            tagger.close()

    return f1_matrix


# Trainer settings for the CRF experiment, handed over as the `paramgrid`
# argument of cross_domain_f1_crf.
cf_params = {}
cf_params["max_iterations"] = 50
cf_params["c1"] = 0.01
cf_params["c2"] = 0.001
cf_params["feature.possible_transitions"] = True

# random.seed(1234)
# f1_matrix_crf = cross_domain_f1_crf(categories_to_evaluate, paramgrid=cf_params)
# np.savetxt("insert file path here", f1_matrix_crf, delimiter=',', fmt='%f')