In [28]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import nltk
import torch
import re
import keras
from collections import Counter
from datetime import datetime
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer
from sklearn.metrics import classification_report, accuracy_score
import torch



In [29]:
df = pd.read_csv('/content/news_data_labelled.csv', encoding='latin-1')
df['combined_text'] = df['Title'] + " " + df['Text']
sentiment_mapping = {'positive': 1, 'negative': 0, 'neutral': 2}
df['Sentiment'] = df['Sentiment'].map(sentiment_mapping)

In [30]:
# Split data into training and testing wuth a 70,15,15 split

train_texts, temp_texts, train_labels, temp_labels = train_test_split(df['combined_text'], df['Sentiment'],
                                                                      random_state=2018,
                                                                      test_size=0.3,
                                                                      stratify=df['Sentiment'])

val_texts, test_texts, val_labels, test_labels = train_test_split(temp_texts, temp_labels,
                                                                  random_state=2018,
                                                                  test_size=0.5,
                                                                  stratify=temp_labels)



In [31]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

train_encodings = tokenizer(train_texts.tolist(), truncation=True, padding='max_length', max_length=512)
val_encodings = tokenizer(val_texts.tolist(), truncation=True, padding='max_length', max_length=512)
test_encodings = tokenizer(test_texts.tolist(), truncation=True, padding='max_length', max_length=512)

In [32]:
df['Sentiment'] = df['Sentiment'].apply(lambda x: 1 if x == 'positive' else (-1 if x == 'negative' else 0))

In [33]:
# Function to create tensor datasets
def create_dataset(encodings, labels):
    return TensorDataset(torch.tensor(encodings['input_ids']),
                         torch.tensor(encodings['attention_mask']),
                         torch.tensor(labels.tolist()))

# Create datasets
train_dataset = create_dataset(train_encodings, train_labels)
val_dataset = create_dataset(val_encodings, val_labels)
test_dataset = create_dataset(test_encodings, test_labels)

In [34]:
batch_size = 16

# Create DataLoaders
train_loader = DataLoader(train_dataset, sampler=RandomSampler(train_dataset), batch_size=batch_size)
val_loader = DataLoader(val_dataset, sampler=SequentialSampler(val_dataset), batch_size=batch_size)
test_loader = DataLoader(test_dataset, sampler=SequentialSampler(test_dataset), batch_size=batch_size)

BERT Model

In [35]:
from transformers import BertForSequenceClassification, AdamW
from transformers import AutoModelForSequenceClassification

# Load the FinBERT model
model = AutoModelForSequenceClassification.from_pretrained('yiyanghkust/finbert-tone', num_labels=3)
model.to(device)

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30873, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12,

In [36]:
# Define the optimizer
optimizer = AdamW(model.parameters(), lr=2e-5)



In [37]:
from tqdm import tqdm  # for progress bar

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Training loop
epochs = 7

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    model.train()
    total_loss = 0
    for i, batch in enumerate(tqdm(train_loader)):
        print("Batch: ", i)
        # Move batch to GPU
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        # Clear previously calculated gradients
        model.zero_grad()

        # Perform a forward pass
        outputs = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels)
        loss = outputs.loss
        total_loss += loss.item()

        # Perform a backward pass to calculate gradients
        loss.backward()

        # Update parameters and take a step using the computed gradient
        optimizer.step()

    # Calculate the average loss over the training data
    avg_train_loss = total_loss / len(train_loader)
    print(f"Average train loss: {avg_train_loss}")


Epoch 1/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.12it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.19it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.21it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.51it/s]


Batch:  4
Average train loss: 2.540662336349487
Epoch 2/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.26it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.25it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.24it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.55it/s]


Batch:  4
Average train loss: 0.9782084584236145
Epoch 3/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.24it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.24it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.24it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.55it/s]


Batch:  4
Average train loss: 0.6070441961288452
Epoch 4/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.24it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.24it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.24it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.54it/s]


Batch:  4
Average train loss: 0.6214935898780822
Epoch 5/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.23it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.24it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.23it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.54it/s]


Batch:  4
Average train loss: 0.4452159345149994
Epoch 6/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.23it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.24it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.24it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.54it/s]


Batch:  4
Average train loss: 0.307234388589859
Epoch 7/7


  0%|          | 0/5 [00:00<?, ?it/s]

Batch:  0


 20%|██        | 1/5 [00:00<00:01,  2.24it/s]

Batch:  1


 40%|████      | 2/5 [00:00<00:01,  2.24it/s]

Batch:  2


 60%|██████    | 3/5 [00:01<00:00,  2.24it/s]

Batch:  3


100%|██████████| 5/5 [00:01<00:00,  2.55it/s]

Batch:  4
Average train loss: 0.3188169836997986





Evaluate Model

In [38]:
# Evaluation
model.eval()

# Tracking variables
predictions, true_labels = [], []

# Predict
for i, batch in enumerate(test_loader):

    # Move batch to GPU
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    with torch.no_grad():
        # Forward pass, calculate logit predictions
        outputs = model(b_input_ids, attention_mask=b_input_mask)

    logits = outputs.logits
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    # Store predictions and true labels
    predictions.append(logits)
    true_labels.append(label_ids)

In [39]:
# Flatten the predictions and true labels
flat_predictions = np.concatenate(predictions, axis=0)
flat_true_labels = np.concatenate(true_labels, axis=0)

# Convert the predictions to the same format as labels
flat_predictions = np.argmax(flat_predictions, axis=1).flatten()

# Calculate accuracy, precision, recall, and F1 score
report = classification_report(flat_true_labels, flat_predictions, target_names=['Negative', 'Positive', 'Neutral'])

# Calculate the overall accuracy of the model
overall_accuracy = accuracy_score(flat_true_labels, flat_predictions)

print("Overall Accuracy:", overall_accuracy)
print(report)

Overall Accuracy: 0.7333333333333333
              precision    recall  f1-score   support

    Negative       0.75      0.50      0.60         6
    Positive       0.67      0.86      0.75         7
     Neutral       1.00      1.00      1.00         2

    accuracy                           0.73        15
   macro avg       0.81      0.79      0.78        15
weighted avg       0.74      0.73      0.72        15



In [None]:
# Convert numeric prediction to label
label_mapping = {0: 'Negative', 1: 'Positive', 2: 'Neutral'}

def preprocess_for_finbert(text):
    # Load the same tokenizer used during training
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    # Encode the text
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding='max_length', max_length=512)

    return inputs


def predict_sentiment(text, model):
    # Preprocess the text
    inputs = preprocess_for_finbert(text)

    # Move inputs to the same device as the model
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Put model in evaluation mode
    model.eval()

    # Predict
    with torch.no_grad():
        outputs = model(**inputs)

    # Get the prediction
    prediction = torch.argmax(outputs.logits, dim=1)

    return label_mapping[prediction.item()]