# Initialization

In [None]:
# --- Standard library ---
import os
import re
import math

#BEFORE importing transformers
os.environ["TRANSFORMERS_NO_TORCHVISION"] = "1"

# --- Third-party: numerics / plotting ---
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import emoji



# --- Third-party: PyTorch stack ---
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import accelerate
from torch.optim import AdamW
# --- Third-party: scikit-learn ---
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import accuracy_score, f1_score
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, f1_score, balanced_accuracy_score
from sklearn.utils import resample

# --- Third-party: Transformers ---
import transformers  
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    pipeline,
    get_linear_schedule_with_warmup,
    EarlyStoppingCallback,
    DataCollatorWithPadding,
    EvalPrediction,
    AutoModel,
    
)

import torch.nn as nn
from torch.utils.data import WeightedRandomSampler, SubsetRandomSampler


In [None]:
# Load the two raw datasets.
#   df       : financial tweets with sentiment labels, read through an hf:// path
#   stock_df : stock tweets from a local CSV; low_memory=False makes pandas parse
#              the file in one pass instead of chunk-wise dtype inference
SENTIMENT_CSV_URL = "hf://datasets/StephanAkkerman/financial-tweets/financial_tweets.csv"
STOCK_CSV_PATH = 'stock_tweets.csv'

df = pd.read_csv(SENTIMENT_CSV_URL)
stock_df = pd.read_csv(STOCK_CSV_PATH, low_memory=False)

In [None]:
# Text label -> integer class id used for training (single source of truth for
# both the filtering and the mapping below).
label_map = {"Bearish": 0, "Neutral": 1, "Bullish": 2}

# Keep only the tweet text and its sentiment label.
sentiment_df = df[["description", "sentiment"]].copy()

# Drop rows without text: a missing description would otherwise be converted to
# the literal string "nan" by astype(str) during preprocessing and be trained on
# as if it were a real tweet.
sentiment_df = sentiment_df.dropna(subset=["description"])

# Keep only rows whose label is one of the known classes (this also removes
# missing labels). The explicit copy avoids chained-assignment warnings below.
sentiment_df = sentiment_df[sentiment_df["sentiment"].isin(list(label_map))].copy()

# Replace the text labels with their integer ids.
sentiment_df["sentiment"] = sentiment_df["sentiment"].map(label_map)

In [None]:
# Quick sanity check of both frames: column names first, then row counts.
print(sentiment_df.columns)
print(stock_df.columns)

n_sentiment_rows = sentiment_df.shape[0]
n_stock_rows = stock_df.shape[0]
print(f"\nTotal elements in sentiment_df (via shape): {n_sentiment_rows}")
print(f"Total elements in stock_df (via shape): {n_stock_rows}")

# For a closer look, call .head() or .info() on either frame.

In [None]:
def plot_distribution(df, label_column, title_prefix, color='skyblue'):
    """
    Show how often each category of a column occurs, as a bar chart and as text.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the column to summarize.
    label_column : str
        Name of the categorical column (e.g. company, sentiment).
    title_prefix : str
        Used for the plot title, the x-axis label and the printed headings.
        When it equals 'company' (case-insensitive) the tick labels are
        rotated by 45 degrees; otherwise they are not rotated.
    color : str, optional
        Bar color (default is 'skyblue').

    Returns
    -------
    tuple
        (counts, ratios): the value counts of the column and the same counts
        divided by their total, both as pandas Series.
    """
    counts = df[label_column].value_counts()
    ratios = counts.div(counts.sum())

    # Only the 'company' plot gets slanted tick labels.
    tick_rotation = 0
    if title_prefix.lower() == 'company':
        tick_rotation = 45

    # Bar chart of the absolute counts.
    plt.figure(figsize=(10, 6))
    counts.plot(kind='bar', color=color)
    plt.title(f'{title_prefix} Distribution')
    plt.xlabel(title_prefix)
    plt.ylabel('Number of Records')
    plt.xticks(rotation=tick_rotation, ha='right')
    plt.tight_layout()
    plt.show()

    # Text summary: relative frequencies first, then absolute counts.
    print(f"=== {title_prefix} Ratios ===")
    print(ratios)
    print(f"\n=== {title_prefix} Counts ===")
    print(counts)

    return counts, ratios

In [None]:
# Class distributions of the raw data, before any balancing.

# Stock tweets: number of rows per 'Stock Name'.
stock_counts, stock_ratios = plot_distribution(
    stock_df, 'Stock Name', 'Company', color='skyblue'
)

# Sentiment tweets: number of rows per numeric sentiment class.
sentiment_counts, sentiment_ratios = plot_distribution(
    sentiment_df, 'sentiment', 'Sentiment', color='lightcoral'
)

In [None]:
def get_balanced_subset(df_name: pd.DataFrame, df_column_name: str, num: int, random_state: int = 42) -> pd.DataFrame:
    """
    Draw an (approximately) class-balanced sample of about `num` rows.

    Every distinct value of `df_column_name` gets the same quota of
    ``num // number_of_classes`` rows, sampled without replacement. A class
    with fewer rows than the quota contributes all of its rows, so the result
    can contain fewer than `num` rows.

    Parameters
    ----------
    df_name : pd.DataFrame
        The input dataset.
    df_column_name : str
        Column holding the class labels.
    num : int
        Target total number of rows.
    random_state : int, optional
        Seed used for the per-class sampling and the final shuffle (default=42).

    Returns
    -------
    pd.DataFrame
        The sampled rows, shuffled, with a fresh 0..n-1 index.
    """
    labels = df_name[df_column_name]
    class_values = labels.unique()
    quota = num // len(class_values)

    def _draw(value):
        # All rows of one class, cut down to the quota (or kept whole if smaller).
        rows = df_name[labels == value]
        return resample(
            rows,
            replace=False,
            n_samples=min(len(rows), quota),
            random_state=random_state,
        )

    pieces = [_draw(value) for value in class_values]

    # Stack the per-class samples, then shuffle so the classes are interleaved.
    combined = pd.concat(pieces)
    return combined.sample(frac=1, random_state=random_state).reset_index(drop=True)

In [None]:
# Balance the stock tweets per ticker, aiming for as many rows as the sentiment set has.
stock_df_balanced = get_balanced_subset(stock_df, 'Stock Name', sentiment_df.shape[0])

# Round that size down to a multiple of the number of sentiment classes so the
# sentiment quota per class divides evenly.
sentiment_classes = sentiment_df["sentiment"].nunique()  # likely 3
num2 = (stock_df_balanced.shape[0] // sentiment_classes) * sentiment_classes

# Trim the stock sample to num2 rows and request the same total for sentiment.
stock_df_balanced = stock_df_balanced.sample(n=num2, random_state=42).reset_index(drop=True)
sentiment_df_balanced = get_balanced_subset(df_name=sentiment_df, df_column_name='sentiment', num=num2)

In [None]:
# Class distributions after balancing.

# Stock tweets: number of rows per 'Stock Name'.
stock_counts, stock_ratios = plot_distribution(
    stock_df_balanced, 'Stock Name', 'Company', color='skyblue'
)

# Sentiment tweets: number of rows per numeric sentiment class.
sentiment_counts, sentiment_ratios = plot_distribution(
    sentiment_df_balanced, 'sentiment', 'Sentiment', color='lightcoral'
)

In [None]:
## Preprocessing

In [None]:
from TweetNormalizer import normalizeTweet

# =========================================
# STOCK_DF PROCESSING
# =========================================

# 1) One identifier per (ticker, company) pair.
stock_df_processed = stock_df_balanced.copy()
stock_df_processed["pair"] = stock_df_processed["Stock Name"] + "___" + stock_df_processed["Company Name"]

# 2) Label mapping: pairs occurring strictly more than MIN_COUNT times keep
#    their own id; every other pair is folded into one shared "OTHERS" class.
MIN_COUNT = 200
pair_counts = stock_df_processed["pair"].value_counts()
frequent = sorted(pair_counts[pair_counts > MIN_COUNT].index.tolist())

pair_to_id = dict(zip(frequent, range(len(frequent))))
OTHERS_ID = len(pair_to_id)  # first id after the frequent pairs

label2id = {**pair_to_id, "OTHERS": OTHERS_ID}
id2label = {idx: name for name, idx in label2id.items()}

is_frequent = stock_df_processed["pair"].isin(frequent)
stock_df_processed["label"] = stock_df_processed["pair"].where(is_frequent, "OTHERS").map(label2id)

# 3) Drop the columns that are no longer needed and rename the tweet column.
stock_df_processed = (
    stock_df_processed
    .drop(columns=["pair", "Stock Name", "Date", "Company Name"], errors="ignore")
    .rename(columns={"Tweet": "text"})
)

# 4) Normalize the tweet text.
stock_df_processed["text"] = stock_df_processed["text"].astype(str).apply(normalizeTweet)

# 5) Inspect.
print("Stock DataFrame Head after processing:")
print(stock_df_processed.head())
print("\nFinal Columns in stock_df_processed:")
print(stock_df_processed.columns)

# =========================================
# SENTIMENT_DF PROCESSING
# =========================================

# 1) Use the same column names as the stock frame ("text", "label").
sentiment_df_processed = sentiment_df_balanced.copy().rename(
    columns={"description": "text", "sentiment": "label"}
)

# 2) Cast to str and normalize the tweet text.
sentiment_df_processed["text"] = sentiment_df_processed["text"].astype(str).apply(normalizeTweet)

# 3) Inspect.
print("\nSentiment DataFrame Head after processing:")
print(sentiment_df_processed.head())
print("\nFinal Columns in sentiment_df_processed:")
print(sentiment_df_processed.columns)

In [None]:
# Distribution of the integer company labels after preprocessing.
stock_counts, stock_ratios = plot_distribution(
    stock_df_processed, 'label', 'Company', color='skyblue'
)

# Sentiment analysis (finBERT)

In [None]:
# Split and tokenize
# Hold out 20% of the preprocessed sentiment tweets for validation. Stratifying
# on the label keeps the label distribution the same in both parts, and the
# fixed seed makes the split repeatable.
train, val = train_test_split(
    sentiment_df_processed,
    test_size=0.2,
    random_state=42,
    stratify=sentiment_df_processed["label"],
)

# Tokenizer belonging to the finBERT checkpoint.
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")

# Settings shared by both splits.
finbert_encode_kwargs = dict(
    truncation=True,      # cut sequences longer than max_length
    padding=True,         # pad shorter sequences up to the longest one in the call
    max_length=96,
    return_tensors="pt",  # return PyTorch tensors
)

train_encodings = tokenizer(train["text"].tolist(), **finbert_encode_kwargs)
val_encodings = tokenizer(val["text"].tolist(), **finbert_encode_kwargs)

In [None]:
#Create wrapper and datasets
# Pytorch Dataset wrapper
class FinBERTDataset(Dataset):
    """Dataset pairing pre-tokenized encodings with integer class labels.

    Args:
        encodings: Mapping from field name (e.g. "input_ids") to an indexable
            collection holding one entry per example.
        labels: Sequence with one class label per example.

    Each item is a dict containing every encoding field for that example plus
    a "labels" entry holding the class id as a ``torch.long`` tensor.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: values[idx] for key, values in self.encodings.items()}
        # Pin the dtype to long so the target is always an integer class index,
        # even if the labels arrive as floats (e.g. 2.0 after a pandas mapping).
        item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

    def __len__(self):
        return len(self.labels)


# Wrap both tokenized splits; the labels are cast to plain Python ints first.
train_labels = train["label"].astype(int).tolist()
val_labels = val["label"].astype(int).tolist()

train_dataset = FinBERTDataset(train_encodings, train_labels)
val_dataset = FinBERTDataset(val_encodings, val_labels)

In [None]:
# Load the pretrained finBERT sequence-classification model.
model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")

# Name the output classes so that index i means the same as label i in the dataset.
model.config.label2id = {"Bearish": 0, "Neutral": 1, "Bullish": 2}
model.config.id2label = {0: "Bearish", 1: "Neutral", 2: "Bullish"}

# Full fine-tuning: make every parameter trainable.
for param in model.parameters():
    param.requires_grad = True

# Alternative (train only the classification head): freeze the encoder instead.
#for param in model.bert.parameters():
#    param.requires_grad = False

# Use Apple's MPS backend when it is available, otherwise the CPU
# (change this when running on a VM).
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
model.to(device)
print(device)


In [None]:
# Metrics function
def compute_metrics(eval_pred):
    """Turn evaluation outputs into accuracy and F1 scores.

    The predicted class of each example is the argmax over the last axis of
    the logits.

    Args:
        eval_pred: Either a ``(logits, labels)`` tuple or an object exposing
            ``predictions`` and ``label_ids`` (e.g. transformers.EvalPrediction).
            - logits: array of shape (num_examples, num_classes)
            - labels: array of shape (num_examples,)

    Returns:
        Dict[str, float] with the keys
            "accuracy": overall accuracy,
            "f1_macro": macro-averaged F1 across classes,
            "f1_weighted": class-frequency-weighted F1.
    """
    # Normalize both accepted input forms to a plain (logits, labels) pair.
    if not isinstance(eval_pred, tuple):
        eval_pred = (eval_pred.predictions, eval_pred.label_ids)
    logits, labels = eval_pred

    predicted = np.argmax(logits, axis=-1)

    accuracy = accuracy_score(labels, predicted)
    macro_f1 = f1_score(labels, predicted, average="macro")
    weighted_f1 = f1_score(labels, predicted, average="weighted")
    return {
        "accuracy": accuracy,
        "f1_macro": macro_f1,
        "f1_weighted": weighted_f1,
    }

In [None]:
# Training configuration for finBERT.
args = TrainingArguments(
    output_dir="./finbert_trained_tweets",
    seed=42,
    report_to=[],
    # --- optimisation ---
    num_train_epochs=10,
    learning_rate=2e-5,              # alternatives to try: 1e-5 and 3e-5
    weight_decay=0.01,
    warmup_ratio=0.1,                # initial learning-rate ramp-up phase
    per_device_train_batch_size=16,  # try with 32
    per_device_eval_batch_size=16,   # try with 32
    # --- evaluation / checkpointing: once per epoch ---
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,              # limit the number of checkpoints kept on disk
    # --- model selection: reload the checkpoint with the highest macro F1 ---
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
)

# Stop early when macro F1 has not improved for 2 consecutive evaluations.
early_stopping = EarlyStoppingCallback(early_stopping_patience=2)

# Trainer setup
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer, pad_to_multiple_of=8),
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)


In [None]:
# Fine-tune finBERT.
trainer.train()

# Evaluate on the validation set and print the metrics. With
# load_best_model_at_end=True the trainer holds the best checkpoint here.
eval_results = trainer.evaluate()
print(eval_results)

# Save model and tokenizer side by side so they can be reloaded from one folder.
finbert_best_dir = "./finbert_trained_tweets/best"
trainer.save_model(finbert_best_dir)
tokenizer.save_pretrained(finbert_best_dir)

# Company Classification (BERTweet)

In [None]:
# Hold out 20% of the preprocessed stock tweets for validation, stratified on
# the company label and with a fixed seed so the split is repeatable.
train, val = train_test_split(
    stock_df_processed,
    test_size=0.2,
    random_state=42,
    stratify=stock_df_processed["label"],
)

# Tokenizer belonging to the BERTweet checkpoint.
tokenizer = AutoTokenizer.from_pretrained(
    "vinai/bertweet-base",
    use_fast=False,      # use the slow (Python) tokenizer, as recommended
    normalization=True,  # enable tweet normalization in the tokenizer as well
)

# Settings shared by both splits.
bertweet_encode_kwargs = dict(
    truncation=True,      # cut sequences longer than max_length
    padding=True,         # pad shorter sequences up to the longest one in the call
    max_length=96,
    return_tensors="pt",  # return PyTorch tensors
)

train_enc = tokenizer(train["text"].tolist(), **bertweet_encode_kwargs)
val_enc = tokenizer(val["text"].tolist(), **bertweet_encode_kwargs)

In [None]:
#Create wrapper and datasets
# Pytorch Dataset wrapper
class TweetCompanyDataset(Dataset):
    """Dataset pairing pre-tokenized tweet encodings with company class ids.

    Args:
        encodings: Mapping from field name (e.g. "input_ids") to an indexable
            collection holding one entry per example.
        labels: Sequence with one integer class id per example.

    Each item is a dict containing every encoding field for that example plus
    a "labels" entry holding the class id as a ``torch.long`` tensor.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for name, column in self.encodings.items():
            sample[name] = column[idx]
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Wrap both tokenized splits together with their company labels.
train_labels = train["label"].tolist()
val_labels = val["label"].tolist()

train_dataset = TweetCompanyDataset(train_enc, train_labels)
val_dataset = TweetCompanyDataset(val_enc, val_labels)

In [None]:
# Label maps for the classification head. They must cover every id that occurs
# in stock_df_processed["label"]: the frequent pairs AND the catch-all "OTHERS"
# class (id OTHERS_ID == len(pair_to_id)). Using pair_to_id alone makes the head
# one output too small, so rows labelled OTHERS_ID become out-of-range targets.
label2id = {**pair_to_id, "OTHERS": OTHERS_ID}  # str -> int
id2label = {v: k for k, v in label2id.items()}  # int -> str
num_labels = len(label2id)

# Load BERTweet with a classification head that has one output per label.
model = AutoModelForSequenceClassification.from_pretrained(
    "vinai/bertweet-base",
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id,
)

# Full fine-tuning: make every parameter trainable.
for p in model.parameters():
    p.requires_grad = True

# Alternative (train only the classification head): freeze the encoder instead.
#for param in model.base_model.parameters():
#    param.requires_grad = False


# Use Apple's MPS backend when it is available, otherwise the CPU
# (change this when running on a VM).
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model.to(device)
print(device)

In [None]:
# --- TrainingArguments: the best checkpoint is selected by macro F1 ---
args = TrainingArguments(
    output_dir="./berttweet-company",
    seed=42,
    report_to=[],
    # --- optimisation ---
    num_train_epochs=15,
    learning_rate=2e-5,
    lr_scheduler_type="cosine",
    weight_decay=0.01,
    warmup_ratio=0.06,               # initial learning-rate ramp-up phase
    label_smoothing_factor=0.1,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    gradient_accumulation_steps=2,
    # --- evaluation / checkpointing: once per epoch ---
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    # --- model selection: reload the checkpoint with the highest macro F1 ---
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
)

# Stop early when macro F1 has not improved for 4 consecutive evaluations.
early_stopping = EarlyStoppingCallback(early_stopping_patience=4)

# Trainer setup
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8),
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)


In [None]:
# Fine-tune BERTweet on the company labels.
trainer.train()

# Evaluate on the validation set and print the metrics. With
# load_best_model_at_end=True the trainer holds the best checkpoint here.
eval_results = trainer.evaluate()
print(eval_results)

# Save model and tokenizer side by side so they can be reloaded from one folder.
bertweet_best_dir = "./berttweet-company/best"
trainer.save_model(bertweet_best_dir)
tokenizer.save_pretrained(bertweet_best_dir)

# Function to pipeline both models

In [None]:
# Load the two fine-tuned models from disk as text-classification pipelines.
# top_k=None makes each pipeline return the score of every class; it replaces
# the deprecated return_all_scores=True. Note that the scores come back sorted
# by score rather than in label order.
company_clf = pipeline(
    "text-classification",
    model="./berttweet-company/best",
    tokenizer="./berttweet-company/best",
    top_k=None,
)
sentiment_clf = pipeline(
    "text-classification",
    model="./finbert_trained_tweets/best",
    tokenizer="./finbert_trained_tweets/best",
    top_k=None,
)

def analyze_tweet_pretty(tweet: str):
    """Classify one tweet by company and by sentiment.

    The tweet is stripped and passed through ``normalizeTweet`` (the same
    normalization applied to the training text), then sent to the module-level
    ``company_clf`` and ``sentiment_clf`` pipelines. For each pipeline only the
    highest-scoring class is reported.

    Args:
        tweet: Raw tweet text.

    Returns:
        A 2-tuple ``(company, sentiment)`` where each element is a dict of the
        form ``{label: {"score": score}}`` and ``score`` is the top class
        probability rounded to two decimals.
    """
    # Match the training-time preprocessing; without it the models see text
    # in a different form than the one they were fine-tuned on.
    text = normalizeTweet(tweet.strip())

    # Same length limit as used when tokenizing the training data, so overly
    # long inputs are cut instead of exceeding what the models were trained on.
    encode_kwargs = {"truncation": True, "max_length": 96}

    # Company: each pipeline call returns one list of {"label", "score"} dicts
    # per input text; take the list for our single text and pick the best class.
    comp_scores = company_clf([text], **encode_kwargs)[0]
    comp_top = max(comp_scores, key=lambda x: x["score"])
    # Tidy up: collapse runs of underscores (the "___" pair separator) into one
    # and drop any leading/trailing underscore.
    comp_label = re.sub(r"_+", "_", comp_top["label"]).strip("_")
    comp_score = round(float(comp_top["score"]), 2)

    # Sentiment (three classes: Bearish / Neutral / Bullish).
    sent_scores = sentiment_clf([text], **encode_kwargs)[0]
    sent_top = max(sent_scores, key=lambda x: x["score"])
    raw_lbl = sent_top["label"].lower()
    # Translate positive/negative style label names to Bullish/Bearish;
    # any other label name is reported exactly as the model gives it.
    if "pos" in raw_lbl:
        sent_label = "Bullish"
    elif "neg" in raw_lbl:
        sent_label = "Bearish"
    else:
        sent_label = sent_top["label"]
    sent_score = round(float(sent_top["score"]), 2)

    return {comp_label: {"score": comp_score}}, {sent_label: {"score": sent_score}}

# Example: run both classifiers on a single tweet.
example_tweet = "Am considering taking Tesla private at $420. Funding secured."
analyze_tweet_pretty(example_tweet)
