In [None]:
import pandas as pd
import torch
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from transformers import GPT2ForSequenceClassification, GPT2Tokenizer, AdamW, GPT2Config
from tqdm import tqdm
import torch.nn as nn
import numpy as np

In [None]:
# Location of the news/sentiment CSV. The /content/drive prefix is presumably a
# mounted Google Drive in Colab — adjust when running elsewhere.
path = (
    '/content/drive/MyDrive/Market-Sentiment-Analysis/'
    'Stock-Market-News-Dataset.csv'
)

In [None]:
# Load the raw CSV into a DataFrame; later cells read its 'Sentence' and
# 'Sentiment' columns.
df = pd.read_csv(filepath_or_buffer=path)

In [None]:
# Peek at the first five raw rows.
df.head(n=5)

In [None]:
def preprocess_data(df):
    """Return a cleaned copy of ``df`` with a normalized 'Sentence' column.

    The input frame is left untouched, so the raw text stays available and the
    cell can be re-run safely. Cleaning steps, applied in order:

    1. lower-case the text;
    2. replace URLs (``http://``, ``https://``, ``www.``) with a space;
    3. replace hashtags (``#word``) with a space;
    4. replace every ``@`` with the word ``at``;
    5. replace every character that is not a letter, one of ``(),!?'"_`` or a
       newline with a space (this also removes digits).

    Args:
        df: DataFrame with a string column named 'Sentence'.

    Returns:
        A new DataFrame with the same columns as ``df`` and the cleaned
        'Sentence' column.
    """
    cleaned = df.copy()

    sentences = cleaned['Sentence'].str.lower()
    sentences = sentences.str.replace(r"https?://\S+|www\.\S+", " ", regex=True)
    sentences = sentences.str.replace(r"#[A-Za-z0-9_]+", " ", regex=True)
    # Plain literal substitution, so no regex engine is needed here.
    sentences = sentences.str.replace("@", "at", regex=False)
    # The '@' inside the character class is inert: every '@' was replaced above.
    sentences = sentences.str.replace(r"[^A-Za-z(),!?@\'\"_\n]", " ", regex=True)

    cleaned['Sentence'] = sentences
    return cleaned

In [None]:
# Run the text-cleaning step; `data` is the frame every later cell works from.
data = preprocess_data(df)

In [None]:
# Peek at the first five cleaned rows.
data.head(n=5)

In [None]:
# Encode the 'Sentiment' column as integer class ids. The fitted encoder is kept
# so the ids can be mapped back to class names later.
label_encoder = LabelEncoder()
sentiment_ids = label_encoder.fit_transform(data['Sentiment'])
data['Sentiment'] = sentiment_ids

In [None]:
# Show which integer id the encoder assigned to each original class.
encoded_classes = label_encoder.transform(label_encoder.classes_)
class_mapping = {
    class_name: class_id
    for class_name, class_id in zip(label_encoder.classes_, encoded_classes)
}

print("Class Mapping:", class_mapping)

In [None]:
from sklearn.model_selection import train_test_split

# Target share of the full dataset for each split.
train_size = 0.7
val_size = 0.1
test_size = 0.2

documents = data['Sentence']
labels = data['Sentiment']

# Stage 1: carve off the training portion; the remainder is held out for stage 2.
holdout_fraction = 1 - train_size
X_train, X_Rem, y_train, y_Rem = train_test_split(
    documents,
    labels,
    test_size=holdout_fraction,
    random_state=45,
    stratify=labels,
)

# Stage 2: divide the held-out remainder into validation and test sets, keeping
# the val:test ratio configured above.
test_share_of_holdout = test_size / (val_size + test_size)
X_val, X_test, y_val, y_test = train_test_split(
    X_Rem,
    y_Rem,
    test_size=test_share_of_holdout,
    random_state=45,
    stratify=y_Rem,
)

# Report how many examples ended up in each split.
for split_name, split in (
    ("X_train", X_train),
    ("y_train", y_train),
    ("X_val", X_val),
    ("y_val", y_val),
    ("X_test", X_test),
    ("y_test", y_test),
):
    print(f"{split_name} size: ", split.size)

In [None]:
class MarketSentimentDataset(Dataset):
    """Dataset that tokenizes one document per item for sequence classification.

    Args:
        documents: pandas Series of texts (read positionally through ``.iloc``).
        labels: pandas Series of integer class ids, positionally aligned with
            ``documents``.
        tokenizer: callable invoked as ``tokenizer(text, add_special_tokens=...,
            max_length=..., padding=..., truncation=..., return_tensors='pt')``
            that returns a mapping with 'input_ids' and 'attention_mask'
            tensors whose first dimension is a batch dimension of size 1.
        max_len: length every example is padded or truncated to.

    Raises:
        ValueError: if ``documents`` and ``labels`` differ in length.
    """

    def __init__(self, documents, labels, tokenizer, max_len):
        if len(documents) != len(labels):
            raise ValueError(
                f"documents and labels must have the same length, "
                f"got {len(documents)} and {len(labels)}"
            )
        self.documents = documents
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self) -> int:
        """Number of examples in the dataset."""
        return len(self.labels)

    def __getitem__(self, idx: int):
        """Return the tokenized example at position ``idx``.

        Returns:
            dict with keys 'input_ids' and 'attention_mask' (the tokenizer
            output with its leading batch dimension removed) and 'label'
            (0-d ``torch.long`` tensor).
        """
        text = self.documents.iloc[idx]
        label = torch.tensor(self.labels.iloc[idx], dtype=torch.long)

        # Tokenize the text
        inputs = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # Drop only the leading batch dimension added by return_tensors='pt'.
        # A bare squeeze() would also collapse the sequence dimension whenever
        # max_len == 1, producing 0-d tensors that cannot be batched.
        input_ids = inputs['input_ids'].squeeze(0)
        attention_mask = inputs['attention_mask'].squeeze(0)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label': label
        }

In [None]:
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score
from tqdm import tqdm
import numpy as np

# Every example is padded/truncated to this many tokens.
MAX_LEN = 256
model_name = 'gpt2'

# Load the pretrained tokenizer and reuse its end-of-sequence token as the
# padding token.
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id

# Wrap each split in a dataset that tokenizes on access.
train_dataset, val_dataset, test_dataset = (
    MarketSentimentDataset(split_docs, split_labels, tokenizer, MAX_LEN)
    for split_docs, split_labels in (
        (X_train, y_train),
        (X_val, y_val),
        (X_test, y_test),
    )
)

# Only the training loader shuffles; validation and test keep a fixed order.
batch_size = 1
train_loader, val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size, shuffle=should_shuffle)
    for split_dataset, should_shuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class GPT2ForSequenceClassificationWithDropout(nn.Module):
    """Pretrained GPT-2 sequence classifier whose logits pass through dropout.

    Args:
        model_name: checkpoint name or path handed to ``from_pretrained``.
        num_labels: number of output classes for the classification head.
        dropout_prob: drop probability of the dropout layer applied to the
            logits.

    NOTE(review): dropout here acts on the final class logits rather than on a
    hidden representation — confirm this is intended.
    """

    def __init__(self, model_name: str, num_labels: int, dropout_prob: float):
        super().__init__()
        self.gpt2 = GPT2ForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
        )
        self.dropout = nn.Dropout(p=dropout_prob)

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Return the dropout-regularized logits of the wrapped classifier.

        ``labels`` is accepted but not used: no loss is computed here and only
        the logits are returned.
        """
        classifier_output = self.gpt2(input_ids, attention_mask=attention_mask)
        return self.dropout(classifier_output.logits)

dropout_prob = 0.3
# Size the classification head from the fitted label encoder rather than
# hard-coding the number of classes.
num_labels = len(label_encoder.classes_)
model = GPT2ForSequenceClassificationWithDropout(model_name, num_labels=num_labels, dropout_prob=dropout_prob).to(device)

# Per the Hugging Face docs, GPT2ForSequenceClassification classifies from the
# last non-padding token and needs `config.pad_token_id` to locate it. Every
# input here is padded to MAX_LEN, so without this the prediction would be read
# from a padding position.
model.gpt2.config.pad_token_id = tokenizer.pad_token_id

# Training configuration
num_epochs = 3
patience = 2  # Number of epochs to wait for improvement
best_model_path = 'best_model.pt'

# Initialize optimizer and scheduler.
# torch.optim.AdamW replaces the deprecated transformers.AdamW.
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.01)
# One scheduler step per batch, so the linear decay spans all planned epochs.
total_steps = len(train_loader) * num_epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Early-stopping state
best_val_loss = float('inf')
early_stopping_counter = 0

loss_fn = nn.CrossEntropyLoss()

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss = 0
    for batch in tqdm(train_loader, desc=f"Training Epoch {epoch + 1}/{num_epochs}"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        # Named `batch_labels` so the notebook-level `labels` Series from the
        # split cell is not overwritten by this loop.
        batch_labels = batch['label'].to(device)

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask=attention_mask)
        loss = loss_fn(logits, batch_labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)
    print(f"Training Loss: {avg_train_loss:.4f}")

    # ---- Validation pass ----
    model.eval()
    val_loss = 0
    val_labels = []
    val_preds = []
    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Evaluating Validation Accuracy:"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            batch_labels = batch['label'].to(device)

            logits = model(input_ids, attention_mask=attention_mask)
            loss = loss_fn(logits, batch_labels)
            val_loss += loss.item()

            _, predicted_labels = torch.max(logits, 1)
            val_labels.extend(batch_labels.cpu().numpy())
            val_preds.extend(predicted_labels.cpu().numpy())

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = accuracy_score(val_labels, val_preds)
    print(f'Validation Loss: {avg_val_loss:.4f}')
    print(f'Validation Accuracy: {val_accuracy:.4f}')

    # Early stopping: checkpoint on improvement, otherwise count a stale epoch.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        early_stopping_counter = 0
        torch.save(model.state_dict(), best_model_path)  # Save the best model
    else:
        early_stopping_counter += 1

    if early_stopping_counter >= patience:
        print("Early stopping triggered.")
        break

# Load the best model. map_location keeps this working when the checkpoint was
# written on a different device than the one currently in use.
model.load_state_dict(torch.load(best_model_path, map_location=device))

In [None]:
# Evaluate on the held-out test split in inference mode.
model.eval()

# Ground-truth labels and model predictions, accumulated batch by batch.
test_labels, test_preds = [], []

# No gradients are needed for evaluation.
with torch.no_grad():
    for batch in tqdm(test_loader, desc="Predicting:"):
        input_ids, attention_mask, labels = (
            batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'label')
        )

        # Forward pass, then take the highest-scoring class per example.
        logits = model(input_ids, attention_mask=attention_mask)
        predicted_labels = torch.max(logits, dim=1).indices

        test_labels.extend(labels.cpu().numpy())
        test_preds.extend(predicted_labels.cpu().numpy())

# Fraction of test examples classified correctly.
test_accuracy = accuracy_score(test_labels, test_preds)
print(f'Test Accuracy: {test_accuracy:.4f}')

# If you want to see the predictions
# print("Predictions:", test_preds)

In [None]:
def predict_single_text(model, tokenizer, text, max_len, device):
    """Predict the class id of a single piece of text.

    The text is tokenized with the same settings ``MarketSentimentDataset``
    uses (padding and truncation to ``max_len``), so inference inputs have the
    same form as training inputs.

    Args:
        model: callable module invoked as ``model(input_ids,
            attention_mask=...)`` that returns logits of shape
            (1, num_classes). It is switched to eval mode.
        tokenizer: tokenizer callable that accepts ``padding='max_length'``,
            ``truncation=True`` and ``return_tensors='pt'``.
        text: the text to classify. NOTE(review): training text was passed
            through ``preprocess_data`` first; callers should clean ``text``
            the same way.
        max_len: length the input is padded or truncated to.
        device: torch device the input tensors are moved to.

    Returns:
        int: index of the highest-scoring class.
    """
    model.eval()
    # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`.
    # `return_tensors='pt'` already yields (1, max_len) tensors, so no manual
    # unsqueeze is needed.
    inputs = tokenizer(
        text,
        add_special_tokens=True,
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_tensors='pt',
    )
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    with torch.no_grad():
        logits = model(input_ids, attention_mask=attention_mask)
        _, predicted_label = torch.max(logits, 1)

    return predicted_label.item()

# Example usage
text = "XYZ company loses a copyyright lawsuit to pay a fine of $1 million"
# The model was trained on text cleaned by preprocess_data, so the raw input is
# cleaned the same way before prediction to avoid a train/inference mismatch.
cleaned_text = preprocess_data(pd.DataFrame({'Sentence': [text]}))['Sentence'].iloc[0]
predicted_label = predict_single_text(model, tokenizer, cleaned_text, MAX_LEN, device)
# Map the integer id back to the original class name for readability.
predicted_class = label_encoder.inverse_transform([predicted_label])[0]
print(f"Predicted Label: {predicted_label}")
print(f"Predicted Class: {predicted_class}")