In [67]:
import csv
import pandas as pd
import re
import numpy as np
from tqdm import tqdm

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from scipy.sparse import hstack, csr_matrix

# Question (a)

In [26]:
# Extract features that are indicative of sentiment
train_filepath = "resources/train.tsv"

categorized_reviews = {"pos": [], "neg": []}
with open(train_filepath, "r") as f:
    tsv_reader = csv.reader(f, delimiter="\t")
    for row in tsv_reader:
        star = row[0]
        review = row[2]

        if star == "4":
            categorized_reviews["pos"].append(review.lower())
        else:
            categorized_reviews["neg"].append(review.lower())

In [31]:
# Quantify freq of 10 features
features = [
    "terrible",
    "good",
    "soggy",
    "bland",
    "bad",
    "expensive",
    "average",
    "slow",
    "like",
    "amazing",
]

feature_counts = []
for feature in features:
    positive_freq = len(
        [
            rev
            for rev in categorized_reviews["pos"]
            if feature in rev
        ]
    )
    negative_freq = len(
        [
            rev
            for rev in categorized_reviews["neg"]
            if feature in rev
        ]
    )

    feature_counts.append(
        {
            "word": feature,
            "pos": positive_freq * 100 / 1000,
            "neg": negative_freq * 100 / 1000,
        }
    )

feature_counts = pd.DataFrame(feature_counts)

print(feature_counts)

        word   pos   neg
0   terrible   0.3   3.6
1       good  56.9  43.8
2      soggy   0.3   1.6
3      bland   1.9   9.2
4        bad   7.6  17.3
5  expensive   3.3   3.8
6    average   3.7   5.3
7       slow   2.8   6.4
8       like  35.4  43.6
9    amazing   5.7   2.1


# Question (b)

In [3]:
# Categorize training/dev/test data by label and description
train_filepath = "resources/train.tsv"
dev_filepath = "resources/dev.tsv"
test_filepath = "resources/test.tsv"

train_data = []
with open(train_filepath, "r") as f:
    tsv_reader = csv.reader(f, delimiter="\t")
    for row in tsv_reader:
        star = row[0]
        review = row[2]

        train_data.append(
            {"label": star, "review": review.lower()}
        )

train_data = pd.DataFrame(train_data)

dev_data = []
with open(dev_filepath, "r") as f:
    tsv_reader = csv.reader(f, delimiter="\t")
    for row in tsv_reader:
        star = row[0]
        doc_id = row[1]
        review = row[2]

        dev_data.append(
            {
                "label": star,
                "doc_id": doc_id,
                "review": review.lower(),
            }
        )

dev_data = pd.DataFrame(dev_data)

test_data = []
with open(test_filepath, "r") as f:
    tsv_reader = csv.reader(f, delimiter="\t")
    for row in tsv_reader:
        star = row[0]
        doc_id = row[1]
        review = row[2]

        test_data.append(
            {
                "label": star,
                "doc_id": doc_id,
                "review": review.lower(),
            }
        )

test_data = pd.DataFrame(test_data)

In [4]:
# Convert to bag of words representation
count_vect = CountVectorizer()
train_counts = count_vect.fit_transform(train_data["review"])

# Convert to tf-idf representation
tf_transformer = TfidfTransformer(use_idf=False).fit(
    train_counts
)
train_tf = tf_transformer.transform(train_counts)

# Train a Naive Bayes classifier
nb_model = MultinomialNB().fit(train_tf, train_data["label"])

In [5]:
# Total num features
print(
    f"Vocab size of training set: {len(count_vect.vocabulary_)}"
)

# Feature representation of first doc in dev set
first_doc_features = count_vect.transform(
    [dev_data["review"][0]]
)
first_doc_tf = tf_transformer.transform(first_doc_features)

feature_names = count_vect.get_feature_names_out()
nonzero_idx = first_doc_tf.nonzero()[1]
feature_values = first_doc_tf.data

print("Feature representation of first doc in dev set...")
for idx, value in zip(nonzero_idx, feature_values):
    print(f"{feature_names[idx]} - {value}")

Vocab size of training set: 11468
Feature representation of first doc in dev set...
again - 0.07293249574894728
and - 0.4375949744936837
average - 0.07293249574894728
both - 0.07293249574894728
business - 0.07293249574894728
chef - 0.07293249574894728
chicken - 0.07293249574894728
china - 0.14586499149789456
deserves - 0.07293249574894728
does - 0.07293249574894728
enjoyed - 0.07293249574894728
fan - 0.07293249574894728
first - 0.07293249574894728
food - 0.07293249574894728
forward - 0.07293249574894728
friendly - 0.07293249574894728
general - 0.07293249574894728
good - 0.14586499149789456
grand - 0.07293249574894728
great - 0.07293249574894728
have - 0.14586499149789456
if - 0.07293249574894728
in - 0.07293249574894728
is - 0.14586499149789456
it - 0.14586499149789456
items - 0.07293249574894728
just - 0.07293249574894728
lady - 0.07293249574894728
look - 0.07293249574894728
management - 0.07293249574894728
menu - 0.21879748724684184
new - 0.07293249574894728
not - 0.21879748724684184

In [6]:
# Predict on first 10 docs of dev data
dev_features = count_vect.transform(dev_data["review"][:10])
dev_predictions = nb_model.predict(dev_features)

print("Predictions for first 10 docs in dev set...")
for doc_id, pred in zip(
    dev_data["doc_id"][:10], dev_predictions
):
    print(f"DocID: {doc_id}, Predicted label: {pred}")

Predictions for first 10 docs in dev set...
DocID: ZSJnW6faaNFQoqq4ALqYg, Predicted label: 4
DocID: Rcbv11hm5AYEwZyqYwAvg, Predicted label: 2
DocID: rkRTjhu5szaBggeFVcVJlA, Predicted label: 4
DocID: dhmeDsQGUS1FXMLs49SWjQ, Predicted label: 4
DocID: z9zfIMYmRRCE4ggfOIieEw, Predicted label: 4
DocID: Xtb3pGSh39bqcozkBECw, Predicted label: 2
DocID: DOUflAGzxLsXG6xOmR1w, Predicted label: 2
DocID: 0RxCEWURe08CTcZt95F4AQ, Predicted label: 2
DocID: MzUg5twEcCyd0X6lBMP2Lg, Predicted label: 2
DocID: uNlw2D5CYKk0wjNxLtYw, Predicted label: 4


In [7]:
# Generate predictions on full dev data
dev_features = count_vect.transform(dev_data["review"])
dev_predicted = nb_model.predict(dev_features)
dev_results = pd.DataFrame(
    {"doc_id": dev_data["doc_id"], "predicted": dev_predicted}
)

dev_results.to_csv("nb_prediction_dev.csv", index=False)

# Generate predictions on full test data
test_features = count_vect.transform(test_data["review"])
test_predicted = nb_model.predict(test_features)
test_results = pd.DataFrame(
    {"doc_id": test_data["doc_id"], "predicted": test_predicted}
)

test_results.to_csv("nb_prediction_test.csv", index=False)

# Question (c)

In [8]:
def evaluate_predictions(pred, actual, pos_label="4"):
    """
    Returns evaluation metrics on predictions.
    """
    true_pos = 0
    false_pos = 0
    false_neg = 0
    true_neg = 0

    # Extract prediction counts
    for p, a in zip(pred, actual):
        if p == pos_label and a == pos_label:
            true_pos += 1
        elif p == pos_label and a != pos_label:
            false_pos += 1
        elif p != pos_label and a == pos_label:
            false_neg += 1
        else:
            true_neg += 1

    # Compute metrics
    precision = true_pos / (true_pos + false_pos)
    recall = true_pos / (true_pos + false_neg)
    f1 = 2 * precision * recall / (precision + recall)

    return precision, recall, f1


# Evaluate for positive class on dev set
dev_features = count_vect.transform(dev_data["review"])
dev_predictions = nb_model.predict(dev_features)
precision, recall, f1 = evaluate_predictions(
    dev_predictions, dev_data["label"]
)
print(
    f"Dev set results (Positive class) - Precision: {precision}, Recall: {recall}, F1: {f1}"
)

Dev set results (Positive class) - Precision: 0.8661137440758294, Recall: 0.731, F1: 0.7928416485900217


In [16]:
# Analyze incorrect predictions on dev set
false_positives = []
false_negatives = []

for pred, actual, review, doc_id in zip(
    dev_predictions,
    dev_data["label"],
    dev_data["review"],
    dev_data["doc_id"],
):
    if (pred == "4") & (actual == "2"):
        false_positives.append(
            {"doc_id": doc_id, "review": review}
        )
    if (pred == "2") & (actual == "4"):
        false_negatives.append(
            {"doc_id": doc_id, "review": review}
        )

# Question (d)

In [18]:
# Train a SVM classifier
svm_model = LinearSVC().fit(train_tf, train_data["label"])

In [19]:
# Predict on first 10 docs of dev data
dev_features = count_vect.transform(dev_data["review"][:10])
dev_predictions = svm_model.predict(dev_features)

print("Predictions for first 10 docs in dev set...")
for doc_id, pred in zip(
    dev_data["doc_id"][:10], dev_predictions
):
    print(f"DocID: {doc_id}, Predicted label: {pred}")

Predictions for first 10 docs in dev set...
DocID: ZSJnW6faaNFQoqq4ALqYg, Predicted label: 4
DocID: Rcbv11hm5AYEwZyqYwAvg, Predicted label: 4
DocID: rkRTjhu5szaBggeFVcVJlA, Predicted label: 2
DocID: dhmeDsQGUS1FXMLs49SWjQ, Predicted label: 4
DocID: z9zfIMYmRRCE4ggfOIieEw, Predicted label: 4
DocID: Xtb3pGSh39bqcozkBECw, Predicted label: 2
DocID: DOUflAGzxLsXG6xOmR1w, Predicted label: 2
DocID: 0RxCEWURe08CTcZt95F4AQ, Predicted label: 2
DocID: MzUg5twEcCyd0X6lBMP2Lg, Predicted label: 2
DocID: uNlw2D5CYKk0wjNxLtYw, Predicted label: 2


In [21]:
def evaluate_predictions(pred, actual, pos_label="4"):
    """
    Returns evaluation metrics on predictions.
    """
    true_pos = 0
    false_pos = 0
    false_neg = 0
    true_neg = 0

    # Extract prediction counts
    for p, a in zip(pred, actual):
        if p == pos_label and a == pos_label:
            true_pos += 1
        elif p == pos_label and a != pos_label:
            false_pos += 1
        elif p != pos_label and a == pos_label:
            false_neg += 1
        else:
            true_neg += 1

    # Compute metrics
    precision = true_pos / (true_pos + false_pos)
    recall = true_pos / (true_pos + false_neg)
    f1 = 2 * precision * recall / (precision + recall)

    return precision, recall, f1


# Evaluate for positive class on dev set
dev_features = count_vect.transform(dev_data["review"])
dev_predictions = svm_model.predict(dev_features)
precision, recall, f1 = evaluate_predictions(
    dev_predictions, dev_data["label"]
)
print(
    f"Dev set results (Positive class) - Precision: {precision}, Recall: {recall}, F1: {f1}"
)

Dev set results (Positive class) - Precision: 0.8096135721017907, Recall: 0.859, F1: 0.8335759340126153


In [26]:
# Analyze incorrect predictions on dev set
false_positives = []
false_negatives = []

for pred, actual, review, doc_id in zip(
    dev_predictions,
    dev_data["label"],
    dev_data["review"],
    dev_data["doc_id"],
):
    if (pred == "4") & (actual == "2"):
        false_positives.append(
            {"doc_id": doc_id, "review": review}
        )
    if (pred == "2") & (actual == "4"):
        false_negatives.append(
            {"doc_id": doc_id, "review": review}
        )

false_negatives[20:25]

[{'doc_id': 'PmWYwnFJKAM9JQx2vAUgeg',
  'review': "leaving the club...looking for late night eats...and in-n-out's closed!  this chicago kid was ready to commit ritualistic suicide on the spot.  well, it turned out to be a blessing in disguise, as we came here, and i finally got to try carne asada fries.  this leads me to believe that tacos mexico used to be known as tacos france (seriously though...what a stupid name).  whatever...even though they weren't on the menu, they sure did serve the fries.  and while my californian friends remarked that it's made a lot better near them, i had no complaints.  they were cheap, plentiful, and damn tasty."},
 {'doc_id': 'UZ1BbY6z4dbakukZhDXpLQ',
  'review': 'i have not been to this place in like 10 years.  got invited to go to lunch today and it was exactly the same as it was the last time i was there.  buffet was pretty good and the menu has not changed either.  not much to look at but for a 6.00 lunch what do you expect.'},
 {'doc_id': 'Lxd9H2R

In [20]:
# Generate predictions on full dev data
dev_features = count_vect.transform(dev_data["review"])
dev_predicted = svm_model.predict(dev_features)
dev_results = pd.DataFrame(
    {"doc_id": dev_data["doc_id"], "predicted": dev_predicted}
)

dev_results.to_csv("svm_prediction_dev.csv", index=False)

# Generate predictions on full test data
test_features = count_vect.transform(test_data["review"])
test_predicted = svm_model.predict(test_features)
test_results = pd.DataFrame(
    {"doc_id": test_data["doc_id"], "predicted": test_predicted}
)

test_results.to_csv("svm_prediction_test.csv", index=False)

# Question (e)

In [36]:
# Categorize SentiWordNet data
swn_filepath = "resources/SentiWordNet_3.0.0_20130122.txt"

swn_data = []
with open(swn_filepath, "r") as f:
    for line in f:
        if not line.strip().startswith("#"):
            line_data = line.strip().split("\t")

            # Split synset terms that have multiple words
            terms = [
                re.sub(r"#\d+$", "", term)
                for term in line_data[4].split()
            ]
            for term in terms:
                swn_data.append(
                    {
                        "PosScore": float(line_data[2]),
                        "NegScore": float(line_data[3]),
                        "SynsetTerms": term,
                    }
                )

swn_data = pd.DataFrame(swn_data)

# Remove words that have zero pos/neg scores
swn_data = swn_data[
    (swn_data["PosScore"] > 0) | (swn_data["NegScore"] > 0)
]

In [64]:
def compute_sentiment_score(
    review: str, swn_data: pd.DataFrame
) -> float:
    """
    Returns sentiment score (pos-neg scores) for a review.
    """
    swn_terms = swn_data["SynsetTerms"].unique()

    # Extract words from review
    words = re.findall(r"\b\w+\b", review)
    total_score = 0

    for word in words:
        if word in swn_terms:
            pos_score = swn_data.loc[
                swn_data["SynsetTerms"] == word
            ]["PosScore"].values[0]
            neg_score = swn_data.loc[
                swn_data["SynsetTerms"] == word
            ]["NegScore"].values[0]
            review_score = pos_score - neg_score
            total_score += review_score

    return total_score


# Generate sentiment scores for all reviews in training data
sentiment_scores = [
    compute_sentiment_score(r, swn_data)
    for r in tqdm(train_data["review"])
]

100%|██████████| 2000/2000 [41:57<00:00,  1.26s/it] 


In [65]:
def negation_bigrams_tokenizer(review: str):
    """
    Bigram tokenizer that handles negation.
    """
    negation_words = ["n't", "not", "no", "never"]

    # Strip review to words
    words = re.findall(r"\w+(?:'\w+)*|[^\w\s]", review)
    prepend_negation = False

    bigrams = []
    for word in words:
        # keep already negated words
        if any(nw in word for nw in negation_words):
            bigrams.append(word)
            prepend_negation = True

        # Stop negation when punctuation reached
        elif word in [".", "!", "?", ",", ";", ":"]:
            prepend_negation = False
            bigrams.append(word)

        # Switch on negation
        elif prepend_negation:
            # Tag word with NOT_
            bigrams.append("NOT_" + word)

        # Regular word
        else:
            bigrams.append(word)

    return bigrams

In [68]:
sentiment_sparse = csr_matrix(
    np.array(sentiment_scores)
).reshape(-1, 1)

# Generate bag of words representation
bag_of_words = count_vect.transform(train_data["review"])

# Generate bigrams with negation handling
vectorizer = CountVectorizer(
    tokenizer=negation_bigrams_tokenizer, ngram_range=(1, 2)
)
negation_bigrams = vectorizer.fit_transform(train_data["review"])

# Combine all features
training_features = hstack(
    [bag_of_words, sentiment_sparse, negation_bigrams]
)

# Train SVM with all features
svm_model_v2 = LinearSVC().fit(
    training_features, train_data["label"]
)

In [70]:
# Obtain features for dev set
dev_bag_of_words = count_vect.transform(dev_data["review"])
dev_sentiment_scores = [
    compute_sentiment_score(r, swn_data)
    for r in tqdm(dev_data["review"])
]
dev_sentiment_sparse = csr_matrix(
    np.array(dev_sentiment_scores).reshape(-1, 1)
)
dev_negation_bigrams = vectorizer.transform(dev_data["review"])
dev_features = hstack(
    [
        dev_bag_of_words,
        dev_sentiment_sparse,
        dev_negation_bigrams,
    ]
)

# Predict on dev set
dev_predictions = svm_model_v2.predict(dev_features)

precision, recall, f1 = evaluate_predictions(
    dev_predictions, dev_data["label"]
)
print(
    f"Dev set results (Positive class) - Precision: {precision}, Recall: {recall}, F1: {f1}"
)

100%|██████████| 2000/2000 [39:57<00:00,  1.20s/it] 


Dev set results (Positive class) - Precision: 0.851, Recall: 0.851, F1: 0.851


In [82]:
def evaluate_predictions(pred, actual, pos_label="4"):
    """
    Returns evaluation metrics on predictions.
    """
    true_pos = 0
    false_pos = 0
    false_neg = 0
    true_neg = 0

    # Extract prediction counts
    for p, a in zip(pred, actual):
        if p == pos_label and a == pos_label:
            true_pos += 1
        elif p == pos_label and a != pos_label:
            false_pos += 1
        elif p != pos_label and a == pos_label:
            false_neg += 1
        else:
            true_neg += 1

    # Compute metrics
    precision = true_pos / (true_pos + false_pos)
    recall = true_pos / (true_pos + false_neg)
    f1 = 2 * precision * recall / (precision + recall)

    return precision, recall, f1


precision, recall, f1 = evaluate_predictions(
    dev_predictions, dev_data["label"]
)
print(
    f"Dev set results (Positive class) - Precision: {precision}, Recall: {recall}, F1: {f1}"
)

Dev set results (Positive class) - Precision: 0.851, Recall: 0.851, F1: 0.851


In [81]:
# Generate predictions on full test data
test_bag_of_words = count_vect.transform(test_data["review"])
test_sentiment_scores = [
    compute_sentiment_score(r, swn_data)
    for r in tqdm(test_data["review"])
]
test_sentiment_sparse = csr_matrix(
    np.array(test_sentiment_scores).reshape(-1, 1)
)
test_negation_bigrams = vectorizer.transform(test_data["review"])
test_features = hstack(
    [
        test_bag_of_words,
        test_sentiment_sparse,
        test_negation_bigrams,
    ]
)

test_predicted = svm_model_v2.predict(test_features)
test_results = pd.DataFrame(
    {"doc_id": test_data["doc_id"], "predicted": test_predicted}
)

test_results.to_csv("svm_v2_prediction_test.csv", index=False)

with open("svm_v2_prediction_test.txt", "w") as file:
    for id, pred in zip(test_data["doc_id"], test_predicted):
        file.write(f"{id}\t{pred}\n")

100%|██████████| 2000/2000 [39:15<00:00,  1.18s/it]
