L2-regularized logistic regression for binary or multiclass classification; trains a model (on `train.txt`), optimizes L2 regularization strength on `dev.txt`, and evaluates performance on `test.txt`.  Reports test accuracy with 95% confidence intervals and prints out the strongest coefficients for each class.

In [None]:
from scipy import sparse
from sklearn import linear_model
from collections import Counter
import numpy as np
import operator
import nltk
import math
from scipy.stats import norm
from sklearn.preprocessing import StandardScaler

In [None]:
!python3 -m nltk.downloader punkt

In [None]:
def load_data(filename):
    """Read a tab-separated data file into parallel lists of texts and labels.

    Each non-blank line is split on tabs. Column index 2 is used as the label
    (with surrounding whitespace removed) and column index 3 as the text, kept
    exactly as read, including any trailing newline. No header row is skipped:
    every non-blank line becomes an example.

    Args:
        filename: Path to a UTF-8 encoded, tab-separated file.

    Returns:
        A tuple ``(X, Y)``: the list of texts and the list of labels, both in
        file order.

    Raises:
        IndexError: If a non-blank line has fewer than four columns.
    """
    X = []
    Y = []
    with open(filename, encoding="utf-8") as file:
        for line in file:
            # A blank line (e.g. a trailing newline at end of file) holds no
            # record; skip it rather than fail on the column lookups below.
            if not line.strip():
                continue
            cols = line.split("\t")
            X.append(cols[3])
            Y.append(cols[2].strip())

    return X, Y


In [None]:
class Classifier:
    """Logistic-regression text classifier over sparse, dictionary-based features.

    The constructor featurizes the train, dev and test texts with
    ``feature_method`` and builds the feature vocabulary from the training
    data. ``train`` fits one model per candidate ``C`` value and keeps the one
    with the highest dev accuracy; ``test`` scores that model on the test set.
    """

    def __init__(self, feature_method, feature_kwargs, trainX, trainY, devX, devY, testX, testY):
        """Featurize all three splits and store them with their labels.

        Args:
            feature_method: Callable ``feature_method(text, **feature_kwargs)``
                returning a dict that maps feature name to feature value.
            feature_kwargs: Keyword arguments forwarded to ``feature_method``.
            trainX, devX, testX: Iterables of raw texts.
            trainY, devY, testY: Labels aligned with the corresponding texts.
        """
        self.feature_vocab = {}
        self.feature_method = feature_method
        self.feature_kwargs = feature_kwargs
        # A feature must occur in at least this many training documents to be kept.
        self.min_feature_count = 2
        self.log_reg = None

        self.trainY = trainY
        self.devY = devY
        self.testY = testY

        # The training split must be processed first: it defines the vocabulary
        # that the dev and test splits are projected onto.
        self.trainX = self.process(trainX, training=True)
        self.devX = self.process(devX, training=False)
        self.testX = self.process(testX, training=False)

    def featurize(self, data):
        """Apply ``feature_method`` to every text and return the feature dicts."""
        return [self.feature_method(text, **self.feature_kwargs) for text in data]

    def process(self, X_data, training=False):
        """Featurize texts and return them as a sparse document-by-feature matrix.

        Args:
            X_data: Iterable of raw texts.
            training: When true, (re)build ``self.feature_vocab`` from this
                data, keeping features that occur in at least
                ``self.min_feature_count`` documents.

        Returns:
            A ``scipy.sparse.dok_matrix`` of shape (documents, vocabulary size).
            Features that are not in the vocabulary are dropped.
        """
        data = self.featurize(X_data)

        if training:
            # Each feature dict has unique keys, so this counts documents, not tokens.
            feature_doc_count = Counter()
            for feats in data:
                for feat in feats:
                    feature_doc_count[feat] += 1

            # Start from an empty vocabulary: ids restart at 0, so stale entries
            # from an earlier call would otherwise share columns with new ones.
            self.feature_vocab.clear()
            fid = 0
            for feat, doc_count in feature_doc_count.items():
                if doc_count >= self.min_feature_count:
                    self.feature_vocab[feat] = fid
                    fid += 1

        F = len(self.feature_vocab)
        D = len(data)
        X = sparse.dok_matrix((D, F))
        for idx, feats in enumerate(data):
            for feat, value in feats.items():
                if feat in self.feature_vocab:
                    X[idx, self.feature_vocab[feat]] = value

        return X

    def train(self):
        """Scale the features, then keep the model with the best dev accuracy.

        The scaler is fit on the training matrix only and then applied to the
        dev and test matrices; the scaled matrices replace the stored ones.
        One ``LogisticRegression`` is fit per candidate ``C`` and the model with
        the highest dev accuracy is stored in ``self.log_reg`` (the first one
        wins on ties).
        """
        scaler = StandardScaler(with_mean=False)  # Use with_mean=False for sparse matrix compatibility
        self.trainX = scaler.fit_transform(self.trainX)
        self.devX = scaler.transform(self.devX)
        self.testX = scaler.transform(self.testX)

        best_dev_accuracy = None
        best_model = None
        for C in [0.1, 1, 10, 100]:
            model = linear_model.LogisticRegression(C=C, max_iter=1000)
            model.fit(self.trainX, self.trainY)
            development_accuracy = model.score(self.devX, self.devY)
            # Always accept the first model so that a dev accuracy of 0 for
            # every C still leaves a usable model instead of None.
            if best_model is None or development_accuracy > best_dev_accuracy:
                best_dev_accuracy = development_accuracy
                best_model = model

        self.log_reg = best_model

    def test(self):
        """Return the accuracy of the selected model on the test split."""
        return self.log_reg.score(self.testX, self.testY)

    def printWeights(self, n=10):
        """Print the ``n`` strongest coefficients for each class.

        Each printed line is ``class<TAB>weight<TAB>feature``. For two classes
        the single coefficient row is used twice: the ``n`` largest weights are
        printed under ``classes_[1]`` and the ``n`` smallest under
        ``classes_[0]``. Otherwise the ``n`` largest weights of each class's
        own row are printed.
        """
        # Invert the vocabulary so that column index -> feature name.
        reverse_vocab = [None] * len(self.log_reg.coef_[0])
        for k in self.feature_vocab:
            reverse_vocab[self.feature_vocab[k]] = k

        def ranked(weights):
            # (feature, weight) pairs in ascending order of weight.
            return sorted(zip(reverse_vocab, weights), key=operator.itemgetter(1))

        def show(cat, pairs):
            for feature, weight in pairs:
                print("%s\t%.3f\t%s" % (cat, weight, feature))
            print()

        # binary
        if len(self.log_reg.classes_) == 2:
            ascending = ranked(self.log_reg.coef_[0])
            show(self.log_reg.classes_[1], ascending[::-1][:n])
            show(self.log_reg.classes_[0], ascending[:n])

        # multiclass
        else:
            for i, cat in enumerate(self.log_reg.classes_):
                show(cat, ranked(self.log_reg.coef_[i])[::-1][:n])

            

In [None]:
import numpy as np
from scipy import sparse
from nltk.tokenize import word_tokenize
import spacy
import gensim.downloader as api
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Load the module-level resources that `featurize` reads as globals.
# NOTE(review): `nlp` and `word_vectors` are loaded again under the same names
# by later cells of this file; the repeated loads look redundant — confirm.
nlp = spacy.load("en_core_web_sm")  # spaCy pipeline; its `doc.ents` supply the NER features
word_vectors = api.load("word2vec-google-news-300")  # vectors averaged for the embedding features
analyzer = SentimentIntensityAnalyzer()  # its polarity_scores() supply the sentiment features


In [None]:

def featurize(text, **kwargs):
    """Build the feature dictionary for one document.

    Every feature group is toggled by a boolean keyword argument.
    ``bag_of_words`` is on unless explicitly disabled; ``sentiment``,
    ``word_embed``, ``ner``, ``hyperbolae`` and ``topics`` are off unless
    explicitly enabled.

    Args:
        text: Raw document string.
        **kwargs: Feature-group switches as described above.

    Returns:
        Dict mapping feature name to feature value.
    """
    is_on = kwargs.get
    tokens = word_tokenize(text.lower())
    features = {}

    # Binary presence indicator for every lower-cased token
    if is_on('bag_of_words', True):
        features.update({f"word_{token}": 1 for token in tokens})

    # The four polarity scores, computed on the original (non-lowercased) text
    if is_on('sentiment', False):
        polarity = analyzer.polarity_scores(text)
        features['sentiment_neg'] = polarity['neg']
        features['sentiment_neu'] = polarity['neu']
        features['sentiment_pos'] = polarity['pos']
        features['sentiment_compound'] = polarity['compound']

    # One feature per dimension of the mean vector over in-vocabulary tokens;
    # nothing is added when no token has a vector
    if is_on('word_embed', False):
        vectors = [word_vectors[token] for token in tokens if token in word_vectors]
        if vectors:
            for dim, component in enumerate(np.mean(vectors, axis=0)):
                features[f"embedding_{dim}"] = component

    # Binary indicator per (entity label, entity text) pair
    if is_on('ner', False):
        for entity in nlp(text).ents:
            features[f"ner_{entity.label_}_{entity.text}"] = 1

    # Single count feature
    if is_on('hyperbolae', False):
        features['hyperbolic_count'] = count_hyperbolic_terms(text)

    # Keep only topics whose weight exceeds 0.5
    if is_on('topics', False):
        for topic_id, weight in get_document_topics(text):
            if weight > 0.5:
                features[f"topic_{topic_id}"] = weight

    return features



In [None]:
def confidence_intervals(accuracy, n, significance_level):
    """Return the normal-approximation interval around an accuracy estimate.

    Args:
        accuracy: Observed proportion correct.
        n: Number of evaluated examples.
        significance_level: Despite its name, this is used as the central
            coverage of the interval: ``(1 - significance_level) / 2`` is the
            mass left in each tail.

    Returns:
        Tuple ``(lower, upper)``; the bounds are not clipped to [0, 1].
    """
    tail_mass = (1 - significance_level) / 2
    # ppf of the lower tail is negative; flip the sign to get the positive z-score.
    z_score = -1 * norm.ppf(tail_mass)
    std_err = math.sqrt((accuracy * (1 - accuracy)) / n)
    margin = std_err * z_score
    return accuracy - margin, accuracy + margin

In [None]:
def run(trainingFile, devFile, testFile, featurize_kwargs=None):
    """Train, select and evaluate a classifier, then report the results.

    Loads the three splits, builds a ``Classifier`` around ``featurize``,
    trains it, prints the test accuracy with its 95% confidence interval, and
    prints the strongest weights per class.

    Args:
        trainingFile: Path to the training split.
        devFile: Path to the dev split.
        testFile: Path to the test split.
        featurize_kwargs: Optional dict of keyword arguments forwarded to
            ``featurize``; ``None`` means no keyword arguments.

    Returns:
        The test-set accuracy of the selected model.
    """
    # Avoid a shared mutable default: create a fresh dict per call.
    if featurize_kwargs is None:
        featurize_kwargs = {}

    trainX, trainY = load_data(trainingFile)
    devX, devY = load_data(devFile)
    testX, testY = load_data(testFile)

    simple_classifier = Classifier(featurize, featurize_kwargs, trainX, trainY, devX, devY, testX, testY)
    simple_classifier.train()
    accuracy = simple_classifier.test()

    lower, upper = confidence_intervals(accuracy, len(testY), .95)
    print("Test accuracy for best dev model: %.3f, 95%% CIs: [%.3f %.3f]\n" % (accuracy, lower, upper))

    simple_classifier.printWeights()
    return accuracy
    


# Make train test splits

In [None]:
import pandas as pd
# Load the adjudicated dataset as a tab-separated table.
# NOTE(review): pd.read_table uses the first line as column names by default —
# confirm that the file actually starts with a header row.
adj_data = pd.read_table("./AP2/adjudicated_data.txt")
adj_data  # bare expression: shows the frame as the cell output

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 100 rows for test, then 100 of the remaining rows for dev.
# A fixed random_state keeps the split identical across notebook re-runs, so
# the split files written afterwards and the reported accuracies stay comparable.
train, test = train_test_split(adj_data, test_size=100, random_state=42)
train, dev = train_test_split(train, test_size=100, random_state=42)
len(train), len(dev), len(test)

In [None]:
# Write each split as a tab-separated file, without the index column.
# Assumes the ./splits directory already exists — TODO confirm.
# NOTE(review): to_csv writes a header row by default, while load_data turns
# every line of these files into an example, so the header line would be read
# as a data row. Confirm, and pass header=False (or skip the first line when
# loading) if that is unintended.
train.to_csv("./splits/train.txt", sep="\t", index=False)
dev.to_csv("./splits/dev.txt", sep="\t", index=False)
test.to_csv("./splits/test.txt", sep="\t", index=False)

In [None]:
# Paths of the split files consumed by run().
trainingFile = "splits/train.txt"
devFile = "splits/dev.txt"
testFile = "splits/test.txt"

# Baseline configuration: bag-of-words features only (all other feature
# groups default to off in featurize).
featurize_kwargs = {
    'bag_of_words': True
}

# run() returns the test accuracy, which becomes the cell output.
run(trainingFile, devFile, testFile, featurize_kwargs)


## Improving on the baseline

## Sentiment

In [None]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

def sentiment_scores(text):
    """Return the VADER polarity scores for ``text``.

    Args:
        text: Raw document string.

    Returns:
        The mapping produced by ``SentimentIntensityAnalyzer.polarity_scores``.
    """
    # Build the analyzer once and reuse it on later calls instead of
    # constructing a new one per document.
    shared_analyzer = getattr(sentiment_scores, "_analyzer", None)
    if shared_analyzer is None:
        shared_analyzer = SentimentIntensityAnalyzer()
        sentiment_scores._analyzer = shared_analyzer
    return shared_analyzer.polarity_scores(text)



## Word Embeddings

In [None]:
import gensim.downloader as api
from nltk.tokenize import word_tokenize

# Load pre-trained Word2Vec model.
# NOTE(review): the same model is already loaded into `word_vectors` by the
# resource-loading cell earlier in this file; this repeats that load — confirm
# whether it is needed.
word_vectors = api.load("word2vec-google-news-300")

def get_word_embedding(text):
    """Return the average word vector of the tokens in ``text``.

    The text is lower-cased and tokenized; tokens without an entry in
    ``word_vectors`` are ignored.

    Args:
        text: Raw document string.

    Returns:
        The element-wise mean of the found vectors, or an all-zero vector of
        the model's dimensionality when no token has a vector.
    """
    words = word_tokenize(text.lower())
    embeddings = [word_vectors[word] for word in words if word in word_vectors]
    if not embeddings:
        # Take the size from the loaded model rather than hard-coding 300, so
        # the fallback always matches the dimensionality of real embeddings.
        return np.zeros(word_vectors.vector_size)
    # Average the word vectors of all words in the text
    return sum(embeddings) / len(embeddings)


## NER

In [None]:
import spacy

# Load an English NLP model
# NOTE(review): the same spaCy model is already loaded into `nlp` by the
# resource-loading cell earlier in this file; this repeats that load.
nlp = spacy.load("en_core_web_sm")

def named_entities(text):
    """Return the ``(entity text, entity label)`` pairs that ``nlp`` finds in ``text``."""
    found = []
    for entity in nlp(text).ents:
        found.append((entity.text, entity.label_))
    return found


## Topic Features

In [None]:
from gensim.corpora.dictionary import Dictionary
from gensim.models.ldamodel import LdaModel
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import re
# Fetch the NLTK resources used below: the stop-word lists read by
# preprocess_text, and 'punkt' (presumably required by word_tokenize — confirm).
nltk.download('stopwords')
nltk.download('punkt')

# Split files whose documents are used to fit the topic model.
trainingFile = "splits/train.txt"
devFile = "splits/dev.txt"

def preprocess_text(text):
    """Lowercase, strip punctuation, tokenize, and drop stop words from ``text``.

    Args:
        text: Raw document string.

    Returns:
        List of tokens that are longer than one character and are not in
        NLTK's English stop-word list.
    """
    # The stop-word set does not depend on `text`, and this function runs once
    # per document, so build the set on first use and reuse it afterwards.
    stop_words = getattr(preprocess_text, "_stop_words", None)
    if stop_words is None:
        stop_words = frozenset(stopwords.words('english'))
        preprocess_text._stop_words = stop_words

    # Replace each run of non-word characters and underscores with one space,
    # then tokenize.
    tokens = word_tokenize(re.sub(r'[\W_]+', ' ', text.lower()))
    return [token for token in tokens if token not in stop_words and len(token) > 1]

def read_documents(file_path):
    """Read a tab-separated file and return one token list per line.

    Only the last tab-separated cell of each line is used; it is passed
    through ``preprocess_text``.

    Args:
        file_path: Path to a UTF-8 encoded text file.

    Returns:
        List of token lists, in file order.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        return [preprocess_text(row.split("\t")[-1]) for row in handle]

# Read and preprocess the training and dev data (one token list per file line)
train_documents = read_documents(trainingFile)
dev_documents = read_documents(devFile)

# The topic model is fit on the train and dev documents together; the test
# split is not read here.
documents = train_documents + dev_documents

# Map tokens to integer ids, then convert every document to its bag-of-words
# representation (the output of dictionary.doc2bow)
dictionary = Dictionary(documents)
corpus = [dictionary.doc2bow(doc) for doc in documents]

# Train a 10-topic LDA model; random_state is fixed at 42
lda_model = LdaModel(corpus=corpus, id2word=dictionary, num_topics=10, random_state=42)


def get_document_topics(text):
    """Return the topics that ``lda_model`` assigns to ``text``.

    The text is tokenized with ``preprocess_text`` and converted to a
    bag-of-words with the module-level ``dictionary`` before it is passed to
    the model.

    Args:
        text: Raw document string.

    Returns:
        The result of ``lda_model.get_document_topics`` for the document; the
        caller in this file unpacks it as ``(topic_id, weight)`` pairs.
    """
    bag_of_words = dictionary.doc2bow(preprocess_text(text))
    return lda_model.get_document_topics(bag_of_words)


## Hyperbolic Terms

In [None]:
hyperbolic_terms = ["unbelievable", "amazing", "incredible", "never", "always", "worst", "best"]


def count_hyperbolic_terms(text):
    """Return how many tokens of the lower-cased ``text`` are in ``hyperbolic_terms``.

    Repeated occurrences of the same term are each counted.
    """
    matches = [token for token in word_tokenize(text.lower()) if token in hyperbolic_terms]
    return len(matches)


In [None]:
from itertools import product
from tqdm import tqdm

# Grid of feature-group switches; each key is a keyword read by `featurize`.
params = {
    'bag_of_words': [True, False],
    'sentiment': [True, False],
    'word_embed': [True, False],
    'ner': [True, False],
    'hyperbolae': [True, False],
    'topics': [True, False]
}

keys = params.keys()
# Generator over the value lists; it is consumed once by product(*values) below.
values = (params[key] for key in keys)
# One kwargs dict per on/off combination, skipping the single combination in
# which every feature group is off.
kwargs_combinations = [dict(zip(keys, combination))
                for combination in product(*values)
                if any(combination)]
# featurize_kwargs = {
#     'bag_of_words': False,
#     'sentiment': False,
#     'word_embed': True,
#     'ner': True, 
#     'hyperbolae': False,
#     'topics': True
# }
    
# run(trainingFile, devFile, testFile, featurize_kwargs)

# (configuration, accuracy) pairs, one per evaluated feature combination.
results = []

# NOTE(review): run() returns the test-set accuracy, so choosing the maximum
# below selects the feature configuration on the test set itself; the reported
# "highest accuracy" is therefore an optimistic estimate.
for kwarg_combination in tqdm(kwargs_combinations):
    # Run the model with the current combination of features
    accuracy = run(trainingFile, devFile, testFile,kwarg_combination)
    results.append((kwarg_combination, accuracy))

# Find the best performing configuration (max() returns the first one on ties)
best_combination = max(results, key=lambda x: x[1])

print("Best configuration:", best_combination[0])
print("Highest accuracy:", best_combination[1])



In [None]:
# Bare expression: shows every (configuration, accuracy) pair as the cell output.
results