# Cheap Talk and Cherry-Picking: What ClimateBert has to say on Corporate Climate Risk Disclosures

### Julia Anna Bingler, Mathias Kraus, Markus Leippold, Nicolas Webersinke

### DOI:

This notebook provides an example of the model training from our paper. It can be used as a reference for further analysis.

#### Import relevant libraries

In [None]:
import pandas as pd
import numpy as np
import os
import shutil
import json
import csv

from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.linear_model import LogisticRegression
import statsmodels.api as sm

import torch
import torch.nn as nn
import torch.nn.functional as F
import gc

from transformers import EarlyStoppingCallback
from transformers import RobertaTokenizerFast
from transformers import RobertaForSequenceClassification, Trainer, TrainingArguments
from transformers.trainer_utils import set_seed

#### Define labels and set seeds

In [None]:
# Category names in class-id order: the position in this tuple is the integer
# id assigned to the category.
_category_names = (
    'Governance',
    'Metrics and Targets',
    'Risk Management',
    'Strategy',
    'None',
)

# Forward (name -> id) and inverse (id -> name) lookups.
label_to_id = {name: idx for idx, name in enumerate(_category_names)}
id_to_label = {idx: name for name, idx in label_to_id.items()}

num_classes = len(_category_names)

# Fix the random seeds: NumPy directly, the rest through transformers' set_seed helper.
np.random.seed(0)
set_seed(0)

#### Define a custom pytorch dataset class

In [None]:
class TCFDDataset(torch.utils.data.Dataset):
    """Dataset that pairs tokenizer encodings with one label per sample.

    ``encodings`` is a mapping from feature name to an indexable collection
    holding one entry per sample; ``labels`` is an indexable collection of
    the same length. Each item is returned as a dict of tensors containing
    every encoding feature plus a ``'labels'`` entry.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of samples is defined by the number of labels.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for feature_name, feature_values in self.encodings.items():
            sample[feature_name] = torch.tensor(feature_values[idx])
        # Stored under the key 'labels' alongside the encoding features.
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

#### Load training data

In [None]:
# Load the annotated samples. Each sample is a dict with the keys "text",
# "label" (a category name found in label_to_id) and "company".
# The encoding is given explicitly so that decoding does not depend on the
# platform's locale default.
with open("training_data.json", "r", encoding="utf-8") as file:
    training_data = json.load(file)

texts = []
labels = []
companies = []

# The file is fully parsed above, so the handle is already closed here.
for sample in training_data:
    texts.append(sample["text"])
    labels.append(label_to_id[sample["label"]])  # category name -> class id
    companies.append(sample["company"])

assert len(texts) == len(labels) == len(companies)

# Convert to NumPy arrays so the cells below can select samples with boolean masks.
texts = np.array(texts)
labels = np.array(labels)
companies = np.array(companies)

#### Training loop

This cell contains the actual training loop. Adjust the paths and parameters to your needs.

In [None]:
# Grouped hold-out evaluation: the companies are shuffled and processed in
# groups of (up to) ten. For every group a fresh model is fine-tuned on the
# samples of all other companies, and the logits it predicts for the held-out
# companies are appended to 'output.txt'.
comp_list = np.unique(companies)
np.random.shuffle(comp_list)

# The tokenizer is identical for every fold, so it is loaded once up front.
tokenizer = RobertaTokenizerFast.from_pretrained('distilroberta-base')

# Mixed precision is only requested when a CUDA device is present, so the
# cell also runs on CPU-only machines.
use_fp16 = torch.cuda.is_available()

for comp_i in range(0, len(comp_list), 10):
    comp_l = comp_list[comp_i:comp_i+10]

    print(comp_l)

    #shutil.rmtree('results', ignore_errors=True)

    # Boolean mask of the samples belonging to the held-out companies.
    held_out = np.isin(companies, comp_l)

    comp_texts = texts[held_out]
    comp_labels = labels[held_out]
    comp_comps = companies[held_out]

    non_comp_texts = texts[~held_out]
    non_comp_labels = labels[~held_out]

    # Stratified 80/20 train/validation split of the remaining samples.
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        non_comp_texts,
        non_comp_labels,
        test_size=.2,
        stratify=non_comp_labels,
        random_state=0,
    )

    print('Train samples: {}'.format(len(train_texts)))
    print('Validation samples: {}'.format(len(val_texts)))
    print('Test samples: {}'.format(len(comp_texts)))

    train_encodings = tokenizer(list(train_texts), truncation=True, padding=True)
    val_encodings = tokenizer(list(val_texts), truncation=True, padding=True)

    train_dataset = TCFDDataset(train_encodings, train_labels)
    val_dataset = TCFDDataset(val_encodings, val_labels)

    training_args = TrainingArguments(
        output_dir='./results',          # output directory
        overwrite_output_dir=True,
        num_train_epochs=10,             # total number of training epochs
        per_device_train_batch_size=24,  # batch size per device during training
        per_device_eval_batch_size=24,   # batch size for evaluation
        warmup_steps=500,                # number of warmup steps for learning rate scheduler
        weight_decay=0.01,               # strength of weight decay
        logging_dir='./logs',            # directory for storing logs
        logging_steps=10,
        fp16=use_fp16,                   # mixed precision training, only when a GPU is available
        gradient_accumulation_steps=4,
        load_best_model_at_end=True,
        evaluation_strategy='epoch',
        save_strategy='epoch'
    )

    # A fresh model per fold, so nothing learned from one fold's training
    # data carries over to the next.
    model = RobertaForSequenceClassification.from_pretrained("distilroberta-base",
                                                             num_labels=num_classes)

    early_stop = EarlyStoppingCallback(2)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        callbacks=[early_stop]
    )

    trainer.train()

    # Predict the held-out samples one (company, true label) group at a time
    # and append one tab-separated line per sample:
    # company, true label, then the five class scores.
    for comp in np.unique(comp_comps):
        of_comp = comp_comps == comp
        for label in np.unique(comp_labels[of_comp]):
            group = of_comp & (comp_labels == label)

            comp_label_text = comp_texts[group]
            comp_label_labels = comp_labels[group]
            test_encodings = tokenizer(list(comp_label_text), truncation=True, padding=True)
            test_dataset = TCFDDataset(test_encodings, comp_label_labels)

            # First element of the prediction output: one row of class
            # scores per sample.
            x = trainer.predict(test_dataset)[0]

            with open('output.txt', 'a', encoding='utf-8') as fd:
                for row in x:
                    fd.write('{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(comp, label, *row))

#### Logistic Regression

In [None]:
# Load the per-sample predictions written by the training loop: company,
# true label id and the five class scores.
score_columns = ['pred_0', 'pred_1', 'pred_2', 'pred_3', 'pred_4']
df = pd.read_csv('output.txt', sep='\t', header=None,
                 names=['comp', 'label'] + score_columns)

# df = df[df[['pred_0', 'pred_1', 'pred_2', 'pred_3', 'pred_4']].max(axis=1) > 1.]

# Predicted class = index of the highest score.
df['pred_class'] = np.argmax(df[score_columns].values, axis=1)

X = []
y = []

# One observation per (company, true label) pair: the features are the
# shares of that pair's samples predicted as class 0, 1, 2 and 3.
# The share of class 4 is left out (presumably because the five shares sum
# to one -- confirm).
for comp in df.comp.unique():
    df_tmp = df[df.comp == comp]
    for lab in [0, 1, 2, 3, 4]:
        df_comp_tmp = df_tmp[df_tmp.label == lab]
        n_samples = len(df_comp_tmp)
        if n_samples == 0:
            continue
        predicted = df_comp_tmp['pred_class']
        x = [np.sum(predicted == cls) / n_samples for cls in range(4)]
        X.append(x)
        y.append(lab)

# NOTE(review): recent scikit-learn releases spell the unpenalized model as
# penalty=None instead of the string 'none' -- confirm against the installed version.
clf = LogisticRegression(penalty='none')
clf.fit(X, y)

print(clf.coef_)
pred = clf.predict(X)

# Fix the row/column order explicitly so matrix positions can be mapped back
# to label ids even when some label id never occurs in y.
class_ids = np.unique(y)
cm = confusion_matrix(y, pred, labels=class_ids)
print(cm)

# Per class: correct predictions divided by the column total of the matrix
# (all observations predicted as that class). Indexing is by position in
# class_ids, not by raw label value.
for pos in range(len(class_ids)):
    predicted_total = np.sum(cm[:, pos])
    print(cm[pos, pos] / predicted_total if predicted_total else float('nan'))