Create a tool to analyze sentiment for products based on Twitter posts, to be used by companies to monitor public perception of their products. 

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping
from scikeras.wrappers import KerasClassifier
from keras import metrics
import tensorflow as tf
from tensorflow.keras import backend as K

from nltk.corpus import stopwords
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag, word_tokenize
import nltk
# Fetch every NLTK resource the preprocessing below relies on:
#   - wordnet                       -> WordNetLemmatizer
#   - averaged_perceptron_tagger    -> pos_tag
#   - punkt                         -> word_tokenize
#   - stopwords                     -> stopwords.words("english")
# The "*_tab" / "*_eng" ids are the names newer NLTK releases look up for the
# tokenizer and tagger; requesting them is harmless on older releases.
for nltk_resource in (
    "wordnet",
    "averaged_perceptron_tagger",
    "averaged_perceptron_tagger_eng",
    "punkt",
    "punkt_tab",
    "stopwords",
):
    nltk.download(nltk_resource)


import warnings
warnings.filterwarnings("ignore")

In [None]:
# Load the raw labelled-tweet CSV (decoded as latin1) and print a summary of
# its columns, dtypes and non-null counts.
df = pd.read_csv('judge-1377884607_tweet_product_company.csv', encoding='latin1')
df.info()

In [None]:
# Keep only rows that have tweet text, then keep the first occurrence of each
# distinct tweet. The original row index is preserved.
df = (
    df
    .dropna(subset=['tweet_text'])
    .drop_duplicates(subset=['tweet_text'])
)
df

In [None]:
# Restrict to tweets labelled with a clear positive or negative emotion.
polar_labels = ['Positive emotion', 'Negative emotion']
pos_neg_df = df[df['is_there_an_emotion_directed_at_a_brand_or_product'].isin(polar_labels)]

In [None]:
# Model input: the raw tweet text of the positive/negative subset.
pos_neg_tweets = pos_neg_df['tweet_text']

In [None]:
# Model target: the emotion label of the positive/negative subset (copied so
# later edits do not touch pos_neg_df).
pos_neg_sentiment = pos_neg_df['is_there_an_emotion_directed_at_a_brand_or_product'].copy()

In [None]:
# Class balance of the positive/negative subset.
sentiment_count = pos_neg_sentiment.value_counts()
print(f'Sentiment count: \n{sentiment_count}')
# Look the positive class up by label: the index holds the label strings, so
# an integer key would be a (deprecated) positional lookup that merely returns
# whichever class is most frequent.
positive_share = sentiment_count.get('Positive emotion', 0) / sentiment_count.sum()
print(f'Positive: {round(positive_share*100,1)}%')

In [None]:
# Stratified 85/15 train/test split of the positive/negative subset.
# No separate validation set is built here.
X_train, X_test, y_train, y_test = train_test_split(
    pos_neg_tweets,
    pos_neg_sentiment,
    stratify=pos_neg_sentiment,
    test_size=0.15,
    random_state=42,
)

In [None]:
def f1_macro(y_true, y_pred):
    """
    Multiclass F1 Score (macro average) for Keras.
    Works with sparse categorical crossentropy labels.

    Args:
        y_true: integer class indices; any shape that flattens to one label
            per sample, e.g. (batch,) or (batch, 1).
        y_pred: per-class scores with the class axis last.

    Returns:
        Scalar tensor: the unweighted mean of the per-class F1 scores computed
        over the samples passed in.
    """
    n_classes = tf.shape(y_pred)[-1]

    # Flatten the labels before one-hot encoding. A (batch, 1) label tensor
    # would otherwise become (batch, 1, n_classes) and broadcast against the
    # (batch, n_classes) predictions, corrupting every count below.
    labels = tf.reshape(tf.cast(y_true, tf.int32), [-1])
    true_onehot = tf.one_hot(labels, depth=n_classes)

    # Hard predictions: one-hot of the highest-scoring class.
    pred_onehot = tf.one_hot(tf.argmax(y_pred, axis=-1), depth=n_classes)

    # Per-class true positives, false positives, false negatives.
    tp = tf.reduce_sum(true_onehot * pred_onehot, axis=0)
    fp = tf.reduce_sum((1 - true_onehot) * pred_onehot, axis=0)
    fn = tf.reduce_sum(true_onehot * (1 - pred_onehot), axis=0)

    # epsilon keeps the ratios finite for classes absent from the batch.
    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())
    f1 = 2 * precision * recall / (precision + recall + K.epsilon())

    # Macro average (mean across classes)
    return tf.reduce_mean(f1)

In [None]:
def run_pipe(X,y, num_words=2000, 
             remove_stopwords=False, 
             lemmatize=False, 
             num_epochs=200, 
             show_results=False, 
             balance_classes=False):
    """Fit a bag-of-words dense network on (X, y) and score it on the held-out tail.

    The pipeline tokenizes each text with NLTK (optionally dropping English
    stopwords and lemmatizing), turns it into a binary bag-of-words vector
    over the ``num_words`` most frequent tokens, and trains a small dense
    softmax classifier with early stopping on validation loss.

    Args:
        X: sequence of texts (sliceable, e.g. a pandas Series).
        y: class labels aligned with ``X``.
        num_words: vocabulary size, which is also the width of the input vector.
        remove_stopwords: drop English stopwords during preprocessing.
        lemmatize: lemmatize tokens using their POS tag.
        num_epochs: maximum number of training epochs.
        show_results: also print the confusion matrix and scores and plot the
            accuracy curves.
        balance_classes: weight classes to counter label imbalance.

    Returns:
        Tuple ``(best validation accuracy seen during training, macro F1 on
        the held-out tail of X)``.
    """
    # Fraction of the data Keras holds out for validation. Keras takes it from
    # the END of the arrays, before shuffling.
    validation_split = 0.2

    # Class weights to combat data imbalance.
    classes = np.unique(y)
    n_classes = len(classes)
    if not balance_classes:
        weights = [1] * n_classes
    elif n_classes == 3:
        # Hand-picked weights for the three-class problem, in np.unique
        # (sorted label) order.
        weights = [1/.15, 1/.6, 1/.4]
    else:
        weights = compute_class_weight(
            class_weight="balanced",
            classes=classes,
            y=y)
    class_weights = dict(zip(classes, weights))

    # ---------------------------
    # Step 1: Custom text prep transformer
    # ---------------------------
    class TextPrep(BaseEstimator, TransformerMixin):
        """Tokenize text, optionally remove stopwords and lemmatize, re-join with spaces."""

        def __init__(self, remove_stopwords=False, lemmatize=False):
            # Only store constructor arguments here (sklearn estimator
            # convention); derived state is built in fit().
            self.remove_stopwords = remove_stopwords
            self.lemmatize = lemmatize

        def get_wordnet_pos(self, treebank_tag):
            """Map POS tag to WordNet format for better lemmatization"""
            if treebank_tag.startswith("J"):
                return wordnet.ADJ
            if treebank_tag.startswith("V"):
                return wordnet.VERB
            if treebank_tag.startswith("R"):
                return wordnet.ADV
            # Nouns, and the fallback for every other tag.
            return wordnet.NOUN

        def fit(self, X, y=None):
            self.stop_words_ = set(stopwords.words("english"))
            self.lemmatizer_ = WordNetLemmatizer()
            return self

        def transform(self, X):
            if isinstance(X, np.ndarray):
                X = X.ravel()

            processed = []
            for text in X:
                words = word_tokenize(str(text))

                if self.remove_stopwords:
                    words = [w for w in words if w.lower() not in self.stop_words_]

                if self.lemmatize:
                    words = [self.lemmatizer_.lemmatize(w, self.get_wordnet_pos(tag))
                             for w, tag in pos_tag(words)]

                processed.append(" ".join(words))

            return processed

    # ---------------------------
    # Step 2: Wrap tokenizer
    # ---------------------------
    class KerasTokenizer(BaseEstimator, TransformerMixin):
        """sklearn-style wrapper turning texts into a (n_samples, num_words) matrix."""

        def __init__(self, num_words=None, mode="binary"):
            self.num_words = num_words
            self.mode = mode

        def fit(self, X, y=None):
            # A fresh tokenizer per fit, so refitting does not accumulate
            # vocabulary from an earlier fit.
            self.tokenizer_ = Tokenizer(num_words=self.num_words)
            self.tokenizer_.fit_on_texts(X)
            return self

        def transform(self, X):
            return self.tokenizer_.texts_to_matrix(X, mode=self.mode)

    # ---------------------------
    # Step 3: Build model factory
    # ---------------------------
    def build_model(input_dim, n_classes):
        model = Sequential()
        model.add(Dense(50, activation="relu", input_shape=(input_dim,)))
        model.add(Dense(25, activation="relu"))
        model.add(Dense(n_classes, activation="softmax"))
        model.compile(
            optimizer="adam",
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'],
            weighted_metrics=[]
        )
        return model

    # ---------------------------
    # Step 4: Setup pipeline
    # ---------------------------
    classifier = KerasClassifier(
        # `model` is the current scikeras name for the deprecated `build_fn`.
        # The tokenizer emits num_words columns, so that is the input width.
        model=lambda: build_model(input_dim=num_words, n_classes=n_classes),
        epochs=num_epochs,
        batch_size=256,
        verbose=0,
        validation_split=validation_split,
        callbacks=[EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)],
        class_weight=class_weights
    )

    pipeline = Pipeline([
        ("prep", TextPrep(remove_stopwords=remove_stopwords, lemmatize=lemmatize)),
        ("tok", KerasTokenizer(num_words=num_words)),
        ("clf", classifier),
    ])

    # ---------------------------
    # Step 5: Fit
    # ---------------------------
    pipeline.fit(X, y)

    # ---------------------------
    # Step 6: Validation results
    # ---------------------------

    # scikeras exposes the training history dict as `history_`.
    pipe_history = pipeline.named_steps["clf"].history_
    best_val_accuracy = max(pipe_history["val_accuracy"])

    # Rebuild the tail Keras held out, using the same split point
    # (floor(n * (1 - validation_split))), so these scores are computed on
    # samples the network was not trained on.
    split_at = int(len(X) * (1.0 - validation_split))
    X_val = X[split_at:]
    y_val = y[split_at:]

    # Get predictions and F1 score
    y_pred = pipeline.predict(X_val)
    pipe_f1 = f1_score(y_val, y_pred, average='macro')
    print('\nPredictions:')
    display(pd.DataFrame(y_pred).value_counts())

    if show_results:
        # Confusion matrix
        print(confusion_matrix(y_val, y_pred))
        print("Max Val Accuracy:", best_val_accuracy, accuracy_score(y_val, y_pred))
        print("F1 score: ", pipe_f1)
        # Plot accuracy
        plt.figure(figsize=(8,5))
        plt.plot(pipe_history["accuracy"], label="Train Accuracy")
        plt.plot(pipe_history["val_accuracy"], label="Validation Accuracy")
        plt.title("Epoch vs Accuracy")
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.legend()
        plt.show()

    return best_val_accuracy, pipe_f1

In [None]:
def _plot_metric_sweep(results, metric, title, ylabel, dashes):
    """Draw one line per preprocessing config of `metric` against vocabulary size."""
    fig, ax = plt.subplots(figsize=(10,6))
    sns.lineplot(
        data=results,
        x="number_of_words",
        y=metric,
        hue="config",
        style="config",
        ax=ax,
        markers=True,
        dashes=dashes)
    plt.title(title, fontsize=14)
    plt.xlabel("Number of Words", fontsize=12)
    plt.ylabel(ylabel, fontsize=12)
    plt.legend(title="Preprocessing Config")
    plt.grid(True, alpha=0.3)
    plt.show()


def parameter_check(X_par, y_par, num_words_list, plot=True, balance_classes=False, show_results=False):
    """Run `run_pipe` over every vocabulary size x stopword x lemmatize combination.

    Args:
        X_par: texts passed through to `run_pipe`.
        y_par: labels passed through to `run_pipe`.
        num_words_list: vocabulary sizes to try.
        plot: draw validation accuracy and F1 against vocabulary size.
        balance_classes: forwarded to `run_pipe`.
        show_results: forwarded to `run_pipe`.

    Returns:
        DataFrame sorted by ``number_of_words`` with one row per run and the
        columns ``number_of_words``, ``val_accuracy``, ``remove_stopwords``,
        ``lemmatize``, ``f1`` and ``config``.
    """
    from itertools import product

    columns = ["number_of_words", "val_accuracy", "remove_stopwords", "lemmatize", "f1", "config"]
    total_runs = len(num_words_list) * 4

    # Collect plain dicts and build the frame once. Concatenating onto an
    # empty frame inside the loop is quadratic and leaves the column dtypes to
    # pandas' empty-frame handling.
    rows = []
    grid = product(num_words_list, [False, True], [False, True])
    for i, (x, stop_tf, lem_tf) in enumerate(grid, start=1):
        acc, f1 = run_pipe(X_par, y_par,
                           num_words=x,
                           remove_stopwords=stop_tf,
                           lemmatize=lem_tf,
                           balance_classes=balance_classes,
                           show_results=show_results)

        print(f'({i} of {total_runs}) Stopwords: {stop_tf}, Lemmatize: {lem_tf}, Number of words: {x}, Val accuracy: {round(acc,4)}, F1 score: {round(f1,4)}')
        rows.append({
            "number_of_words": x,
            "val_accuracy": acc,
            "remove_stopwords": stop_tf,
            "lemmatize": lem_tf,
            "f1": f1,
            # Legend label identifying the preprocessing combination.
            "config": f"Stopwords={stop_tf}, Lemmatize={lem_tf}",
        })

    results = pd.DataFrame(rows, columns=columns).sort_values("number_of_words")

    if plot:
        _plot_metric_sweep(
            results,
            metric="val_accuracy",
            title="Validation Accuracy vs Number of Words",
            ylabel="Validation Accuracy",
            dashes=False)
        _plot_metric_sweep(
            results,
            metric="f1",
            title="Validation F1 Score vs Number of Words",
            ylabel="Validation F1 Score",
            dashes=True)

    return results

In [None]:
# Class balance of the held-out test labels.
sentiment_count = y_test.value_counts()
print(f'Sentiment count: \n{sentiment_count}')
# Share of the most frequent class. value_counts() sorts descending, so that
# class is at position 0; .iloc makes the positional lookup explicit, since an
# integer key on this string-labelled Series is deprecated.
sentiment_count.iloc[0]/sentiment_count.sum()

In [None]:
# Sweep vocabulary size and preprocessing options on the positive/negative
# training data, without class weighting.
num_words_list = [300, 1000, 2000, 5000]
results = parameter_check(X_train,y_train, num_words_list, show_results=True)

In [None]:
# One row per (vocabulary size, stopwords, lemmatize) run from the sweep above.
display(results)

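### Add another preprocessing step? There was something I was thinking about that I can't remember now.
Add a step to artificially increase the number of negative comments. I can't remember the name for it; it is a remedy for unbalanced data (the usual term is "oversampling", e.g. random oversampling or SMOTE).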

In [None]:
# Same sweep on the positive/negative training data, this time with class
# weighting enabled.
num_words_list = [300, 1000, 2000, 5000]
results = parameter_check(X_train,y_train, num_words_list, balance_classes=True, show_results=True)

In [None]:
# Results of the class-weighted sweep above.
results

In [None]:
# run_pipe(X_train,y_train, num_words=500, num_epochs=300, remove_stopwords=False, lemmatize=True)

# don't use X_test, y_test in final version. print val results

### add neutral/can't tell

In [None]:
# Drop the "I can't tell" rows, then shuffle reproducibly and renumber the index.
is_labelled = df['is_there_an_emotion_directed_at_a_brand_or_product'] != "I can't tell"
shuffled = df[is_labelled].sample(frac=1, random_state=42)
rand_df = shuffled.reset_index(drop=True)

In [None]:
# Label counts after removing the "I can't tell" rows.
rand_df['is_there_an_emotion_directed_at_a_brand_or_product'].value_counts()

In [None]:
# Inputs (tweet text) and targets (emotion label) for the wider problem that
# keeps every label except "I can't tell".
tweets = rand_df['tweet_text']
sentiment = rand_df['is_there_an_emotion_directed_at_a_brand_or_product'].copy()

In [None]:
# Stratified 85/15 train/test split of the full labelled set.
X_train_all, X_test_all, y_train_all, y_test_all = train_test_split(
    tweets,
    sentiment,
    stratify=sentiment,
    test_size=0.15,
    random_state=7,
)

In [None]:
# Percentage of each label in the full labelled set, to one decimal place.
(sentiment.value_counts() / len(sentiment) * 100).round(1)

In [None]:
# Percentage of each label in the training split, to one decimal place.
(y_train_all.value_counts() / len(y_train_all) * 100).round(1)

In [None]:
# Percentage of each label in the test split, to one decimal place.
(y_test_all.value_counts() / len(y_test_all) * 100).round(1)

In [None]:
# Label mix (in percent) of the last 20% of the training labels.
val_size = int(len(y_train_all) * 0.2)
y_val_all = y_train_all[-val_size:]
(y_val_all.value_counts() / len(y_val_all) * 100).round(1)

In [None]:
# Sweep vocabulary size and preprocessing options on the full labelled
# training data, without class weighting.
num_words_list = [300, 1000, 2000, 5000]
results = parameter_check(X_train_all,y_train_all, num_words_list, balance_classes=False, show_results=True)

In [None]:
# NOTE(review): hard-coded counts, presumably copied by hand from an earlier
# cell output; confirm where they come from and compute them from data instead.
762/(31+71+595+55+762)

In [None]:
# Same sweep on the full labelled training data, this time with class
# weighting enabled.
num_words_list = [300, 1000, 2000, 5000]
results = parameter_check(X_train_all,y_train_all, num_words_list, balance_classes=True, show_results=True)

In [None]:
# Raw label counts in the tail slice of the training labels.
y_val_all.value_counts()