In [1]:
# Mount the user's Google Drive inside the Colab VM at /content/drive.
# NOTE(review): none of the cells visible in this notebook read from or write
# to the mount point — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Fri Feb 21 04:33:56 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA A100-SXM4-40GB          Off |   00000000:00:04.0 Off |                    0 |
| N/A   31C    P0             47W /  400W |       0MiB /  40960MiB |      0%      Default |
|                                         |                        |             Disabled |
+-----------------------------------------+------------------------+----------------------+
                                                

In [3]:
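# Optional: clone the upstream FinGPT repository and switch into the forecaster directory.
# Left disabled: the cells shown in this notebook do not import anything from that checkout.
# !git clone https://github.com/AI4Finance-Foundation/FinGPT.git
# %cd FinGPT/fingpt/FinGPT_Forecaster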

In [4]:
# Install required packages
!pip install -q transformers datasets peft torch finnhub-python


In [5]:
!pip install --upgrade yfinance==0.2.54



In [6]:
import json
import os
import random
import re
from datetime import datetime, timedelta

import finnhub
import pandas as pd
import torch
import yfinance as yf
from google.colab import userdata
from peft import PeftModel
from transformers import AutoTokenizer, AutoModelForCausalLM

# Initialize Finnhub client
# The API key is read from the Colab secret named 'Finhub_API' (note the spelling
# with a single "n"); the secret in the Colab sidebar must use exactly that name.
finnhub_client = finnhub.Client(api_key=userdata.get('Finhub_API'))

# Initialize huggingface
# Authenticate against the Hugging Face Hub with the token stored in the Colab
# secret 'HF_API'.
# NOTE(review): presumably needed because the Llama-2 checkpoint loaded further
# down is access-gated — confirm.
from huggingface_hub import login
login(token=userdata.get('HF_API'))


def get_company_info(symbol):
    """Collect descriptive company fields for `symbol` from yfinance and Finnhub.

    Name, market capitalisation, currency, share count and exchange are read
    from the yfinance `info` mapping; industry, IPO date and country are read
    from the Finnhub company profile. Any field missing at its source is
    returned as None.

    Returns:
        dict with the keys 'name', 'finnhubIndustry', 'ipo',
        'marketCapitalization', 'currency', 'shareOutstanding', 'country',
        'ticker' and 'exchange' (in that order).
    """
    yahoo_fields = yf.Ticker(symbol).info
    finnhub_profile = finnhub_client.company_profile2(symbol=symbol)

    company = {}
    company['name'] = yahoo_fields.get('longName')
    company['finnhubIndustry'] = finnhub_profile.get('finnhubIndustry')
    company['ipo'] = finnhub_profile.get('ipo')
    company['marketCapitalization'] = yahoo_fields.get('marketCap')
    company['currency'] = yahoo_fields.get('currency')
    company['shareOutstanding'] = yahoo_fields.get('sharesOutstanding')
    company['country'] = finnhub_profile.get('country')
    company['ticker'] = symbol
    company['exchange'] = yahoo_fields.get('exchange')
    return company

def get_stock_prices(symbol, start_date, end_date):
    """Summarise the closing-price move of `symbol` over a date window.

    Fetches the price history between `start_date` and `end_date` from
    yfinance and compares the first and last 'Close' values of that history.

    Returns:
        dict with 'startPrice' (first close), 'endPrice' (last close) and
        'priceChange', which is 'increased' only when the last close is
        strictly greater than the first and 'decreased' otherwise.
    """
    history = yf.Ticker(symbol).history(start=start_date, end=end_date)
    first_close = history.iloc[0]['Close']
    last_close = history.iloc[-1]['Close']

    if last_close > first_close:
        direction = 'increased'
    else:
        direction = 'decreased'

    return {
        'startPrice': first_close,
        'endPrice': last_close,
        'priceChange': direction,
    }

def get_company_news(symbol, start_date, end_date, max_articles=5):
    """Return a random sample of Finnhub news articles for `symbol`.

    Args:
        symbol: ticker passed to Finnhub.
        start_date: first day of the news window, forwarded as `_from`.
        end_date: last day of the news window, forwarded as `to`.
        max_articles: upper bound on the number of articles returned
            (default 5). Fewer are returned when Finnhub has fewer.

    Returns:
        list of dicts, each with exactly the keys 'headline' and 'summary'.
        A field missing from an article is returned as an empty string.

    Note:
        The selection is a *random* sample (not the most recent or most
        relevant articles), so repeated calls can produce different prompts
        unless the `random` module is seeded by the caller.
    """
    # Treat a missing/empty response as "no news" instead of failing on len(None).
    news = finnhub_client.company_news(symbol, _from=start_date, to=end_date) or []

    # random.sample raises if asked for more items than exist (or a negative count).
    sample_size = max(0, min(max_articles, len(news)))
    sampled_articles = random.sample(news, sample_size)

    return [
        {
            'headline': article.get('headline', ''),
            'summary': article.get('summary', ''),
        }
        for article in sampled_articles
    ]

def get_basic_financials(symbol, max_metrics=5):
    """Return the first `max_metrics` non-null basic financial metrics for `symbol`.

    Metrics are taken in the order Finnhub returns them. Null metrics are
    skipped *before* counting, so the result holds `max_metrics` entries
    whenever that many non-null metrics exist (previously nulls among the
    first five simply shrank the result).

    Args:
        symbol: ticker passed to Finnhub.
        max_metrics: maximum number of metrics to return (default 5).

    Returns:
        dict mapping metric name to value; empty when the response carries
        no 'metric' section.
    """
    financials = finnhub_client.company_basic_financials(symbol, 'all') or {}
    metrics = financials.get('metric') or {}

    selected = {}
    for metric_name, metric_value in metrics.items():
        if len(selected) >= max_metrics:
            break
        if metric_value is not None:
            selected[metric_name] = metric_value
    return selected

def format_prompt(company_info, prices, news, financials, cur_date, start_date=None, end_date=None):
    """Assemble the Llama-2 chat prompt asking for a one-week stock forecast.

    Args:
        company_info: dict with the keys 'name', 'finnhubIndustry', 'ipo',
            'marketCapitalization', 'currency', 'shareOutstanding', 'country',
            'ticker' and 'exchange'.
        prices: dict with 'startPrice', 'endPrice' (numbers) and 'priceChange'.
        news: iterable of dicts with 'headline' and 'summary'.
        financials: mapping of metric name to value.
        cur_date: forecast reference date as 'YYYY-MM-DD'; the prediction
            window is `cur_date` to `cur_date` + 7 days.
        start_date: first day of the observed price/news window. When omitted,
            the module-level `start_date` variable is used (legacy behaviour).
        end_date: last day of the observed window. When omitted, the
            module-level `end_date` is used if it exists, else `cur_date`.

    Returns:
        The full prompt string wrapped in `[INST]<<SYS>> ... [/INST]` markers.

    Raises:
        ValueError: if `start_date` is neither passed nor defined at module level.
    """
    # Earlier versions read the window bounds from notebook globals; keep that
    # working for existing callers while letting new callers pass them explicitly.
    if start_date is None:
        start_date = globals().get('start_date')
        if start_date is None:
            raise ValueError(
                "start_date was not provided and no module-level 'start_date' is defined"
            )
    if end_date is None:
        end_date = globals().get('end_date')
        if end_date is None:
            end_date = cur_date

    def _two_decimals(value):
        # yfinance can report None for market cap / share count; ':.2f' would raise on it.
        return "N/A" if value is None else f"{value:.2f}"

    market_cap = _two_decimals(company_info['marketCapitalization'])
    shares_outstanding = _two_decimals(company_info['shareOutstanding'])

    news_text = "\n".join([f"[Headline]: {n['headline']}\n[Summary]: {n['summary']}\n" for n in news])
    financials_text = "\n".join([f"{k}: {v}" for k, v in financials.items()])

    next_week = (datetime.strptime(cur_date, '%Y-%m-%d') + timedelta(days=7)).strftime('%Y-%m-%d')

    SYSTEM_PROMPT = """You are a seasoned stock market analyst. Your task is to list the positive developments and potential concerns for companies based on relevant news and basic financials from the past weeks, then provide an analysis and prediction for the companies' stock price movement for the upcoming week. Your answer format should be as follows:

[Positive Developments]:
1. ...

[Potential Concerns]:
1. ...

[Prediction & Analysis]:
..."""

    prompt = f"""
[Company Introduction]:
{company_info['name']} is a leading entity in the {company_info['finnhubIndustry']} sector. Incorporated and publicly traded since {company_info['ipo']}, the company has established its reputation as one of the key players in the market. As of today, {company_info['name']} has a market capitalization of {market_cap} in {company_info['currency']}, with {shares_outstanding} shares outstanding. {company_info['name']} operates primarily in the {company_info['country']}, trading under the ticker {company_info['ticker']} on the {company_info['exchange']}.

From {start_date} to {end_date}, {company_info['name']}'s stock price {prices['priceChange']} from {prices['startPrice']:.2f} to {prices['endPrice']:.2f}.

Company news during this period are listed below:
{news_text}

Some recent basic financials of {company_info['name']} are presented below:
[Basic Financials]:
{financials_text}

Based on all the information before {cur_date}, let's first analyze the positive developments and potential concerns for {company_info['ticker']}. Come up with 2-4 most important factors respectively and keep them concise. Most factors should be inferred from company-related news. Then make your prediction of the {company_info['ticker']} stock price movement for next week ({cur_date} to {next_week}). Provide a summary analysis to support your prediction.
"""

    # Llama-2 chat template markers.
    B_INST, E_INST = "[INST]", "[/INST]"
    B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"

    return B_INST + B_SYS + SYSTEM_PROMPT + E_SYS + prompt + E_INST

# Load the model
# 1) the Llama-2 chat base checkpoint in fp16, placed automatically on the available device(s)
base_model = AutoModelForCausalLM.from_pretrained(
    'meta-llama/Llama-2-7b-chat-hf',
    torch_dtype=torch.float16,
    device_map="auto",
    trust_remote_code=True,
)
# 2) the tokenizer that belongs to the same checkpoint
tokenizer = AutoTokenizer.from_pretrained('meta-llama/Llama-2-7b-chat-hf')
# 3) the FinGPT forecaster LoRA adapter on top of the base model, switched to inference mode
model = PeftModel.from_pretrained(
    base_model,
    'FinGPT/fingpt-forecaster_dow30_llama2-7b_lora',
).eval()

def generate_forecast(prompt):
    """Run the module-level `model` on `prompt` and return only the model's answer.

    The prompt is tokenized with the module-level `tokenizer`, moved to the
    model's device and passed to sampling-based generation capped at a total
    length of 4096 tokens. The decoded text still contains the prompt, so
    everything up to and including the last '[/INST]' marker (plus following
    whitespace) is removed before returning.
    """
    target_device = model.device
    encoded = {
        name: tensor.to(target_device)
        for name, tensor in tokenizer(prompt, return_tensors='pt').items()
    }

    generation_kwargs = dict(
        max_length=4096,
        do_sample=True,
        eos_token_id=tokenizer.eos_token_id,
        use_cache=True,
    )
    sequences = model.generate(**encoded, **generation_kwargs)

    decoded = tokenizer.decode(sequences[0], skip_special_tokens=True)
    prompt_prefix = re.compile(r'.*\[/INST\]\s*', flags=re.DOTALL)
    return prompt_prefix.sub('', decoded)



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [7]:
# Example usage
# `random` is used by get_company_news when it samples articles.
import random
symbol = "AVGO" # ticker to forecast
cur_date = "2025-02-20" # reference date: end of the observed window, start of the forecast week
weeks_back = 3 # look-back window in weeks; the original note says to keep this below 4 (reason not shown here)

# Observed window: [cur_date - weeks_back weeks, cur_date], as 'YYYY-MM-DD' strings.
# NOTE: format_prompt also reads `start_date` and `end_date` as module-level names,
# so these two variables must exist before it is called.
start_date = (datetime.strptime(cur_date, '%Y-%m-%d') - timedelta(weeks=weeks_back)).strftime('%Y-%m-%d')
end_date = cur_date

# Gather the four inputs of the prompt (network calls to yfinance and Finnhub).
company_info = get_company_info(symbol)
prices = get_stock_prices(symbol, start_date, end_date)
news = get_company_news(symbol, start_date, end_date)
financials = get_basic_financials(symbol)

# Build the prompt and sample one forecast from the model.
prompt = format_prompt(company_info, prices, news, financials, cur_date)
forecast = generate_forecast(prompt)
print(forecast)

[Positive Developments]:
1. The company is considered a leader in the AI chip market, which is a growing sector.
2. The company is featured in the list of five stocks near a buy point, indicating a potential increase in investor interest.
3. The company's stock has shown a strong upward trend in recent months, indicating a positive market sentiment.

[Potential Concerns]:
1. The company's stock is being impacted by the rise of competitors, such as Nvidia, which has been soaring in recent months.
2. The company's stock is also being influenced by broader market trends, such as the Dow Jones Futures rise and fall, which may affect the overall sentiment towards tech stocks.
3. The company's basic financials show a relatively low trading volume, which could be a concern for investors who prefer to see a higher trading volume in a company.

[Prediction & Analysis]:
Based on the above factors, my prediction for the stock price of Broadcom Inc. (AVGO) for the upcoming week (2025-02-20 to 2025

In [8]:
# `datasets` is already listed in the earlier package-install cell; this repeats that install.
!pip install -q datasets
# NOTE(review): load_dataset is imported here but not called in the cells visible in this notebook.
from datasets import load_dataset
import pandas as pd

# Parquet file of each split, relative to the root of the dataset repository.
splits = {'train': 'data/train-00000-of-00001-7c4c80aa07272d4c.parquet', 'test': 'data/test-00000-of-00001-28531804b005ddc6.parquet'}
# Read the training split directly from the Hugging Face Hub through an hf:// path.
df = pd.read_parquet("hf://datasets/FinGPT/fingpt-forecaster-dow30-202305-202405/" + splits["train"])


In [9]:
# Preview the first rows of the training split loaded in the previous cell.
df.head()

Unnamed: 0,prompt,answer,period,label,symbol
0,[INST]<<SYS>>\nYou are a seasoned stock market...,[Positive Developments]:\n1. Market Outperform...,2023-05-14 to 2023-05-21,up by 3-4%,AXP
1,[INST]<<SYS>>\nYou are a seasoned stock market...,[Positive Developments]:\n1. American Express'...,2023-05-21 to 2023-05-28,up by 2-3%,AXP
2,[INST]<<SYS>>\nYou are a seasoned stock market...,[Positive Developments]:\n1. Increased dividen...,2023-05-28 to 2023-06-04,up by more than 5%,AXP
3,[INST]<<SYS>>\nYou are a seasoned stock market...,[Positive Developments]:\n1. The stock outperf...,2023-06-04 to 2023-06-11,up by 1-2%,AXP
4,[INST]<<SYS>>\nYou are a seasoned stock market...,[Positive Developments]:\n1. American Express ...,2023-06-11 to 2023-06-18,up by 0-1%,AXP


In [None]:
import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import time
from datetime import datetime
from typing import Dict, List, Any
import re
import numpy as np

class FinGPTEvaluator:
    """Evaluate a LoRA-adapted Llama-2 forecaster on the FinGPT Dow-30 dataset.

    Typical flow: construct (loads the base model and tokenizer), call
    `load_model` with a LoRA adapter, load a split with `load_dataset`, then
    run `evaluate_on_dataset` to get one result row per evaluated prompt.
    """

    # Parquet file of each dataset split, relative to the dataset repository root.
    _SPLIT_FILES = {
        'train': 'data/train-00000-of-00001-7c4c80aa07272d4c.parquet',
        'test': 'data/test-00000-of-00001-28531804b005ddc6.parquet'
    }

    # Section header that precedes the model's actual prediction (lower-cased).
    _PREDICTION_MARKER = '[prediction & analysis]'

    def __init__(self):
        """Initialize the FinGPT evaluator with the base Llama-2 model"""
        print("Loading base Llama-2 model...")
        # NOTE(review): trust_remote_code=True is probably unnecessary for this
        # checkpoint — confirm before removing.
        self.base_model = AutoModelForCausalLM.from_pretrained(
            'meta-llama/Llama-2-7b-chat-hf',
            trust_remote_code=True,
            device_map="auto",
            torch_dtype=torch.float16
        )
        self.tokenizer = AutoTokenizer.from_pretrained('meta-llama/Llama-2-7b-chat-hf')
        # Set by load_model(); generate_forecast refuses to run while this is None.
        self.model = None

    @classmethod
    def _resolve_split(cls, dataset_path, split: str = 'train') -> str:
        """Return the split to load, honouring a split name passed as `dataset_path`."""
        # Calls such as load_dataset('test') bind 'test' to dataset_path and leave
        # split at its default; without this they would silently load 'train'.
        if split == 'train' and isinstance(dataset_path, str) and dataset_path in cls._SPLIT_FILES:
            return dataset_path
        return split

    def load_dataset(self, dataset_path: str, split: str = 'train') -> pd.DataFrame:
        """Load one split of the FinGPT Dow-30 forecaster dataset.

        Args:
            dataset_path: kept for interface compatibility. The dataset location
                is fixed, so this value is not used as a path; however, when it
                is itself a split name ('train' or 'test') and `split` is left
                at its default, it selects the split.
            split: 'train' or 'test'.

        Returns:
            The split as a DataFrame read from the Hugging Face Hub.

        Raises:
            KeyError: if the resolved split name is unknown.
        """
        resolved_split = self._resolve_split(dataset_path, split)
        full_path = f"hf://datasets/FinGPT/fingpt-forecaster-dow30-202305-202405/{self._SPLIT_FILES[resolved_split]}"
        return pd.read_parquet(full_path)

    def load_model(self, lora_path: str) -> None:
        """Load the FinGPT model with specified LoRA weights"""
        print(f"Loading FinGPT model with LoRA weights from: {lora_path}")
        self.model = PeftModel.from_pretrained(
            self.base_model,
            lora_path
        ).eval()

    def generate_forecast(self, prompt: str) -> Dict[str, Any]:
        """Generate a forecast for `prompt` and report basic generation metrics.

        Returns:
            dict with
              'text'            – decoded answer with the prompt (everything up
                                  to the last '[/INST]') removed,
              'generation_time' – wall-clock seconds spent in `generate`,
              'input_tokens'    – number of prompt tokens,
              'output_tokens'   – number of newly generated tokens.

        Raises:
            ValueError: if `load_model` has not been called yet.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model first.")

        # Tokenize input
        inputs = self.tokenizer(prompt, return_tensors='pt')
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
        input_tokens = len(inputs['input_ids'][0])

        # Generate with timing
        start_time = time.time()
        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_length=4096,
                do_sample=True,
                eos_token_id=self.tokenizer.eos_token_id,
                use_cache=True
            )
        end_time = time.time()

        # Process output
        generated_text = self.tokenizer.decode(output[0], skip_special_tokens=True)
        cleaned_output = re.sub(r'.*\[/INST\]\s*', '', generated_text, flags=re.DOTALL)

        return {
            'text': cleaned_output,
            'generation_time': end_time - start_time,
            'input_tokens': input_tokens,
            # The returned sequence starts with the prompt tokens, so subtract them
            # to report only what the model actually produced.
            'output_tokens': len(output[0]) - input_tokens
        }

    def parse_prediction(self, text: str) -> str:
        """Extract the predicted direction ('up', 'down' or 'unknown') from generated text.

        Only the part after the '[Prediction & Analysis]' header is inspected
        (the whole text when that header is absent), and bracketed section
        headers are ignored. This matters because every well-formed answer
        contains the header '[Positive Developments]', which would otherwise
        match the 'positive' keyword and force the result to 'up'.

        An explicit 'Prediction: Up ...' / 'Prediction: Down ...' statement
        wins; otherwise the direction keyword that appears first decides.
        """
        if not isinstance(text, str) or not text:
            return 'unknown'

        text_lower = text.lower()

        marker_pos = text_lower.find(self._PREDICTION_MARKER)
        if marker_pos >= 0:
            text_lower = text_lower[marker_pos + len(self._PREDICTION_MARKER):]

        # Remove any remaining "[Section Header]" so header words are never counted.
        text_lower = re.sub(r'\[[^\]\n]*\]', ' ', text_lower)

        explicit = re.search(r'prediction\s*:\s*(up|down)\b', text_lower)
        if explicit:
            return explicit.group(1)

        # Look for common prediction patterns
        up_patterns = ['up by', 'increase', 'rise', 'positive', 'upward']
        down_patterns = ['down by', 'decrease', 'fall', 'negative', 'downward']

        def first_position(patterns):
            # Leading \b keeps 'rise' from matching inside 'enterprise' and
            # 'fall' inside 'shortfall', while still matching 'rises', 'falling', ...
            alternation = '|'.join(re.escape(p) for p in patterns)
            match = re.search(r'\b(?:' + alternation + ')', text_lower)
            return match.start() if match else None

        up_at = first_position(up_patterns)
        down_at = first_position(down_patterns)

        if up_at is None and down_at is None:
            return 'unknown'
        if down_at is None or (up_at is not None and up_at < down_at):
            return 'up'
        return 'down'

    def parse_label(self, label: str) -> str:
        """Convert a dataset label such as 'up by 3-4%' to 'up', 'down' or 'unknown'."""
        # Missing labels arrive as NaN/None; they must not raise because this is
        # also called from the error path of evaluate_on_dataset.
        if not isinstance(label, str):
            return 'unknown'
        label_lower = label.lower()
        if 'up' in label_lower:
            return 'up'
        elif 'down' in label_lower:
            return 'down'
        return 'unknown'

    def evaluate_on_dataset(self, df: pd.DataFrame, sample_size: int = None) -> pd.DataFrame:
        """Run the model on (a sample of) `df` and return one result row per prompt.

        Args:
            df: frame with the columns 'prompt', 'answer', 'period', 'label'
                and 'symbol'.
            sample_size: when truthy, evaluate a fixed-seed random sample of at
                most this many rows; values larger than the frame evaluate
                every row.

        Returns:
            DataFrame with 'idx', 'symbol', 'period', 'status',
            'predicted_movement', 'actual_movement', 'is_correct',
            'ground_truth_answer', 'text', 'generation_time', 'input_tokens'
            and 'output_tokens'. A row whose generation failed has status
            'error: <message>' and null metrics instead of aborting the run.
        """
        if sample_size:
            # DataFrame.sample raises when n exceeds the number of rows.
            df = df.sample(n=min(sample_size, len(df)), random_state=42)

        results = []

        for idx, row in df.iterrows():
            try:
                # Generate forecast
                forecast_result = self.generate_forecast(row['prompt'])

                # Parse prediction and actual label
                predicted_movement = self.parse_prediction(forecast_result['text'])
                actual_movement = self.parse_label(row['label'])

                results.append({
                    'idx': idx,
                    'symbol': row['symbol'],
                    'period': row['period'],
                    'status': 'success',
                    'predicted_movement': predicted_movement,
                    'actual_movement': actual_movement,
                    'is_correct': predicted_movement == actual_movement,
                    'ground_truth_answer': row['answer'],
                    **forecast_result
                })

            except Exception as e:
                # Deliberate best effort: record the failure and keep evaluating.
                results.append({
                    'idx': idx,
                    'symbol': row.get('symbol'),
                    'period': row.get('period'),
                    'status': f'error: {str(e)}',
                    'predicted_movement': 'error',
                    'actual_movement': self.parse_label(row.get('label')),
                    'is_correct': False,
                    'ground_truth_answer': row.get('answer'),
                    'text': None,
                    'generation_time': None,
                    'input_tokens': None,
                    'output_tokens': None
                })

        return pd.DataFrame(results)

def analyze_results(results_df: pd.DataFrame) -> tuple:
    """Analyze prediction results and calculate metrics.

    Args:
        results_df: frame with at least the columns 'status', 'symbol',
            'is_correct', 'generation_time', 'output_tokens',
            'actual_movement' and 'predicted_movement'. Rows whose status is
            exactly 'success' count as successful; statuses starting with
            'error' count as errors.

    Returns:
        A `(metrics, symbol_metrics)` tuple:
          metrics        – dict of overall figures (rates are percentages;
                           timing, length, accuracy and the confusion counts
                           are computed over successful rows only),
          symbol_metrics – DataFrame indexed by symbol with 'accuracy' and
                           'success_rate' as fractions computed over *all*
                           rows of that symbol, failed ones included.

    Notes:
        'up' is the positive class of the confusion matrix; every other
        prediction (including 'unknown') is counted as "not up".
        `results_df` is not modified.
    """
    is_success = results_df['status'] == 'success'
    # Filter successful predictions
    success_df = results_df[is_success]

    metrics = {
        'total_samples': len(results_df),
        'success_rate': is_success.mean() * 100,
        'avg_generation_time': success_df['generation_time'].mean(),
        'avg_output_length': success_df['output_tokens'].mean(),
        'error_rate': (results_df['status'].str.startswith('error')).mean() * 100,
        'prediction_accuracy': success_df['is_correct'].mean() * 100,
    }

    # Calculate per-symbol metrics
    symbol_metrics = results_df.groupby('symbol').agg({
        'is_correct': 'mean',
        'status': lambda x: (x == 'success').mean()
    }).rename(columns={
        'is_correct': 'accuracy',
        'status': 'success_rate'
    })

    # Confusion-matrix inputs are kept as standalone Series: assigning new
    # columns onto the filtered slice triggers pandas' SettingWithCopyWarning.
    actual_up = success_df['actual_movement'] == 'up'
    predicted_up = success_df['predicted_movement'] == 'up'

    # int(...) turns numpy integers into plain ints so the metrics print and
    # serialise cleanly.
    true_positives = int((actual_up & predicted_up).sum())
    false_positives = int((~actual_up & predicted_up).sum())
    true_negatives = int((~actual_up & ~predicted_up).sum())
    false_negatives = int((actual_up & ~predicted_up).sum())

    predicted_positive = true_positives + false_positives
    actual_positive = true_positives + false_negatives

    metrics.update({
        'true_positives': true_positives,
        'false_positives': false_positives,
        'true_negatives': true_negatives,
        'false_negatives': false_negatives,
        'precision': true_positives / predicted_positive if predicted_positive > 0 else 0,
        'recall': true_positives / actual_positive if actual_positive > 0 else 0,
    })

    return metrics, symbol_metrics

# Example usage
if __name__ == "__main__":
    # Initialize evaluator
    evaluator = FinGPTEvaluator()

    # Load model
    evaluator.load_model('FinGPT/fingpt-forecaster_dow30_llama2-7b_lora')

    # Load dataset
    # Select the split explicitly by keyword: the first positional parameter of
    # load_dataset is `dataset_path`, not `split`.
    dataset_id = 'FinGPT/fingpt-forecaster-dow30-202305-202405'
    train_df = evaluator.load_dataset(dataset_id, split='train')
    test_df = evaluator.load_dataset(dataset_id, split='test')

    print("\nEvaluating on test set sample...")
    # Evaluate on a sample of test data first
    test_results = evaluator.evaluate_on_dataset(test_df, sample_size=50)

    # Analyze results
    metrics, symbol_metrics = analyze_results(test_results)

    print("\nOverall Evaluation Metrics:")
    for metric, value in metrics.items():
        print(f"{metric}: {value}")

    print("\nPer-Symbol Metrics:")
    print(symbol_metrics)

    # Save results (written to the current working directory)
    test_results.to_csv('fingpt_evaluation_results.csv', index=False)
    symbol_metrics.to_csv('fingpt_symbol_metrics.csv')

Loading base Llama-2 model...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Loading FinGPT model with LoRA weights from: FinGPT/fingpt-forecaster_dow30_llama2-7b_lora

Evaluating on test set sample...
