In [None]:
# Libraries

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [None]:
# Load teh dataset
df = pd.read_csv("stock_news.csv", index_col = 0)

In [None]:
def find_few_shot_examples(df):
    few_shot_pos = None
    few_shot_neu = None
    few_shot_neg = None
    drop_indices = []

    for idx, dtpoint in df.iterrows():
        if dtpoint['label'] == "Positive" and few_shot_pos is None:
            few_shot_pos = dtpoint
            drop_indices.append(idx)
        elif dtpoint['label'] == "Neutral" and few_shot_neu is None:
            few_shot_neu = dtpoint
            drop_indices.append(idx)
        elif dtpoint['label'] == "Negative" and few_shot_neg is None:
            few_shot_neg = dtpoint
            drop_indices.append(idx)

        if few_shot_pos is not None and few_shot_neu is not None and few_shot_neg is not None:
            break

    # Drop the selected rows all at once after the loop
    df = df.drop(drop_indices).reset_index(drop=True)

    return df, few_shot_pos, few_shot_neu, few_shot_neg

In [None]:
# If doing PEFT models with n-shot prompting
# Run this cell for 3 an d1-shot prompting
df, few_shot_pos, few_shot_neu, few_shot_neg = find_few_shot_examples(df)

In [None]:
# If doing PEFT models with n-shot prompting
# Run this cell for 6-shot-prompt
df, few_shot_pos, few_shot_neu, few_shot_neg = find_few_shot_examples(df)
df, few_shot_pos1, few_shot_neu1, few_shot_neg1 = find_few_shot_examples(df)

In [None]:
# Map the labels (Positive: 2, Neutral: 1, Negative: 0)
label_mapping = {'Positive': 2, 'Neutral': 1, 'Negative': 0}
df['label'] = df['label'].map(label_mapping)

In [None]:
# Split into training and validation sets
train_texts, temp_text, train_labels, temp_labels = train_test_split(
    df['headline'].values, df['label'].values, test_size=0.3, random_state=42
)

val_texts, test_texts, val_labels, test_labels = train_test_split(
    temp_text, temp_labels, test_size=0.3, random_state=42
)

In [None]:
# Run this cell for RobERTa base models
from transformers import RobertaForSequenceClassification, RobertaTokenizer

# Load the pre-trained RoBERTa model for sequence classification
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=3)
# Load the tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

In [None]:
# Rnu this cell for BERT base models
from transformers import BertForSequenceClassification, BertTokenizer

# Load the pre-trained BERT model for sequence classiffication
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3)
# Load the tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# Run this cell for PEFT models with lora
from peft import get_peft_model, LoraConfig, TaskType

lora_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,  # Sequence Classification task
    r=8,  # Low-rank adaptation size
    lora_alpha=32,  # Scaling factor
    lora_dropout=0.1,  # Dropout probability for LoRA layers
    target_modules=["query", "value"]  # Apply LoRA to specific BERT layers
)

In [None]:
# Run this cell to wrap the model with the LoRA configuration
# Only for PEFT n-shot prompt models

model = get_peft_model(model, lora_config)

In [None]:
# Run this cell for 6 shot prompting
def six_shot_prompt(text):
  few_shot_prompt = f"""Classify the following news headlines as either Positive (2), Negative (0) or Neutral (1). Provide only the label as your answer. Here are some examples:
  1. {few_shot_pos['headline']}: {few_shot_pos['label']}
  2. {few_shot_neu['headline']}: {few_shot_neu['label']}
  3. {few_shot_neg['headline']}: {few_shot_neg['label']}
  4. {few_shot_pos['headline']}: {few_shot_pos['label']}
  5. {few_shot_neu['headline']}: {few_shot_neu['label']}
  6. {few_shot_neg['headline']}: {few_shot_neg['label']}

  Now, classify the following headline: {text}
  Label:
  """

  return few_shot_prompt

# Tokenizing the training, testing and validation texts
train_encodings = tokenizer([six_shot_prompt(text) for text in train_texts], truncation=True, padding=True, max_length=512)
val_encodings = tokenizer([six_shot_prompt(text) for text in val_texts], truncation=True, padding=True, max_length=512)
test_encodings = tokenizer([six_shot_prompt(text) for text in test_texts], truncation=True, padding=True, max_length=512)

In [None]:
# Run this cell for 3 shot prompting
def three_shot_prompt(text):
  few_shot_prompt = f"""Classify the following news headlines as either Positive (2), Negative (0) or Neutral (1). Provide only the label as your answer. Here are some examples:
  1. {few_shot_pos['headline']}: {few_shot_pos['label']}
  2. {few_shot_neu['headline']}: {few_shot_neu['label']}
  3. {few_shot_neg['headline']}: {few_shot_neg['label']}

  Now, classify the following headline: {text}
  Label:
  """

  return few_shot_prompt

# Tokenizing the training, testing and validation texts
train_encodings = tokenizer([three_shot_prompt(text) for text in train_texts], truncation=True, padding=True, max_length=256)
val_encodings = tokenizer([three_shot_prompt(text) for text in val_texts], truncation=True, padding=True, max_length=256)
test_encodings = tokenizer([three_shot_prompt(text) for text in test_texts], truncation=True, padding=True, max_length=256)

In [None]:
#Run this cell for 1 shot prompting
import random

def one_shot_prompt(text):
  examples = [few_shot_pos, few_shot_neu, few_shot_neg]
  example = random.choice(examples)
one_shot_prompt = f"""Classify the following news headline as either Positive (2), Negative (0) or Neutral (1). Provide only the label as your answer. Here is one example:
  {example['headline']}: {example['label']}

  Now, classify the following headline: {text}
  Label:
  """

  return one_shot_prompt

# Tokenizing the training, testing and validation texts
train_encodings = tokenizer([one_shot_prompt(text) for text in train_texts], truncation=True, padding=True, max_length=128)
val_encodings = tokenizer([one_shot_prompt(text) for text in val_texts], truncation=True, padding=True, max_length=128)
test_encodings = tokenizer([one_shot_prompt(text) for text in test_texts], truncation=True, padding=True, max_length=128)

In [None]:
# Run this to create a class to orgqanize the labels and the trainning texts and transform to tensors
# Run for all models
import torch

class NewsDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

In [None]:
# Create datasets for training and validation
train_dataset = NewsDataset(train_encodings, train_labels)
val_dataset = NewsDataset(val_encodings, val_labels)
test_dataset = NewsDataset(test_encodings, test_labels)

In [None]:
# Run to import the data Collator
# Only for PEFT models
from transformers import DataCollatorWithPadding
from peft import TaskType

data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Training loop for FFT models
from transformers import Trainer, TrainingArguments
from transformers import EarlyStoppingCallback

# Define training arguments with early stopping
training_args = TrainingArguments(
    output_dir='C:\\Users\\34618\\OneDrive\\Documentos\\UNI\\LLMs\\',
    num_train_epochs=5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    warmup_steps=500,
    weight_decay=0.01,  # Regularization
    eval_strategy="epoch",  # Evaluate at the end of each epoch
    save_strategy="epoch",  # Save the model at the end of each epoch (to match eval_strategy)
    load_best_model_at_end=True,  # Load the best model found at the end of training
)

# Initialize the Trainer with early stopping
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=1)],  # Stop after 3 epochs of no improvement
)

# Train the model
trainer.train()

# Evaluate the model on the validation data
eval_result = trainer.evaluate()
print(eval_result)

In [None]:
# Training loop for PEFT models
from transformers import Trainer, TrainingArguments

# Define training arguments
training_args = TrainingArguments(
    output_dir='C:\\Users\\34618\\OneDrive\\Documentos\\UNI\\LLMs\\',
    num_train_epochs=5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    warmup_steps=500,
    weight_decay=0.01,
    eval_strategy="epoch",
)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
)

# Train the model
trainer.train()

# Evaluate the model on the validation data
eval_result = trainer.evaluate()
print(eval_result)

In [None]:
# Plot the training and validation curves
def plot_loss_curves(trainer):
    train_loss = trainer.state.log_history
    train_loss_values = [log['loss'] for log in train_loss if 'loss' in log]
    val_loss_values = [log['eval_loss'] for log in train_loss if 'eval_loss' in log]

    # Drop the last value (best loss value is added at the end of teh validation loss array)
    val_loss_values = val_loss_values[:3]

    # Adjust depending of early stopping
    epochs = [1,2,3]

    plt.figure(figsize=(10, 6))
    plt.plot(epochs, train_loss_values[:len(epochs)], label="Training Loss")
    plt.plot(epochs, val_loss_values, label="Validation Loss")

    plt.title("Training vs Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

plot_loss_curves(trainer)

In [None]:
# Print the classification report
import numpy as np
from scipy.special import softmax
from sklearn.metrics import classification_report, classification_report, roc_curve, auc, precision_recall_curve

# Make predictions
predictions = trainer.predict(test_dataset)

# Extract predicted probabilities and convert to labels
pred_probs = softmax(predictions.predictions, axis=1)
pred_labels = pred_probs.argmax(axis=1)

unique_classes = np.unique(test_labels)

report = classification_report(test_labels, pred_labels, target_names=["Negative", "Neutral", "Positive"], labels=unique_classes, output_dict=True)

# Print classification report with specified labels
print("Test Classification Report:")
print(report)

In [None]:
# PLot heatmap with the classification report
import seaborn as sns

# Convert the classification report to a pandas DataFrame
df_report = pd.DataFrame(report).transpose()

# Drop the 'support' column for cleaner visualization
df_report = df_report.drop(columns=['support'])
df_report = df_report[~df_report.index.isin(['accuracy','macro avg', 'weighted avg'])]

# Create a heatmap plot of the classification report
plt.figure(figsize=(10, 6))
sns.heatmap(df_report, annot=True, cmap="Blues", cbar=False, fmt=".2f", linewidths=0.5)

# Add labels and title
plt.title("Classification Report Heatmap", fontsize=16)
plt.ylabel("Classes", fontsize=12)
plt.xlabel("Metrics", fontsize=12)

# Show plot
plt.show()

In [None]:
# Plot ROC curve
def plot_roc_curve(y_true, y_scores, pos_label=2):
    n_classes = 3

    plt.figure(figsize=(10, 8))

    for i in range(n_classes):
        fpr, tpr, _ = roc_curve(y_true, y_scores[:, i], pos_label=i)
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label='Class {} (AUC = {:.2f})'.format(i, roc_auc))

    # Plot diagonal line for random chance
    plt.plot([0, 1], [0, 1], color='red', linestyle='--', label='Random chance')

    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-Class Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc='lower right')
    plt.grid()
    plt.show()

# Call the function for ROC Curve
plot_roc_curve(test_labels, pred_probs)

In [None]:
# Display wring predictions for error analysis
def error_analysis(texts, true_labels, predicted_labels):
    errors = np.where(true_labels != predicted_labels)[0]
    for i in errors:
        print(f'Text: {texts[i]}')
        print(f'True Label: {true_labels[i]}, Predicted Label: {predicted_labels[i]}')
        print('---')

# Call the function for error analysis
error_analysis(test_texts, test_labels, pred_labels)