In [1]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from tqdm import tqdm
from transformers import Trainer, TrainingArguments

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
df = pd.read_csv('dataset/finance_news.csv')
df.head()

Unnamed: 0,text,label
0,SpiceJet to issue 6.4 crore warrants to promoters,2.0
1,MMTC Q2 net loss at Rs 10.4 crore,2.0
2,"Mid-cap funds can deliver more, stay put: Experts",0.0
3,Mid caps now turn into market darlings,0.0
4,"Market seeing patience, if not conviction: Pra...",2.0


In [3]:
df.columns

Index(['text', 'label'], dtype='object')

In [4]:
df['label'].unique()

array([ 2.,  0.,  1., nan])

In [5]:
df = df.dropna()
df = df.drop_duplicates()

In [6]:
df['label'] = df['label'].astype(int)

In [7]:
X = df['text']
y = df['label']

In [8]:
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

In [9]:
model_name = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir='finbert')
model = AutoModelForSequenceClassification.from_pretrained(model_name, cache_dir='finbert')

In [10]:
batch = tokenizer(list(X_train[:3]), padding=True, truncation=True, max_length=512, return_tensors="pt")
# batch = torch.tensor(batch["input_ids"])
batch

{'input_ids': tensor([[  101,  1038,  1000,  8495,  2758, 24111,  1005,  1055,  4255, 14441,
          6461,  2012,  4238,  1000,   102,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0],
        [  101,  1038,  1005,  8246,  1024,  2034,  2931,  2343,  1997,  6849,
         11509,  1000,  4941,  2000,  2168,  1011,  3348,  3510,  1010, 11324,
          1998,  1996,  2851,  2044, 17357,  1000,  1005,   102],
        [  101,  2031, 12718,  2634,  1053,  2487,  6986,  2039,  2603,  1003,
          2012, 12667,  6584, 21665,   102,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       

In [11]:
with torch.no_grad():
    outputs = model(**batch)
    label_ids = torch.argmax(outputs.logits, dim=1)
    print(label_ids)
    labels = [model.config.id2label[label_id] for label_id in label_ids.tolist()]
    print(labels)

tensor([2, 2, 0])
['neutral', 'neutral', 'positive']


In [12]:
class FinanceData(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

In [13]:
train_encodings = tokenizer(list(X_train), truncation=True, padding=True)
val_encodings = tokenizer(list(X_val), truncation=True, padding=True)
test_encodings = tokenizer(list(X_test), truncation=True, padding=True)

In [14]:
train_dataset = FinanceData(train_encodings, list(y_train))
val_dataset = FinanceData(val_encodings, list(y_val))
test_dataset = FinanceData (test_encodings, list(y_test))

In [17]:
training_args = TrainingArguments(
    output_dir='./results',              # output directory
    num_train_epochs=1,                  # total number of training epochs
    per_device_train_batch_size=16,      # batch size per device during training
    per_device_eval_batch_size=64,       # batch size for evaluation
    warmup_steps=100,                    # number of warmup steps for learning rate scheduler
    learning_rate=5e-5,                  # learning rate
    weight_decay=0.01,                   # strength of weight decay
    logging_dir='./logs',                # directory for storing logs
    logging_steps=100
)

In [18]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset, eval_dataset=val_dataset
)

In [19]:
trainer.train()

Step,Training Loss
100,1.0712
200,0.7786
300,0.7851
400,0.774
500,0.7606


KeyboardInterrupt: 

_______________

In [None]:
tokenized_train_dataset = tokenizer(list(X_train), padding=True, truncation=True, max_length=512, return_tensors="pt")
tokenized_test_dataset = tokenizer(list(X_train), padding=True, truncation=True, max_length=512, return_tensors="pt")

In [None]:
def tokenize_data(df, tokenizer, max_length=128):
    input_ids = []
    attention_masks = []
    labels = []

    with tqdm(total=df.shape[0]) as pbar:
        for index, row in df.iterrows():
            pbar.update(1)
            text = row["text"]
            label = row["label"]

            encoding = tokenizer(text, padding='max_length', truncation=True, max_length=max_length, return_tensors='pt')
            input_ids.append(encoding['input_ids'])
            attention_masks.append(encoding['attention_mask'])
            labels.append(label)

    input_ids = torch.cat(input_ids, dim=0)
    attention_masks = torch.cat(attention_masks, dim=0)
    labels = torch.tensor(labels)

    dataset = TensorDataset(input_ids, attention_masks, labels)
    return dataset

In [None]:
batch_size = 4
train_dataset = tokenize_data(train_df, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = tokenize_data(val_df, tokenizer)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

test_dataset = tokenize_data(test_df, tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
optimizer = AdamW(model.parameters(), lr=1e-5)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=len(train_dataloader) * 5)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
!export TORCH_USE_CUDA_DSA=1
!export CUDA_LAUNCH_BLOCKING=1

In [None]:
num_params = sum(p.numel() for p in model.parameters())
print(f"Number of parameters in the model: {num_params}")

In [None]:
model.to(device)
model.train()
num_epochs = 5
num_epochs = 5  # Adjust the number of epochs as needed
for epoch in range(num_epochs):
    total_loss = 0.0
    for batch in train_dataloader:
        input_ids, attention_mask, labels = batch
        input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        scheduler.step()
        total_loss += loss.item()

    # Print the average loss for this epoch
    avg_loss = total_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {avg_loss}")

In [None]:
model.eval()
predictions = []
true_labels = []

for batch in test_dataloader:
    input_ids, attention_mask, labels = batch
    input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits

    predictions.extend(logits.argmax(dim=1).cpu().numpy())
    true_labels.extend(labels.cpu().numpy())

In [None]:
accuracy = accuracy_score(true_labels, predictions)
report = classification_report(true_labels, predictions)

print(f"Accuracy: {accuracy}")
print(report)

In [None]:
import torch
from transformers import BertTokenizer, BertForSequenceClassification

# Load the tokenizer and model
model_name = "ProsusAI/finbert"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name)

# Define the text you want to analyze
text = "The company's revenue has increased significantly, and the stock price is rising."

# Tokenize and prepare the input
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)

# Perform inference
with torch.no_grad():
    outputs = model(**inputs)

# Get the predicted sentiment label and score
logits = outputs.logits
predicted_class = torch.argmax(logits, dim=1).item()
sentiment_score = torch.softmax(logits, dim=1)[0].tolist()

# Map the sentiment label to human-readable form
sentiment_labels = ["Negative", "Neutral", "Positive"]
predicted_sentiment = sentiment_labels[predicted_class]

# Print the results
print(f"Text: {text}")
print(f"Predicted Sentiment: {predicted_sentiment}")
print(f"Sentiment Scores (Negative, Neutral, Positive): {sentiment_score}")


In [None]:
import tensorflow as tf
model = tf.keras.models.load_model('/content/drive/MyDrive/datasets/NLP_mini_project/tf_model.h5')
model.summary()
