In [1]:
# This notebook uses Llama 2 to classify the headlines in the test set and stores the results.
# It requires a significant amount of RAM and GPU memory to run
# and is intended to run in Google Colab (on an A100 instance).

In [1]:
# Attach Google Drive to the Colab VM so the project folder can be reached.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [2]:
%cd drive/MyDrive/your_path/Refined_Applied_Project/llama

/content/drive/MyDrive/Refined_Applied_Project/llama


In [None]:
!pip install huggingface_hub langchain transformers accelerate bitsandbytes

In [4]:
import pandas as pd
import transformers
import time
import torch
import gc
import configparser
import datetime
import pytz
import os

from transformers import AutoModelForCausalLM, AutoTokenizer
from huggingface_hub.hf_api import HfFolder
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

In [None]:
# Locate the config file one level above the current working directory
script_dir = os.getcwd()
abs_config_path = os.path.join(script_dir, '../config.ini')
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means the config is missing.
if not config.read(abs_config_path):
    raise FileNotFoundError(f"Could not read config file: {abs_config_path}")

# Set the variables from the config file
hf_token = config.get('DEFAULT', 'hf_token') # HuggingFace token for the API
HfFolder.save_token(hf_token) # Save the HuggingFace token
model_path = config.get('llama2_classifier', 'model')
train_size = config.getfloat('DEFAULT', 'train_size')
few_shot = config.getboolean('DEFAULT', 'few_shot') # Few shot learning

# The test file name encodes the test fraction, e.g. test_0.2.csv for train_size = 0.8
test_size = round(1 - train_size, 2)
test_df_path = '../data/test_' + str(test_size) + '.csv'
test_df_path = os.path.join(script_dir, test_df_path)

# Load the test dataframe
df = pd.read_csv(test_df_path)

# Load Llama (4-bit quantised) and the respective tokenizer from huggingface
model = AutoModelForCausalLM.from_pretrained(
    model_path, device_map="auto", load_in_4bit=True,
    use_auth_token=True)

# Quantisation applies to the model weights only, so the tokenizer takes no
# 4-bit flag (the previous `load_in_4_bit` kwarg was a misspelled no-op).
tokenizer = AutoTokenizer.from_pretrained(model_path, use_auth_token=True)

# Batched generation needs a pad token; reuse EOS for it. This must happen
# BEFORE the pipeline is built, otherwise `tokenizer.pad_token_id` is read
# while it still holds its pre-assignment value.
tokenizer.pad_token = tokenizer.eos_token

# Due to technical issues, the padding side needs to be set to right for the 7B model
if model_path == 'meta-llama/Llama-2-7b-chat-hf':
    tokenizer.padding_side = "right"
else:
    tokenizer.padding_side = "left"

# Text-generation pipeline; at most 10 new tokens are generated per prompt.
# NOTE(review): do_sample=True makes the predictions non-deterministic —
# consider seeding or greedy decoding if exact reproducibility is required.
pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype='auto',
    device_map="auto",
    max_new_tokens=10,
    do_sample=True,
    top_k=10,
    num_return_sequences=1,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.pad_token_id
)

In [None]:
# Prompt used when few-shot learning is enabled: the instruction followed by
# three labelled example headlines and the headline to classify.
few_shot_template = """### Instruction: As a retail investor, you are presented with a financial headline. Your task is to classify the sentiment expressed in the headline using one of the following labels: [NEGATIVE, POSITIVE, NEUTRAL].

  # Example 1:
  ### Headline: Consolidated pretax profit decreased by 69.2 % to EUR 41.0 mn from EUR 133.1 mn in 2007 .
  ### Response: NEGATIVE

  # Example 2:
  ### Headline: In 2007 , Huhtamaki will continue to invest in organic growth .
  ### Response: NEUTRAL

  # Example 3:
  ### Headline: MD Henning Bahr of Stockmann Gruppen praises the trend , since the chains become stronger and their decision-making processes more clear .
  ### Response: POSITIVE

  ### Headline: {headline}
  ### Response: The sentiment expressed in the headline is"""

# Prompt used otherwise: instruction and headline only, no examples.
zero_shot_template = """### Instruction:
    As a retail investor, you are presented with a financial headline. Your task is to classify the sentiment expressed in the headline using one of the following labels: [NEGATIVE, POSITIVE, NEUTRAL].

    ### Headline:
    {headline}

    ### Please respond with only one of the following labels: NEGATIVE, POSITIVE, or NEUTRAL.

    ### Response: The sentiment expressed in the headline is"""

# `few_shot` is a bool read from the config file; pick the matching template.
prompt_template = few_shot_template if few_shot else zero_shot_template

In [None]:
# Build one prompt per headline by filling the {headline} placeholder of the
# template via str.format
prompts = [
    prompt_template.format(headline=headline)
    for headline in df['Headline'].tolist()
]

# Apply the pipeline classifier and measure how long inference takes.
# perf_counter() is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()
raw_responses = pipeline(prompts, batch_size=16)
end_time = time.perf_counter()

elapsed_time = end_time - start_time
print(f"The function took {elapsed_time} seconds to complete.")

# Collect garbage first so unreachable objects are dropped, then release the
# cached CUDA memory; in the opposite order, blocks still referenced by
# uncollected objects cannot be freed.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Extract the sentiment label from every generated text. Outputs that contain
# no recognisable label are recorded in `unknown_responses` and replaced by
# NEUTRAL.
VALID_LABELS = ("NEGATIVE", "NEUTRAL", "POSITIVE")
RESPONSE_MARKER = "The sentiment expressed in the headline is"
# Punctuation that may surround a label token, e.g. "NEGATIVE." or "[POSITIVE]"
LABEL_STRIP_CHARS = '.,;:!?"\'[]()'


def extract_label(generated_text):
    """Find the first sentiment label that follows the response marker.

    Args:
        generated_text: Text produced by the pipeline for one prompt.

    Returns:
        A tuple ``(label, raw)``. ``label`` is one of ``VALID_LABELS`` or
        ``None`` when no label was recognised. ``raw`` is the text kept for
        later inspection: the text after the marker, ``'UNKNOWN'`` when that
        text is empty, or the whole generated text when the marker is absent.
    """
    # partition() splits at the FIRST marker only, so a model that repeats the
    # marker phrase in its answer does not truncate the continuation.
    _, marker, continuation = generated_text.partition(RESPONSE_MARKER)
    if not marker:
        return None, generated_text

    continuation = continuation.strip()
    for word in continuation.split():
        candidate = word.strip(LABEL_STRIP_CHARS).upper()
        if candidate in VALID_LABELS:
            # Stop at the first label; later words may belong to unrelated
            # text the model kept generating.
            return candidate, continuation

    return None, continuation or 'UNKNOWN'


responses = []
unknown_responses = []
unknown_count = 0

for item in raw_responses:
    for sub_item in item:
        response, raw_text = extract_label(sub_item['generated_text'])

        if response is None:
            # Keep the raw text so unrecognised responses can be examined
            # later (exactly one entry per substituted row)
            unknown_responses.append(raw_text)
            unknown_count += 1
            # This is consistent with Zhang, Yang & Liu (2023)
            response = 'NEUTRAL'

        responses.append(response)

# Add the classified labels to the df
df['Predicted_Label'] = responses

# Output the number of rows where NEUTRAL was substituted due to errors/unrecognised output
# This is consistent with Zhang, Yang & Liu (2023)
print(f"Number of rows with substituted 'NEUTRAL' in the Predicted_Label column: {unknown_count}.")

# Define a dictionary to map the string labels to numeric values
mapping = {'NEGATIVE': -1, 'NEUTRAL': 0, 'POSITIVE': 1}

# Map True_Label only while it still holds string labels. Without this guard,
# re-running the cell would map the already numeric values to NaN.
if not pd.api.types.is_numeric_dtype(df['True_Label']):
    true_numeric = df['True_Label'].map(mapping)
    unmapped = true_numeric.isna()
    if unmapped.any():
        # Fail loudly instead of passing NaN on to the metric functions
        unexpected = sorted(df.loc[unmapped, 'True_Label'].astype(str).unique())
        raise ValueError(f"Unexpected values in True_Label: {unexpected}")
    df['True_Label'] = true_numeric

# Predicted_Label was freshly assigned from `responses` above, so mapping it
# here is safe on every run.
df['Predicted_Label'] = df['Predicted_Label'].map(mapping)

# Evaluate the predictions against the ground truth. Precision, recall and F1
# are computed with average='weighted'.
y_true, y_pred = df['True_Label'], df['Predicted_Label']
weighted = {'average': 'weighted'}

accuracy = accuracy_score(y_true, y_pred)
print(f"Accuracy: {accuracy}")

precision = precision_score(y_true, y_pred, **weighted)
print(f"Precision: {precision}")

recall = recall_score(y_true, y_pred, **weighted)
print(f"Recall: {recall}")

f1 = f1_score(y_true, y_pred, **weighted)
print(f"F1 score: {f1}")

In [None]:
# Append the metrics of this run to the shared results file
df_results_path = os.path.join(script_dir, '../results/df_results.csv')
error_count = unknown_count
bst = pytz.timezone('Europe/London')
now = datetime.datetime.now(bst)
formatted_time = now.strftime('%d/%m/%Y/%H:%M')

# One row describing this run
new_result = pd.DataFrame([{
    'Model': model_path,
    'Test_Size': round(1-train_size,2),
    'Accuracy': accuracy,
    'F1': f1,
    'Precision': precision,
    'Recall': recall,
    'Prompt': prompt_template,
    'Error Count': error_count,
    'DateTime': formatted_time,
    'Few_Shot': few_shot
}])

if os.path.exists(df_results_path):
    # Extend the results collected by earlier runs
    df_results = pd.concat([pd.read_csv(df_results_path), new_result],
                           ignore_index=True)
else:
    # First run: there is nothing to append to yet, so start a new file
    # (and create the results directory if it is missing)
    os.makedirs(os.path.dirname(df_results_path), exist_ok=True)
    df_results = new_result

df_results.to_csv(df_results_path, index=False)