### Pretrained FinBERT model on earnings call transcripts (prepared remarks ensemble)

Load data:

In [None]:
# Colab-only step: upload the earnings-call transcript JSON files into the runtime.
# The "Saving ..." lines in the cell output show each file stored under its original name;
# presumably they land in /content, which the preprocessing cell scans for *.json — confirm.
from google.colab import files
upload = files.upload()

Saving accolade-inc-accd-q3-2021-earnings-call-transcript.json to accolade-inc-accd-q3-2021-earnings-call-transcript.json
Saving acuity-brands-inc-ayi-q1-2021-earnings-call-transc.json to acuity-brands-inc-ayi-q1-2021-earnings-call-transc.json
Saving albertsons-companies-inc-aci-q3-2020-earnings-call.json to albertsons-companies-inc-aci-q3-2020-earnings-call.json
Saving angiodynamics-inc-ango-q2-2021-earnings-call-trans.json to angiodynamics-inc-ango-q2-2021-earnings-call-trans.json
Saving aphria-inc-apha-q2-2021-earnings-call-transcript.json to aphria-inc-apha-q2-2021-earnings-call-transcript.json
Saving audiovox-voxx-q3-2021-earnings-call-transcript.json to audiovox-voxx-q3-2021-earnings-call-transcript.json
Saving azz-inc-azz-q3-2021-earnings-call-transcript.json to azz-inc-azz-q3-2021-earnings-call-transcript.json
Saving bed-bath-beyond-bbby-q3-2020-earnings-call-transcr.json to bed-bath-beyond-bbby-q3-2020-earnings-call-transcr.json
Saving blackrock-blk-q4-2020-earnings-call-trans

Imports:

In [None]:
import json
import os
from collections import Counter

import nltk.data
import numpy as np
import pandas
import torch
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from transformers import BertTokenizer, BertForSequenceClassification

Preprocess data:

In [None]:
directory = '/content'
TRAIN_SIZE = 40        # first TRAIN_SIZE transcripts tune the ensemble weight; the rest are held out for testing
MAX_CHUNK_CHARS = 512  # per-chunk character budget (the tokenizer later truncates to 512 tokens as well)
transcript_data = []
input_data = []
test_data = []
input_data_labels = []
test_data_labels = []
label_map = {0:'neutral', 1:'positive', 2:'negative'}

nltk.download('punkt')
sent_tokenizer = nltk.data.load('tokenizers/punkt/PY3/english.pickle')


def chunk_sentences(sentences, max_chars=MAX_CHUNK_CHARS):
  """Greedily pack whole sentences into chunks of at most ``max_chars`` characters.

  Sentences are joined with a single space so words at sentence boundaries do not
  run together. A sentence that is longer than ``max_chars`` on its own is kept as
  a separate (oversized) chunk. Empty chunks are never emitted, so an empty
  ``sentences`` list yields ``[]``.

  Args:
    sentences: iterable of sentence strings, in document order.
    max_chars: maximum number of characters per chunk.

  Returns:
    List of chunk strings, in document order.
  """
  packed = []
  current = ""
  for sent in sentences:
    candidate = sent if not current else current + " " + sent
    if len(candidate) <= max_chars:
      current = candidate
    else:
      # Flush what we have (if anything) and start a new chunk with this sentence
      if current:
        packed.append(current)
      current = sent
  if current:
    packed.append(current)
  return packed


def direction_label(transcript):
  """Return the stock-direction class for one transcript dict.

  0 = move within the volatility band (no change), 1 = price increase, 2 = price decrease.
  The band is the day-of closing price times ``daily_volatility``.
  NOTE(review): the last element of each closing-price list is used — confirm that is the close.
  """
  price_before = transcript['closing_price_day_before'][-1]
  price_day_of = transcript['closing_price_day_of'][-1]
  price_after = transcript['closing_price_day_after'][-1]
  price_volatility = transcript['daily_volatility']

  price_difference = price_after - price_before
  volatility_difference = price_day_of * price_volatility
  if abs(price_difference) - volatility_difference <= 0:
    # No price change (within volatility range)
    return 0
  if price_difference > 0:
    # Price increase
    return 1
  # Price decrease
  return 2


# os.listdir returns entries in arbitrary order; sorting makes the train/test split reproducible
for filename in sorted(os.listdir(directory)):
  f = os.path.join(directory, filename)
  if not (os.path.isfile(f) and f.endswith('.json')):
    continue

  # Keep the file open only while parsing it
  with open(f, encoding='utf-8') as file:
    transcript = json.load(file)

  # Extract prepared remarks from transcript
  text = ' '.join(x['text'] for x in transcript['text_blocks'] if x['section'] == "Prepared Remarks")

  # Split text into groups within the character budget, maintaining complete sentences
  text_chunks = chunk_sentences(sent_tokenizer.tokenize(text))
  if not text_chunks:
    # Nothing to classify: skip so data and label lists stay aligned
    print("Skipping", filename, "- no prepared remarks found")
    continue

  label = direction_label(transcript)
  transcript_data.append(transcript)

  # A single decision keeps each transcript's chunks and label in the same split
  if len(input_data) < TRAIN_SIZE:
    input_data.append(text_chunks)
    input_data_labels.append(label)
  else:
    test_data.append(text_chunks)
    test_data_labels.append(label)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
# Show the direction labels of the tuning split first, then of the held-out split
for split_labels in (input_data_labels, test_data_labels):
  print(split_labels)

[1, 0, 2, 2, 2, 1, 2, 0, 0, 2, 0, 1, 2, 2, 1, 0, 2, 0, 0, 0, 0, 2, 0, 0, 2, 1, 1, 2, 0, 1, 1, 0, 1, 2, 2, 1, 1, 1, 1, 1]
[0, 2, 0, 1, 2, 2, 0, 0, 2, 2, 0]


In [None]:
# Load the tokenizer and the 3-class sequence classifier from the same pretrained checkpoint
checkpoint = 'yiyanghkust/finbert-tone'
tokenizer = BertTokenizer.from_pretrained(checkpoint)
finbert = BertForSequenceClassification.from_pretrained(checkpoint, num_labels=3)

vocab.txt:   0%|          | 0.00/226k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/533 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/439M [00:00<?, ?B/s]

In [None]:
# Chunk data to save RAM
def chunks(lst, n):
  """Lazily yield consecutive slices of ``lst``, each holding at most ``n`` items.

  The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
  """
  yield from (lst[offset:offset + n] for offset in range(0, len(lst), n))

### Validate

In [None]:
batch_size = 1
val_counts = []
finbert.eval()  # inference only; harmless if the model is already in eval mode
for data in input_data:
  # One transcript: collect the predicted class of every chunk
  ensemble_labels = []
  for batch in chunks(data, batch_size):
    # Tokenize input data
    inputs = tokenizer(batch, padding = True, truncation = True, max_length = 512, return_tensors='pt')

    # Run model without building the autograd graph (no training happens here, so this only saves memory)
    with torch.no_grad():
      outputs = finbert(**inputs)

    # Get output labels: index of the largest logit in each row
    ensemble_labels.extend(outputs['logits'].argmax(dim=-1).tolist())

  # Add to count: per-transcript tally of predicted classes
  val_counts.append(Counter(ensemble_labels))

In [None]:
# Find best negative count multiplier (weight)
# Each transcript's class tally has its negative (class 2) count scaled by the candidate weight;
# the class with the largest scaled count is the transcript-level prediction.
max_acc = 0
weight = 0
# Candidates are exactly 1.0, 1.1, ..., 9.9. Building them from integers means the accuracy
# is measured with the very value that is reported and applied later (a float-step range
# would evaluate values such as 1.2000000000000002 and only round afterwards).
for tenths in range(10, 100):
  candidate = tenths / 10
  pred = []
  for c in val_counts:
    temp = Counter(c)  # copy so val_counts itself is left untouched
    temp[2] = temp[2] * candidate
    pred.append(temp.most_common(1)[0][0])
  acc = accuracy_score(input_data_labels, pred)
  # ">=" keeps the largest weight among equally accurate candidates
  if acc >= max_acc:
    max_acc = acc
    weight = candidate
print("best performing weight is", weight, "with accuracy of", max_acc)

best performing weight is 3.1 with accuracy of 0.45


### Test

In [None]:
test_counts = []
batch_size = 1
finbert.eval()  # inference only; harmless if the model is already in eval mode
for data in test_data:
  # One held-out transcript: collect the predicted class of every chunk
  ensemble_labels = []
  for batch in chunks(data, batch_size):
    # Tokenize input data
    inputs = tokenizer(batch, padding = True, truncation = True, max_length = 512, return_tensors='pt')

    # Run model without building the autograd graph (no training happens here, so this only saves memory)
    with torch.no_grad():
      outputs = finbert(**inputs)

    # Get output labels: index of the largest logit in each row
    ensemble_labels.extend(outputs['logits'].argmax(dim=-1).tolist())

  # Add to count: per-transcript tally of predicted classes
  test_counts.append(Counter(ensemble_labels))

In [None]:
output_labels = []
for c in test_counts:
  # Scale a copy: changing test_counts in place would compound the weight each time this cell is re-run
  weighted = Counter(c)
  weighted[2] = weighted[2] * weight
  # Transcript-level prediction = class with the largest (weighted) chunk count
  output_labels.append(weighted.most_common(1)[0][0])

In [None]:
# Predicted direction per held-out transcript (0 = no change, 1 = increase, 2 = decrease)
print(output_labels)

[1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1]


In [None]:
# Generate and print performance metrics
target_names = ['no change', 'increase', 'decrease']
# labels pins the names to classes 0/1/2; without it the call raises whenever a class is
# missing from both the true and the predicted labels. zero_division=0 reports 0 (without
# a warning) for classes that were never predicted.
print(classification_report(
  test_data_labels,
  output_labels,
  labels=[0, 1, 2],
  target_names=target_names,
  digits=3,
  zero_division=0,
))

              precision    recall  f1-score   support

   no change      0.000     0.000     0.000         5
    increase      0.100     1.000     0.182         1
    decrease      0.000     0.000     0.000         5

    accuracy                          0.091        11
   macro avg      0.033     0.333     0.061        11
weighted avg      0.009     0.091     0.017        11



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
