In [None]:
!pip install torch transformers pandas
!pip install matplotlib

In [None]:
import torch
import pandas as pd
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from transformers import BertTokenizerFast,AutoModel,BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
import requests
import zipfile
import re
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import itertools


In [None]:
# Download the PerSenT repository archive from GitHub.
response = requests.get(
    "https://github.com/MHDBST/PerSenT/archive/refs/heads/main.zip",
    timeout=60,
)
# Fail loudly on an HTTP error instead of writing an error page to data.zip.
response.raise_for_status()
with open("data.zip", "wb") as archive_file:
    archive_file.write(response.content)

# Unzip data. The context variable is deliberately not called `zip`, which
# would shadow the builtin for every later cell in the kernel.
with zipfile.ZipFile('data.zip') as archive:
    archive.extractall('data')

In [None]:
# Load the pretrained 'bert-base-uncased' tokenizer; it is handed to the
# dataset objects built later in the notebook.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
class PerSentParagraphDataset(Dataset):
    """Paragraph-level sentiment dataset read from a PerSenT-style CSV file.

    Each row's ``DOCUMENT`` text is split on newlines into paragraphs. The
    label of paragraph ``i`` is read from the ``Paragraph{i}`` column. When a
    document consists of a single paragraph with no paragraph label, the
    document-level ``TRUE_SENTIMENT`` is used instead. Paragraphs without a
    label are skipped.

    Args:
        path: Path of the CSV file to read.
        tokenizer: Object exposing ``encode_plus`` (e.g. a BERT tokenizer).
        max_length: Length every paragraph is padded / truncated to.
        max_paragraphs: Maximum number of leading paragraphs considered per
            document (the original hard-coded limit was 16).

    Indexing raises ``KeyError`` when a stored sentiment is not one of the
    keys of ``LABEL_MAP``.
    """

    # Maps the sentiment strings found in the CSV to class indices.
    LABEL_MAP = {'Negative': 0, 'Neutral': 1, 'Positive': 2}

    def __init__(self, path, tokenizer, max_length=150, max_paragraphs=16):
        frame = pd.read_csv(path, delimiter=',')
        # dropna returns a new frame; the result must be kept, otherwise rows
        # with a missing document reach `.split` below as NaN and crash.
        self.data = frame.dropna(subset=['DOCUMENT', 'TRUE_SENTIMENT'])
        self.tokenizer = tokenizer
        self.max_length = max_length

        # Collect paragraphs and their sentiments.
        self.paragraphs = []
        self.sentiments = []

        for _, row in self.data.iterrows():
            # Divide the document into paragraphs.
            paragraphs = row['DOCUMENT'].split('\n')
            for i, paragraph in enumerate(paragraphs[:max_paragraphs]):
                # `.get` yields None when the CSV has no such column, so a
                # document with more paragraphs than label columns is handled.
                paragraph_label = row.get(f'Paragraph{i}')
                if pd.notna(paragraph_label):
                    self.paragraphs.append(paragraph)
                    self.sentiments.append(paragraph_label)
                elif len(paragraphs) == 1:
                    # A single unlabeled paragraph inherits the document sentiment.
                    self.paragraphs.append(paragraph)
                    self.sentiments.append(row['TRUE_SENTIMENT'])

    def __len__(self):
        """Return the number of labeled paragraphs."""
        return len(self.paragraphs)

    def __getitem__(self, idx):
        """Return the tokenized paragraph ``idx`` and its class index.

        Returns:
            Dict with ``'input_ids'`` and ``'attention_mask'`` (the tokenizer
            output with its leading batch dimension removed) and ``'labels'``
            (a scalar ``torch.long`` tensor).
        """
        paragraph = self.paragraphs[idx]
        true_sentiment = self.sentiments[idx]
        # Convert the paragraph into model-ready tensors.
        inputs = self.tokenizer.encode_plus(
            paragraph,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        label = torch.tensor(self.LABEL_MAP[true_sentiment], dtype=torch.long)

        # squeeze(0) drops only the batch dimension added by return_tensors='pt'.
        return {
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'labels': label
        }


In [None]:

def plot_confusion_matrix(cm, target_names):
    """Draw a confusion matrix as an annotated heat map and show it.

    Args:
        cm: 2-D array of counts, indexed as ``cm[true, predicted]``.
        target_names: Class names used as tick labels on both axes.
    """
    blues = plt.get_cmap('Blues')
    plt.imshow(cm, interpolation='nearest', cmap=blues)
    plt.colorbar()

    positions = np.arange(len(target_names))
    plt.xticks(positions, target_names, rotation=45)
    plt.yticks(positions, target_names)

    # Write each count into its cell; dark cells get white text for contrast.
    for row in range(cm.shape[0]):
        for col in range(cm.shape[1]):
            count = cm[row, col]
            if count > cm.max() / 2:
                text_color = "white"
            else:
                text_color = "black"
            plt.text(col, row, count,
                     horizontalalignment="center",
                     color=text_color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.show()

In [None]:
# Training and validation function for combined datasets
def fine_tune(model, train_loader, val_loader, num_epochs=4, device=None):
    """Fine-tune a sequence-classification model and evaluate it after every epoch.

    Args:
        model: Model whose forward pass accepts ``input_ids``,
            ``attention_mask`` and optional ``labels`` and returns an object
            exposing ``loss`` (when labels are given) and ``logits``.
        train_loader: Iterable of batches; each batch is a dict holding
            ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.
        val_loader: Iterable of batches in the same format, used for the
            per-epoch evaluation.
        num_epochs: Number of passes over ``train_loader``.
        device: Device to run on; defaults to CUDA when available, else CPU.

    Returns:
        The fine-tuned model, moved to ``device``.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    class_names = ['Negative', 'Neutral', 'Positive']
    # Explicit class ids keep the report and the confusion matrix 3x3 even
    # when a class is absent from the labels and predictions of an epoch.
    class_ids = list(range(len(class_names)))

    model = model.to(device)
    # torch.optim.AdamW replaces the deprecated transformers.AdamW; eps and
    # weight_decay are set to the values the transformers version defaulted to.
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, eps=1e-6, weight_decay=0.0)

    for epoch in range(num_epochs):
        # Training loop
        model.train()
        running_loss = 0.0
        num_batches = 0
        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        # Validation loop
        model.eval()
        true_labels = []
        pred_labels = []
        total_correct = 0
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                logits = model(input_ids, attention_mask=attention_mask).logits
                predictions = logits.argmax(dim=1)
                total_correct += (predictions == labels).sum().item()
                true_labels.extend(labels.cpu().tolist())
                pred_labels.extend(predictions.cpu().tolist())

        num_examples = len(true_labels)
        # Guard against an empty loader so the summary never divides by zero.
        accuracy = total_correct / num_examples if num_examples else 0.0
        mean_loss = running_loss / num_batches if num_batches else float('nan')
        report = classification_report(
            true_labels, pred_labels,
            labels=class_ids, target_names=class_names, zero_division=0,
        )
        cm = confusion_matrix(true_labels, pred_labels, labels=class_ids)

        print(f'Epoch {epoch + 1}/{num_epochs}, '
              f'Train loss: {mean_loss:.4f}, Validation accuracy: {accuracy:.4f}')
        print(f'Report: {report}')
        print('Confusion Matrix:')
        plot_confusion_matrix(cm, target_names=class_names)

    return model

In [None]:

# # Load BERT and tokenizer: The pre-trained BERT model and tokenizer are loaded
# # using the 'BertTokenizerFast' classes from the Hugging Face Transformer library
# tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# Build the paragraph-level train and validation datasets from the extracted
# PerSenT CSV files, using the module-level `tokenizer`.
train_dataset = PerSentParagraphDataset('data/PerSenT-main/train.csv', tokenizer)
val_dataset = PerSentParagraphDataset('data/PerSenT-main/dev.csv', tokenizer)
# Create the data loaders. The training loader shuffles: the dataset stores the
# paragraphs of each document consecutively, so unshuffled batches would be
# made of neighbouring paragraphs. Validation order does not affect the metrics.
# NOTE(review): batch_size=350 is large for BERT fine-tuning; confirm it fits in
# the available GPU memory.
train_loader = DataLoader(train_dataset, batch_size=350, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=350, shuffle=False)



In [None]:
# Initialize the model: BERT for sequence classification with three output labels.
# The optimizer is created inside fine_tune, which then trains the model on
# train_loader and evaluates it on val_loader after every epoch.
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3)
fine_tuned_model = fine_tune(model, train_loader, val_loader)

In [None]:
# Save the fine-tuned model to the 'fine_tuned_bert_sentiment_elm' directory;
# the inference cell later in the notebook reloads it from the same path.
fine_tuned_model.save_pretrained('fine_tuned_bert_sentiment_elm')

# model.save_pretrained('fine_tuned_bert_sentiment')

In [None]:
# Reload the base tokenizer and the fine-tuned classifier from the
# 'fine_tuned_bert_sentiment_elm' directory. Both names are read as
# module-level globals by predict_paragraph_sentiments.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('fine_tuned_bert_sentiment_elm')

def predict_paragraph_sentiments(document, max_length=150):
    """Predict a sentiment label for every non-blank paragraph of a document.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        document: Text whose paragraphs are separated by newlines.
        max_length: Length each paragraph is padded / truncated to. The
            default matches the default ``max_length`` of
            ``PerSentParagraphDataset`` used for training, so inference
            truncates paragraphs the same way training did.

    Returns:
        List of ``'Negative'`` / ``'Neutral'`` / ``'Positive'`` strings, one
        per non-blank paragraph, in document order.
    """
    class_names = ['Negative', 'Neutral', 'Positive']
    # Run on whichever device the model currently lives on.
    device = model.device

    paragraph_sentiments = []
    for paragraph in document.split('\n'):
        if not paragraph.strip():
            # Blank lines carry no content to classify.
            continue

        inputs = tokenizer.encode_plus(
            paragraph,
            padding='max_length',
            truncation=True,
            max_length=max_length,
            return_tensors='pt'
        )
        inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

        with torch.no_grad():
            logits = model(**inputs).logits
        predicted_class = logits.argmax(dim=1).item()
        paragraph_sentiments.append(class_names[predicted_class])

    return paragraph_sentiments

def predict_document_sentiment(document):
    """Return the majority sentiment over the paragraphs of ``document``.

    The paragraph labels come from ``predict_paragraph_sentiments``. The vote
    counts are printed before the winner is returned. Ties are resolved in the
    order Positive, then Negative, then Neutral.
    """
    # Insertion order of the tally encodes the tie-break priority.
    tally = dict.fromkeys(('Positive', 'Negative', 'Neutral'), 0)
    for label in predict_paragraph_sentiments(document):
        tally[label] = tally[label] + 1
    print(tally)

    # Keep the first label that reaches the highest count.
    winner, best_votes = None, -1
    for label, votes in tally.items():
        if votes > best_votes:
            winner, best_votes = label, votes
    return winner
    

sample_document = """HOUSTON  Pa. (Reuters) - The conservative southwest corner of Pennsylvania  a patchwork of small towns  farms and Pittsburgh suburbs where voters backed Republican Donald Trump by 20 percentage points in 2016  seems an unlikely spot for a possible Democratic renaissance.
In a district where  Trump  remains popular and Democrats have not even fielded a candidate in the previous two congressional elections  Saccone has tied himself closely to the president. The conservative 59-year-old state legislator has joked he ‚Äúwas  Trump  before  Trump  was  Trump .‚Äù
Democrat Lamb  who hails from a prominent Pennsylvania political family  rarely mentions  Trump   focusing on economic issues  healthcare and protecting Social Security and Medicare.
To head off Republican charges he would be a loyal follower of Democratic House of Representatives leader Nancy Pelosi  he said he would not support her for speaker and has promised to work with  Trump  if it would help the district."""

# Demo: one predicted label per non-blank paragraph of the sample document.
predicted_paragraph_sentiments = predict_paragraph_sentiments(sample_document)
print(f'Predicted paragraph sentiments: {predicted_paragraph_sentiments}')

# Demo: document-level majority vote. predict_document_sentiment calls
# predict_paragraph_sentiments itself, so the paragraphs are classified a
# second time here, and it prints the vote counts before returning.
predicted_document_sentiment = predict_document_sentiment(sample_document)
print(f'Predicted document sentiment: {predicted_document_sentiment}')


In [None]:
  
def evaluate_document(path):
    """Compute document-level accuracy of ``predict_document_sentiment`` on a CSV.

    Args:
        path: CSV file with ``DOCUMENT`` and ``TRUE_SENTIMENT`` columns.

    Returns:
        Fraction of documents whose predicted sentiment equals
        ``TRUE_SENTIMENT``. Rows missing either column are excluded from the
        evaluation. Returns ``0.0`` when no row can be evaluated.
    """
    data = pd.read_csv(path, delimiter=',')
    # A missing document is read as NaN and cannot be split into paragraphs.
    data = data.dropna(subset=['DOCUMENT', 'TRUE_SENTIMENT'])
    if data.empty:
        # Avoid dividing by zero on an empty (or fully filtered) file.
        return 0.0

    correct = 0
    pairs = data[['DOCUMENT', 'TRUE_SENTIMENT']].itertuples(index=False, name=None)
    for document, true_sentiment in pairs:
        if predict_document_sentiment(document) == true_sentiment:
            correct += 1
    return correct / len(data)


In [None]:
# Document-level accuracy on fixed_test.csv, as computed by evaluate_document.
test_accuracy = evaluate_document('data/PerSenT-main/fixed_test.csv')
print(f'Test Accuracy: {test_accuracy:.4f}')