In [None]:
!pip install transformers
!pip install datasets

In [None]:
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModelForSequenceClassification, AutoConfig
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import Trainer, TrainingArguments
from transformers.modeling_outputs import TokenClassifierOutput

from tqdm.auto import tqdm

from datasets import Dataset, load_metric
from datasets import load_metric

import numpy as np
import torch
from torch import nn
import re
import os
import pandas as pd

# Colab-only: mount Google Drive at /content/drive (may prompt for authorization).
from google.colab import drive
drive.mount('/content/drive')

In [12]:
# ---- Run configuration -------------------------------------------------
# Name of this run; it is embedded in the weights file name below.
experiment = 'bert_classification_short_vocab'
# Hugging Face model id passed to the tokenizer / model loaders.
# Alternative that was tried: "climatebert/distilroberta-base-climate-s"
checkpoint = 'bert-base-cased'

# ---- Locations on the mounted Drive ------------------------------------
_PROJECT_ROOT = '/content/drive/MyDrive/nlp-project/'
_FILINGS_ROOT = _PROJECT_ROOT + 'data/sec-filings/'

# Folders with one text file per company. The trailing slash matters:
# file names are appended to these strings directly.
TXT_PATH_TRAIN = _FILINGS_ROOT + 'txt files 5 key words train/'
TXT_PATH_TEST = _FILINGS_ROOT + 'txt files 5 key words test/'

# CSV with the Sustainalytics scores.
SCORES_PATH = _PROJECT_ROOT + 'data/esg-scores/Sustainalytics_scores_original.csv'
# SAVE_PATH (currently unused) would be: _PROJECT_ROOT + 'outputs/' + experiment + '.csv'

# Trainer checkpoints directory and the file receiving the final state dict.
CHECKPOINTS_PATH = _PROJECT_ROOT + 'checkpoints/'
WEIGHTS_PATH = '{}weights/test-{}.pt'.format(_PROJECT_ROOT, experiment)

# ---- Training hyper-parameters -----------------------------------------
num_epochs = 5
num_labels = 5  # 1 for regression, n for n-classification

In [None]:
# Load the score table and add two columns that are filled from the filing
# text files afterwards: the cleaned text and the split name ('train'/'test').
scores = pd.read_csv(SCORES_PATH)
# Create the placeholders as object columns holding None rather than float
# NaN: both columns later receive strings, and writing a string into a
# float64 column is deprecated in pandas 2.1+. `dropna()` treats None as
# missing just like NaN, so the later filtering is unaffected.
scores['Text'] = None
scores['Dataset'] = None

def clean_text(text):
    """Reduce raw text to ASCII letters, digits, single spaces and periods.

    Steps, in order: newlines become spaces; tabs, commas and semicolons are
    deleted outright (so "a,b" becomes "ab"); every remaining character that
    is not an ASCII letter, digit, space or period is replaced by a space;
    runs of spaces collapse to one; leading/trailing whitespace is stripped.

    Args:
        text: the raw string to clean.

    Returns:
        The cleaned string (empty if nothing survives the cleaning).
    """
    text = text.replace('\n', ' ').replace('\t', '').replace(',', '').replace(';', '')
    # Raw-string patterns: the previous plain literal contained '\.', which is
    # an invalid string escape (a warning today, an error in future Pythons).
    # Inside a character class the period needs no escaping at all.
    text = re.sub(r'[^a-zA-Z0-9 .]', ' ', text)
    text = re.sub(r' +', ' ', text)  # collapse repeated spaces
    return text.strip()

def _ticker_from_filename(filename):
  """Return `filename` without a trailing '.txt' (unchanged if it has none).

  `filename.strip('.txt')` must not be used for this: it strips any of the
  characters '.', 't' and 'x' from BOTH ends, so a name such as 'txt.txt'
  collapses to '' and 'xom.txt' to 'om'.
  """
  suffix = '.txt'
  if filename.endswith(suffix):
    return filename[:-len(suffix)]
  return filename


def _attach_filing_texts(frame, txt_dir, dataset_name):
  """Fill 'Text' and 'Dataset' in `frame` from the text files in `txt_dir`.

  For every entry in `txt_dir`, the file name minus '.txt' is looked up in
  frame['Ticker']. On a match the file is read, cleaned with `clean_text`
  and stored in the first matching row, which is also tagged with
  `dataset_name`. Entries without a matching ticker are reported and skipped.

  Args:
    frame: DataFrame with 'Ticker', 'Text' and 'Dataset' columns; modified
      in place.
    txt_dir: directory path ending in a path separator (file names are
      appended to it directly).
    dataset_name: value written to the 'Dataset' column, e.g. 'train'.
  """
  for filename in os.listdir(txt_dir):
    matches = frame.index[frame['Ticker'] == _ticker_from_filename(filename)]
    if len(matches) == 0:
      # Same message as before: the file's ticker is missing from the table.
      print('File ', filename, ' not present')
      continue
    row = matches[0]

    with open(txt_dir + filename) as txt_file:
      raw_text = txt_file.read()

    frame.loc[row, 'Text'] = clean_text(raw_text)
    frame.loc[row, 'Dataset'] = dataset_name


_attach_filing_texts(scores, TXT_PATH_TRAIN, 'train')
_attach_filing_texts(scores, TXT_PATH_TEST, 'test')

# Drop every row that still has a missing value in ANY column (this includes
# companies for which no text file was found), then renumber the rows.
scores = scores.dropna()
scores = scores.reset_index(drop=True)

In [5]:
# split the data for long text
def longtext_split(text, chunk_size=200, stride=150):
  """Split `text` into overlapping chunks of whitespace-separated words.

  Chunks start every `stride` words and hold up to `chunk_size` words, so
  with the defaults consecutive chunks overlap by 50 words. Enough chunks
  are produced to cover the whole text: the previous version used
  `len(words) // 150` chunks and therefore silently dropped up to 99
  trailing words (e.g. words 200-298 of a 299-word text).

  Args:
    text: the string to split.
    chunk_size: maximum number of words per chunk (default 200).
    stride: distance in words between consecutive chunk starts (default 150).

  Returns:
    A list with at least one string. Empty or whitespace-only input yields
    [''].

  Raises:
    ValueError: if `chunk_size` or `stride` is not positive.
  """
  if chunk_size <= 0 or stride <= 0:
    raise ValueError('chunk_size and stride must be positive')

  words = text.split()  # split once instead of once per chunk

  if len(words) <= chunk_size:
    n_chunks = 1
  else:
    # One full first window plus ceil(remaining / stride) further windows.
    remaining = len(words) - chunk_size
    n_chunks = 1 + -(-remaining // stride)

  return [
      ' '.join(words[start:start + chunk_size])
      for start in range(0, n_chunks * stride, stride)
  ]

# Chunk every filing, then unroll the per-row chunk lists so that each chunk
# gets its own row (all other columns are repeated for every chunk).
scores = (
    scores
    .assign(text_split=scores['Text'].apply(longtext_split))
    .explode('text_split')
)
      

In [None]:
# Display the first rows of `scores` as a quick visual check.
scores.head()

In [None]:
# Earlier variant with the BERT-specific classes, kept for reference:
#tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
#model = BertForSequenceClassification.from_pretrained("bert-base-uncased", problem_type="single_label_classification", num_labels=num_labels)

# Load the tokenizer and a sequence-classification model for `checkpoint`;
# the model is configured with `num_labels` outputs.
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model =  AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=num_labels)

In [9]:
# Partition the rows by their 'Dataset' tag, then release the combined frame.
# NOTE: because `scores` is deleted here, this cell cannot be re-run without
# re-running the cells that build `scores` first.
train_df = scores[scores['Dataset'].eq('train')]
test_df = scores[scores['Dataset'].eq('test')]
del scores

In [None]:
# Class name found in the 'Sustainalytics Class' column -> integer label id.
_CLASS_TO_ID = {
    'Negligible': 0,
    'Low': 1,
    'Medium': 2,
    'High': 3,
    'Severe': 4,
}


def transform_labels(label):
  """Convert one example's 'Sustainalytics Class' into an integer label.

  Args:
    label: mapping for a single example; must contain the key
      'Sustainalytics Class'.

  Returns:
    A dict {'labels': <int in 0..4>}.

  Raises:
    ValueError: if the class name is not one of the five known classes.
      (Previously this surfaced as an UnboundLocalError on `num`.)
  """
  class_name = label['Sustainalytics Class']

  if class_name not in _CLASS_TO_ID:
    raise ValueError(
        'Unknown Sustainalytics Class: {!r}; expected one of {}'.format(
            class_name, list(_CLASS_TO_ID)))

  return {'labels': _CLASS_TO_ID[class_name]}


def encode(example):
    """Tokenize text chunks together with their industry as a second segment.

    Passes example['text_split'] and example['Industry'] as the first and
    second positional arguments of the module-level `tokenizer`, with
    truncation enabled and padding set to 'max_length', and returns whatever
    the tokenizer returns.
    """
    first_segment = example['text_split']
    second_segment = example['Industry']
    return tokenizer(
        first_segment,
        second_segment,
        truncation=True,
        padding='max_length',
    )

# Source-frame columns removed once the integer labels have been added.
remove_columns = [
    'Sustainalytics Score',
    'Company Name',
    'Ticker',
    'Sustainalytics Class',
    'Text',
    '__index_level_0__',
    'text_split',
    'Industry',
    'Dataset',
]


def _frame_to_dataset(frame):
  """Turn one split's DataFrame into a tokenized, labelled, shuffled Dataset.

  Steps: Dataset.from_pandas -> batched `encode` -> per-example
  `transform_labels` (dropping `remove_columns`) -> shuffle with seed 10.
  """
  dataset = Dataset.from_pandas(frame)
  dataset = dataset.map(encode, batched=True)  # encode data
  # transform labels to integers; the target column ends up named 'labels'
  dataset = dataset.map(transform_labels, remove_columns=remove_columns)
  return dataset.shuffle(seed=10)


train_dataset = _frame_to_dataset(train_df)
test_dataset = _frame_to_dataset(test_df)

In [None]:
# Fine-tune the model: Trainer output goes to CHECKPOINTS_PATH and the run
# lasts `num_epochs` epochs; every other training argument keeps its default.
training_args = TrainingArguments(
    output_dir=CHECKPOINTS_PATH,
    num_train_epochs=num_epochs,
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)
trainer.train()

# Persist the fine-tuned parameters (state dict only) to WEIGHTS_PATH.
torch.save(model.state_dict(), WEIGHTS_PATH)

In [None]:
# Run the trainer's model on the test split. `outputs` is read below through
# its `.predictions` attribute and by unpacking it into its fields.
outputs = trainer.predict(test_dataset)

In [None]:
# For each test row, show the index of the largest value along the last axis
# of the predictions (presumably the predicted class id; confirm the layout).
np.argmax(outputs.predictions, axis=-1)

In [None]:
# Load the 'accuracy' metric; `compute_metrics` below reads this module-level
# `metric` when it is called.
metric = load_metric('accuracy')
def compute_metrics(eval_pred):
  """Score arg-max predictions with the module-level `metric`.

  Args:
    eval_pred: an indexable object whose item 0 holds the logits and whose
      item 1 holds the reference labels. Any further items are ignored, so
      both the 3-field result of `trainer.predict` and a plain
      (logits, labels) pair are accepted; the previous three-way unpacking
      failed on 2-field inputs.

  Returns:
    Whatever `metric.compute(predictions=..., references=...)` returns.
  """
  logits, labels = eval_pred[0], eval_pred[1]
  predictions = np.argmax(logits, axis=-1)

  return metric.compute(predictions=predictions, references=labels)

# Print the score of the test-set predictions under the currently loaded `metric`.
print(compute_metrics(outputs))

In [None]:
# Load the 'f1' metric. NOTE: this rebinds the module-level `metric`, so any
# function that reads `metric` at call time uses F1 from here on.
metric = load_metric('f1')
def compute_metrics(eval_pred):
  """Score arg-max predictions with the module-level `metric`, per class.

  NOTE: this definition reuses the name `compute_metrics` and therefore
  replaces any earlier function of that name once this cell has run.

  Args:
    eval_pred: an indexable object whose item 0 holds the logits and whose
      item 1 holds the reference labels. Any further items are ignored, so
      both the 3-field result of `trainer.predict` and a plain
      (logits, labels) pair are accepted; the previous three-way unpacking
      failed on 2-field inputs.

  Returns:
    Whatever `metric.compute(..., average=None)` returns; `average=None`
    requests the score without averaging over classes.
  """
  logits, labels = eval_pred[0], eval_pred[1]
  predictions = np.argmax(logits, axis=-1)

  return metric.compute(predictions=predictions, references=labels, average=None)

# Print the score of the test-set predictions under the currently loaded `metric`.
print(compute_metrics(outputs))