In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

Code adapted from https://towardsdatascience.com/text-classification-with-bert-in-pytorch-887965e5820f and https://www.tensorflow.org/text/tutorials/classify_text_with_bert#define_your_model

# Install Dependencies

In [61]:
#!pip install transformers

from transformers import BertTokenizer
import torch
from torch import nn
from transformers import BertModel

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

In [62]:
# Seed every RNG the notebook relies on so that Restart & Run All is repeatable:
# torch drives dropout, the initialisation of the classifier head and DataLoader
# shuffling; NumPy's global state is what scikit-learn falls back to when no
# explicit random_state is passed.
SEED = 123
torch.manual_seed(SEED)
np.random.seed(SEED)

In [63]:
# Tokenizer that turns raw texts into BERT token ids.
# Load it from the same checkpoint the classifier fine-tunes ('bert-base-cased')
# so the tokenizer and the encoder are guaranteed to agree on the vocabulary.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

# Build A Dataset: OxML Data and Environmental Data

We can experiment with encoding the whole text at once or one paragraph at a time (aggregating, or taking the maximum over, the per-paragraph predictions). The code for getting the env_lis_2 data is in the Sweep version of this notebook.

In [89]:
# Marker that separates consecutive documents inside the raw text files.
TEXT_SEPARATOR = "unique_linebreak \n"


def _read_text_chunks(path):
    """Read the file at ``path`` and return its contents split on ``TEXT_SEPARATOR``."""
    # The context manager closes the handle even if reading raises.
    with open(path, "r") as handle:
        return handle.read().split(TEXT_SEPARATOR)


# Get OxML texts
texts = _read_text_chunks('oxml_esg_texts.txt')

# Get ESG labels
df = pd.read_csv('oxml2023mlcases-esg-classifier/data/labels.csv')

# Class name -> integer id used as the training target
labels = {
    'other': 0,
    'environmental': 1,
    'social': 2,
    'governance': 3
}

# Add text column to label dataframe. The final chunk is dropped -- presumably it
# is the empty remainder after the last separator; TODO confirm against the file.
df['text'] = texts[:-1]

# Get extra training texts. Despite the historical `env_` prefix, the file that is
# currently loaded holds governance examples (the environmental file used before
# was '/content/env_lis-2.txt').
# NOTE(review): unlike the OxML texts above, the last chunk is kept here; if the
# file ends with the separator this adds one empty 'governance' row -- confirm.
env_texts = _read_text_chunks('gov_training.txt')

# One row per extra text, labelled 'governance', with the columns in the order
# id, class, text. `id` is simply the position of the text within the extra file.
df_training = pd.DataFrame({
    'id': range(len(env_texts)),
    'class': ['governance'] * len(env_texts),
    'text': env_texts,
})

# Append the extra texts to the working dataframe; ignore_index gives the result a
# clean 0..n-1 index instead of repeating the index labels of both inputs.
df = pd.concat([df, df_training], axis=0, ignore_index=True)

In [90]:
# len(df) is the number of rows (one per text), not the number of columns.
print("The dataframe has {} rows".format(len(df)))

The dataframe has 1979 columns


In [91]:
# could change max length back to 512

class Dataset(torch.utils.data.Dataset):
    """Tokenized texts paired with integer class ids, ready for a ``DataLoader``.

    Relies on two notebook-level names that must exist before instantiation:
    ``tokenizer`` (called once per text) and ``labels`` (class name -> int).
    """

    def __init__(self, df, max_length=64):
        """Tokenize every text in ``df`` up front.

        Args:
            df: frame with a 'class' column (keys of ``labels``) and a 'text' column.
            max_length: number of tokens each text is padded or truncated to.
                Defaults to 64, the value previously hard-coded here.
        """
        self.labels = [labels[l] for l in df['class']]
        # Each text is tokenized on its own with return_tensors="pt"; the training
        # loop squeezes dimension 1 of the collated `input_ids` accordingly.
        self.texts = [tokenizer(text,
                                padding='max_length', max_length=max_length,
                                truncation=True, return_tensors="pt")
                      for text in df['text']]

    def classes(self):
        """Return the list of integer class ids, in dataset order."""
        return self.labels

    def __len__(self):
        """Return the number of examples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) selected by ``idx`` as a NumPy array."""
        return np.array(self.labels[idx])

    def get_batch_texts(self, idx):
        """Return the tokenizer output(s) selected by ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return ``(tokenized_text, label)`` for position ``idx``.

        With an integer ``idx`` this is a single example; batching is done by the
        ``DataLoader`` that wraps the dataset.
        """
        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)

        return batch_texts, batch_y

In [92]:
# Split the data into train (80%) and validation (20%) sets.
# A fixed random_state makes the split identical every time this cell runs,
# instead of depending on how much of NumPy's global RNG was consumed before it.
df_train, df_val = train_test_split(df, test_size=0.2, shuffle=True, random_state=123)

print(len(df_train), len(df_val))

1583 396


# Build A BERT Classification Model
This simply layers a linear classifier on top of BERT: we take BERT's pooled embedding of the class ([CLS]) token and pass it through the classifier.

In [93]:
# Original classifier idea
class BertClassifier(nn.Module):
  """BERT encoder followed by dropout and a linear layer that emits class logits.

  Args:
    dropout: dropout probability applied to the pooled class-token embedding.
    num_classes: number of output logits (4 by default: other, environmental,
      social, governance).
    model_name: checkpoint name passed to ``BertModel.from_pretrained``.
  """

  def __init__(self, dropout=0.5, num_classes=4, model_name='bert-base-cased'):

    super().__init__()

    self.bert = BertModel.from_pretrained(model_name)
    self.dropout = nn.Dropout(dropout)
    # Take the input width from the encoder's config (768 for bert-base-cased)
    # so the head still fits if a different checkpoint is chosen.
    self.linear = nn.Linear(self.bert.config.hidden_size, num_classes)

  def forward(self, input_id, mask):
    """Return raw (unnormalised) class logits, one row per input sequence.

    Args:
      input_id: token ids, passed to the encoder as ``input_ids``.
      mask: attention mask, passed to the encoder as ``attention_mask``.
    """
    # With return_dict=False the encoder returns a tuple: the first item holds the
    # embeddings of all tokens, the second the pooled class-token embedding.
    _, pooled = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)

    # No activation after the linear layer: the training loop feeds these values
    # to nn.CrossEntropyLoss, which expects unbounded logits. A ReLU here would
    # clamp every negative logit to 0 and zero its gradient.
    return self.linear(self.dropout(pooled))

# Train the Classification Model

In [94]:
from torch.optim import Adam
#from transformers import AdamW # -> better for generalization than standard Adam
from tqdm import tqdm

In [95]:
def train(model, train_data, val_data, learning_rate, epochs, batch_size=None):
  """Fine-tune ``model`` and print loss/accuracy on both splits after every epoch.

  Args:
    model: module called as ``model(input_id, mask)`` that returns class logits.
    train_data: training examples, in whatever form ``Dataset`` accepts.
    val_data: validation examples, same form as ``train_data``.
    learning_rate: learning rate for the Adam optimizer.
    epochs: number of passes over ``train_data``.
    batch_size: examples per batch. When omitted, the notebook-level
      ``batch_size`` variable is used if it exists (the previous behaviour),
      otherwise 32.

  Returns:
    None. Metrics are printed; ``model`` is updated in place and is left in
    evaluation mode.
  """
  if batch_size is None:
    # Backward compatibility: earlier callers set `batch_size` at notebook level.
    batch_size = globals().get("batch_size", 32)

  # set up datasets (names chosen so they do not shadow this function)
  train_set, val_set = Dataset(train_data), Dataset(val_data)

  # load the datasets
  train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
  val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size)

  # try for a GPU
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = model.to(device)

  # define loss and optimizer
  loss = nn.CrossEntropyLoss()
  optimizer = Adam(model.parameters(), lr=learning_rate)

  for epoch in range(epochs):

    # training mode: dropout active
    model.train()

    train_acc = 0
    train_loss = 0

    for train_input, train_label in tqdm(train_dataloader):

      # pass this stuff to the GPU; drop the extra dimension 1 from both tensors
      train_label = train_label.to(device)
      mask = train_input['attention_mask'].squeeze(1).to(device)
      input_id = train_input['input_ids'].squeeze(1).to(device)

      # feed data to model
      output = model(input_id, mask)

      # calculate loss; CrossEntropyLoss returns the batch mean, so weight it by
      # the batch size to accumulate a per-sample sum
      batch_loss = loss(output, train_label.long())
      train_loss += batch_loss.item() * train_label.size(0)

      # calculate accuracy -> likeliest label correct?
      train_acc += (output.argmax(dim=1) == train_label).sum().item()

      optimizer.zero_grad()
      batch_loss.backward()
      optimizer.step()

    val_acc = 0
    val_loss = 0

    # evaluation mode (dropout off) and no gradient tracking for validation
    model.eval()
    with torch.no_grad():

      for val_input, val_label in val_dataloader:

        val_label = val_label.to(device)
        mask = val_input['attention_mask'].squeeze(1).to(device)
        input_id = val_input['input_ids'].squeeze(1).to(device)

        output = model(input_id, mask)

        batch_loss = loss(output, val_label.long())
        val_loss += batch_loss.item() * val_label.size(0)

        val_acc += (output.argmax(dim=1) == val_label).sum().item()

    # sums are per-sample, so dividing by the split size gives true averages
    print(
    f'Epochs: {epoch + 1} | Train Loss: {train_loss / len(train_data): .3f} \
    | Train Accuracy: {train_acc / len(train_data): .3f} \
    | Val Loss: {val_loss / len(val_data): .3f} \
    | Val Accuracy: {val_acc / len(val_data): .3f}')

In [None]:
# Fine-tuning hyperparameters. `batch_size` lives at notebook level because the
# training and evaluation functions look it up from there.
epochs, batch_size, learning_rate = 10, 32, 2e-5

# Build a fresh classifier and fine-tune it on the training split, reporting
# validation metrics after every epoch.
model = BertClassifier()
train(model, df_train, df_val, learning_rate, epochs)

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/436M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
 64%|██████████████████████████████                 | 32/50 [07:01<04:34, 15.27s/it]

# Evaluate on Test Data

In [None]:
def evaluate(model, test_data, batch_size=None):
  """Print the accuracy of ``model`` on ``test_data``.

  Args:
    model: module called as ``model(input_id, mask)`` that returns class logits.
    test_data: examples to score, in whatever form ``Dataset`` accepts.
    batch_size: examples per batch. When omitted, the notebook-level
      ``batch_size`` variable is used if it exists (the previous behaviour),
      otherwise 32.

  Returns:
    None. The accuracy is printed; ``model`` is left in evaluation mode.
  """
  if batch_size is None:
    # Backward compatibility: earlier callers set `batch_size` at notebook level.
    batch_size = globals().get("batch_size", 32)

  test_dataloader = torch.utils.data.DataLoader(Dataset(test_data), batch_size=batch_size)

  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = model.to(device)

  # evaluation mode: dropout must be off, otherwise predictions are noisy
  model.eval()

  test_acc = 0

  with torch.no_grad():

    for test_input, test_label in test_dataloader:
      test_label = test_label.to(device)
      mask = test_input['attention_mask'].squeeze(1).to(device)
      input_id = test_input['input_ids'].squeeze(1).to(device)

      output = model(input_id, mask)

      # count correct predictions (the old code added to an undefined name here)
      test_acc += (output.argmax(dim=1) == test_label).sum().item()

  print(f'Test Accuracy: {test_acc / len(test_data): .3f}')

In [None]:
# Run eval function.
# The notebook only creates df_train and df_val -- there is no `df_test`, so the
# previous call raised NameError. Score the validation split instead.
# NOTE: this split was already monitored during training, so the figure is
# optimistic; carve out a separate held-out split for an unbiased test accuracy.
evaluate(model, df_val)