# FinBERT Sentiment Analysis

In [None]:
# Clone the FinGPT repository and switch the notebook's working directory into it.
# Relative paths used by later cells are resolved from this directory.
!git clone https://github.com/AI4Finance-Foundation/FinGPT.git
%cd FinGPT



In [None]:
# Install the packages listed in requirements.txt of the current directory,
# then (quietly) transformers, datasets and sentencepiece.
%pip install -r requirements.txt
%pip install -q transformers datasets sentencepiece



In [None]:
# Pin transformers and peft to specific versions, then install the remaining
# runtime packages.
# NOTE(review): peft is already pinned to 0.4.0 on the first line and
# sentencepiece is installed by an earlier cell, so the unpinned
# `peft` / `sentencepiece` installs below look redundant - confirm before removing.
%pip install transformers==4.40.1 peft==0.4.0
%pip install sentencepiece
%pip install accelerate
%pip install torch
%pip install peft
%pip install datasets
%pip install bitsandbytes


In [None]:
from transformers import pipeline

# Build a "sentiment-analysis" pipeline backed by the yiyanghkust/finbert-tone checkpoint.
sentiment_model = pipeline("sentiment-analysis", model="yiyanghkust/finbert-tone")

In [None]:
# Print the CPU count reported by os.cpu_count() for this machine.
from os import cpu_count
print(cpu_count())

In [None]:
# Example financial news headline
# Smoke test: run the pipeline on a single headline and print its raw output.
text = "Apple shares fall after iPhone sales miss expectations"
result = sentiment_model(text)
print(result)


In [None]:
import pandas as pd
import numpy as np

In [None]:
# Load the collected data CSV (path is relative to the current working directory).
dataframe = pd.read_csv("../Data Collection/data.csv")

# Preview the first rows of the loaded data.
dataframe.head()


In [None]:
# colorama supplies the Fore/Style colour codes used for console output in later cells.
%pip install colorama

In [None]:
def preprocess(text):
    """Mask user mentions and links in *text*.

    The text is split on single spaces. A token that starts with ``@`` and is
    longer than one character is replaced by ``@user``; a token that starts
    with ``http`` is replaced by ``http``. The tokens are then re-joined with
    single spaces, so the original spacing is preserved.
    """
    def _mask(token):
        # Mention check first, link check second - same order as the tokens
        # are rewritten in, so a masked mention is still inspected for "http".
        if token.startswith('@') and len(token) > 1:
            token = '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))

In [None]:
from concurrent.futures import ThreadPoolExecutor
from colorama import Fore, Style, init
import pandas as pd
import os
import time
import warnings
from transformers import pipeline

# Initialise colorama for coloured console output (autoreset enabled).
init(autoreset=True)

# Global Constants
# Worker count handed to the ThreadPoolExecutor in process_texts_in_parallel.
NUM_PROCESSES = 1
# Checkpoint name passed to load_model and recorded in the timing CSV.
MODEL_NAME = "yiyanghkust/finbert-tone"
# Input CSV; it must contain a "News" column.
INPUT_FILE_ADDRESS = "../Data Collection/data.csv"
# CSV that receives the (News, sentiment_label, sentiment_score) rows.
OUTPUT_FILE = "finbert_sentiment_analysis_data.csv"
# CSV to which one (Model, Computation_time) row is appended per run.
COMPUTATION_TIME_DATA = "model_prediction_time_data.csv"

def load_model(model):
  """Create and return a "sentiment-analysis" pipeline for the checkpoint *model*."""
  return pipeline("sentiment-analysis", model=model)

# Load the pipeline once at module level; find_sentiment reads this global.
sentiment_model = load_model(MODEL_NAME)




def find_sentiment(text):
    """Classify the sentiment of one text with the global ``sentiment_model``.

    The text is normalised with ``preprocess`` first. A 20-character preview of
    the input and the raw model output are printed so progress is visible.

    Args:
        text: The raw text to classify.

    Returns:
        A three-element list ``[text, label, score]``. ``label`` and ``score``
        come from the first prediction returned by the pipeline. If any step
        fails, the error is printed and ``label``/``score`` are NaN, so a
        single bad row does not abort the whole run. ``text`` is the
        preprocessed text whenever preprocessing itself succeeded, otherwise
        the value that was passed in.
    """
    try:
        text = preprocess(text)
        print(f"{Fore.GREEN} Input:- {text[:20]}{Style.RESET_ALL}")
        # BERT-style checkpoints accept at most 512 tokens; without explicit
        # truncation a long article raises inside the model and is recorded
        # as NaN. max_length is given explicitly in case the tokenizer config
        # does not declare a limit of its own.
        result = sentiment_model(text, truncation=True, max_length=512)
        print(f"{Fore.GREEN} Output:- {result}{Style.RESET_ALL}")
        return [text, result[0]["label"], result[0]["score"]]
    except Exception as e:
        # Deliberate best-effort: report the failure and keep the row.
        print(f"{Fore.RED} Error: {e}{Style.RESET_ALL}")
        # float("nan") avoids depending on numpy having been imported by an
        # earlier notebook cell; pandas treats it as a missing value.
        missing = float("nan")
        return [text, missing, missing]

def process_texts_in_parallel(texts):
    """Run ``find_sentiment`` on every item of *texts* through a thread pool.

    The pool is created with ``NUM_PROCESSES`` workers. The results are
    returned as a list, one entry per input text, in input order.
    """
    print(f"{Fore.CYAN}🔹 Using {NUM_PROCESSES} parallel threads...{Style.RESET_ALL}")

    with ThreadPoolExecutor(NUM_PROCESSES) as pool:
        # Drain the lazy map iterator while the pool is still open.
        sentiments = [*pool.map(find_sentiment, texts)]

    return sentiments


def sentiment_analysis_of_text_data(batch_size=500):
    """Run sentiment analysis on the "News" column of the input CSV.

    Reads ``INPUT_FILE_ADDRESS``, classifies every non-null "News" entry in
    batches via ``process_texts_in_parallel`` and writes the rows
    ``(News, sentiment_label, sentiment_score)`` to ``OUTPUT_FILE`` after each
    batch, so an interrupted run loses at most one batch.

    If ``OUTPUT_FILE`` already exists, its rows are kept and texts that are
    already present in it are not processed again (resume support).

    Args:
        batch_size: Number of texts handled between two intermediate saves.
            Must be a positive integer. Defaults to 500.

    Returns:
        None. If the input file or its "News" column is missing, an error is
        printed and nothing is written.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    if not os.path.exists(INPUT_FILE_ADDRESS):
        print(f"{Fore.RED}Error: '{INPUT_FILE_ADDRESS}' not found!{Style.RESET_ALL}")
        return

    df = pd.read_csv(INPUT_FILE_ADDRESS)

    if "News" not in df.columns:
        print(f"{Fore.RED}Error: 'News' column missing in CSV!{Style.RESET_ALL}")
        return

    texts = df["News"].dropna().tolist()
    output_columns = ["News", "sentiment_label", "sentiment_score"]

    def _saved_form(text):
        # find_sentiment stores the *preprocessed* text in the output file,
        # so membership has to be tested against that form, not the raw one.
        return preprocess(text) if isinstance(text, str) else text

    def _save(rows):
        pd.DataFrame(rows, columns=output_columns).to_csv(
            OUTPUT_FILE, index=False, encoding="utf-8"
        )

    # Resume support: keep rows from a previous (possibly interrupted) run and
    # remember which texts they cover.
    already_processed_texts = set()
    all_results = []
    output_exists = os.path.exists(OUTPUT_FILE)
    if output_exists:
        saved_df = pd.read_csv(OUTPUT_FILE)
        already_processed_texts = set(saved_df["News"].dropna().tolist())
        all_results = saved_df.reindex(columns=output_columns).values.tolist()

    # Only texts without a saved result still need to go through the model.
    pending_texts = [
        text for text in texts
        if _saved_form(text) not in already_processed_texts
    ]

    # Ceiling division: no empty trailing batch when the count is an exact
    # multiple of batch_size (or zero).
    total_batches = -(-len(pending_texts) // batch_size)

    for i in range(total_batches):
        batch_texts = pending_texts[i * batch_size: (i + 1) * batch_size]
        print(f"{Fore.YELLOW} Processing batch {i + 1}/{total_batches} ({len(batch_texts)} links){Style.RESET_ALL}")

        results = process_texts_in_parallel(batch_texts)
        all_results.extend(results)

        # Save intermediate results (previous rows + everything done so far).
        _save(all_results)

    if total_batches == 0 and not output_exists:
        # Nothing to process: still leave a header-only output file behind so
        # a successful run always produces OUTPUT_FILE.
        _save(all_results)

    print(f"{Fore.GREEN}✔️ Sentiment Analysis complete! Data saved to '{OUTPUT_FILE}'{Style.RESET_ALL}")

In [None]:
# if __name__ == "__main__":
#    sentiment_analysis_of_text_data()

In [None]:
import time

if __name__ == "__main__":
    # Time one full sentiment-analysis run and append the measurement to the
    # timing CSV.
    start_time = time.time()
    print("Running financial sentiment analysis...")
    sentiment_analysis_of_text_data()

    end_time = time.time()

    computation_time = end_time - start_time

    # One row per run. The (model, seconds) pair must be wrapped in a list:
    # a flat two-element list is read by pandas as two rows of one column,
    # which conflicts with the two column names and raises ValueError.
    data = [(MODEL_NAME, computation_time)]  # model time data

    time_dataframe = pd.DataFrame(
        data=data, columns=["Model", "Computation_time"],
    )

    # Append to the timing log; write the header only when creating the file.
    time_dataframe.to_csv(
        path_or_buf=COMPUTATION_TIME_DATA,
        mode='a',
        index=False,
        header=not os.path.exists(COMPUTATION_TIME_DATA),
        encoding="utf-8"
    )