# **Lock numpy and gensim versions**
+ Lock numpy version: 1.26.4
+ Lock gensim version: 4.3.3

In [None]:
# Lock numpy version to prevent compatibility issues with gensim
!pip install gensim
import os
os._exit(0)



In [None]:
# Report the gensim and numpy versions that are active in this runtime
import gensim
import numpy

print(f"Gensim version: {gensim.__version__}")
print(f"Numpy version: {numpy.__version__}")

# **Install required packages**

In [None]:
# Install the third-party libraries used by the rest of the notebook
!pip install gradio flair openai chromadb langchain langchain_openai langchain-community neo4j python-dotenv



# **Mount Google Drive folder for file access**

In [None]:
from google.colab import drive
import sys
# Mount Google Drive under /content/drive (force_remount=True remounts even if already mounted)
drive.mount('/content/drive', force_remount=True)

# Add the NER module folder to the import search path so `ner_utils.py` can be imported
# NOTE(review): this path uses "MyDrive" while later paths use "My Drive" — confirm both resolve.
NER_FOLDER = "/content/drive/MyDrive/FinScope3D/Ner_Module"
sys.path.append(NER_FOLDER)

Mounted at /content/drive


# **FinScope3D**
1. Import Required Packages

2. Import OpenAI for Translation

3. Import NER (Named Entity Recognition)

4. Determine User Intent and Select Appropriate Model

5. Integrate Querying for Structured Data

6. Integrate Querying for Unstructured Data

7. Integrate Querying for Prediction Module

8. Implement User Personalization

9. Launch Gradio Interface


# **1. Import Required Packages**

In [None]:
# --- Standard Library ---
import os
import re
import json
import pickle
import datetime

# --- Third-party Libraries ---
import pandas as pd
import numpy as np
import shap
import yfinance as yf
import pandas_datareader.data as web
from langdetect import detect
from dotenv import load_dotenv
import gradio as gr

# Machine Learning
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import NMF
from sklearn.impute import SimpleImputer
from sentence_transformers import CrossEncoder

# OpenAI & LangChain
import openai
from langchain_openai import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.graphs import Neo4jGraph
from langchain.chains import LLMChain
from langchain.vectorstores import Chroma
from langchain.schema import Document

import chromadb

# --- Custom Utilities ---
from ner_utils import extract_company_ticker, extract_year
from nasdaq_companies import company_to_ticker, companies_20f

# --- Config ---
# Link inserted into user-facing messages that point to SEC filings.
SEC_URL = "https://www.sec.gov/edgar/search/"
# handle_year_query rejects years below MIN_YEAR_SUPPORTED and years at or above MAX_YEAR_SUPPORTED.
MIN_YEAR_SUPPORTED = 2020
MAX_YEAR_SUPPORTED = 2026
CURRENT_YEAR = 2025  # not referenced in the visible part of the notebook — TODO confirm usage
# Persistent ChromaDB directory used by the structured-data collection.
CHROMA_PATH = "/content/drive/My Drive/FinScope3D/Structured_Data/chroma_db"
# Persist directory of the Chroma vector store used by the RAG section.
RAG_CHROMA_PATH = '/content/drive/My Drive/FinScope3D/Unstructured_Data/chroma_db/fr_database'
# Load variables from the .env file on Drive into the process environment,
# then read the OpenAI key from the environment (None if it is not set).
env_path = "/content/drive/My Drive/.env"
load_dotenv(dotenv_path=env_path)
openai.api_key = os.getenv("OPENAI_API_KEY")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


2025-03-30 04:17:29,835 SequenceTagger predicts: Dictionary with 76 tags: <unk>, O, B-CARDINAL, E-CARDINAL, S-PERSON, S-CARDINAL, S-PRODUCT, B-PRODUCT, I-PRODUCT, E-PRODUCT, B-WORK_OF_ART, I-WORK_OF_ART, E-WORK_OF_ART, B-PERSON, E-PERSON, S-GPE, B-DATE, I-DATE, E-DATE, S-ORDINAL, S-LANGUAGE, I-PERSON, S-EVENT, S-DATE, B-QUANTITY, E-QUANTITY, S-TIME, B-TIME, I-TIME, E-TIME, B-GPE, E-GPE, S-ORG, I-GPE, S-NORP, B-FAC, I-FAC, E-FAC, B-NORP, E-NORP, S-PERCENT, B-ORG, E-ORG, B-LANGUAGE, E-LANGUAGE, I-CARDINAL, I-ORG, S-WORK_OF_ART, I-QUANTITY, B-MONEY


# **2. Import OpenAI for Translation**

In [None]:
def detect_language(text):
    """
    Detect the language of the input text.

    If the number of Chinese characters exceeds the number of English words,
    return "zh-tw" without consulting langdetect. Otherwise, return the result
    from langdetect. If anything fails (e.g. langdetect cannot classify the
    text, or `text` is not a string), default to "en".
    """
    try:
        # Script heuristic first: it is deterministic and needs no langdetect call.
        chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
        english_words = re.findall(r'[a-zA-Z]+', text)

        if len(chinese_chars) > len(english_words):
            return "zh-tw"

        return detect(text)

    # `Exception` rather than a bare `except:` so KeyboardInterrupt / SystemExit
    # are not swallowed while still treating every detection error as "en".
    except Exception:
        return "en"

In [None]:
def safe_detect_language(text, fallback="en", whitelist=None):
    """
    Detect the language of `text`, restricted to an allowed set.

    The code returned by `detect_language` is passed through only when it is
    in `whitelist`; any other code is replaced by `fallback`. When no
    whitelist is supplied, English plus the Chinese, Japanese and Korean
    codes listed below are accepted.
    """
    allowed = ["en", "zh", "zh-tw", "zh-cn", "ja", "ko"] if whitelist is None else whitelist

    detected = detect_language(text)
    if detected in allowed:
        return detected
    return fallback

In [None]:
def translate_to_english(text):
    """
    Ask the OpenAI chat API for an English translation of `text`.

    The request is always sent; this function does not itself check whether
    the input is already English. Errors are not raised: on any exception a
    string of the form "Translation failed: <error>" is returned instead.
    """
    try:
        client = openai.OpenAI(api_key=openai.api_key)

        prompt = f"""
        Translate the following text into English. Only return the translation, without any additional comments or explanations.

        Input:
        {text}
        """

        user_message = {"role": "user", "content": prompt}
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[user_message],
            max_tokens=500
        )

        # Only the first choice is used; surrounding whitespace is trimmed.
        translated = completion.choices[0].message.content
        return translated.strip()

    except Exception as e:
        return f"Translation failed: {str(e)}"

In [None]:
def translate_to_language(text, target_lang):
    """
    Translate `text` into `target_lang` through the OpenAI chat API.

    The model is instructed to return the translation only. Unlike
    `translate_to_english`, exceptions from the API call are not caught here
    and propagate to the caller.
    """
    client = openai.OpenAI(api_key=openai.api_key)

    prompt = f"""
    Translate the following text into {target_lang}.
    Only return the translation, without any additional comments or explanations.

    Input:
    {text}
    """

    conversation = [
        {"role": "system", "content": "You are a translation AI."},
        {"role": "user", "content": prompt}
    ]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
        max_tokens=500
    )

    # Only the first choice is used; surrounding whitespace is trimmed.
    translated = completion.choices[0].message.content
    return translated.strip()

# **3. Import NER (Named Entity Recognition)**



In [None]:
def handle_company_query(english_text):
    """
    Detect the company mentioned in `english_text` and validate it.

    Returns:
        dict: {"company": <lower-cased name>, "ticker": <upper-cased ticker>}
            when the company is found in `company_to_ticker` and is not
            listed in `companies_20f`.
        str: a user-facing message when no company is detected, when it is
            not in `company_to_ticker`, or when it is in `companies_20f`.
    """
    # The extractor is given the upper-cased text; its output is normalised below.
    raw = extract_company_ticker(english_text.upper())

    company = (raw["company"] or "").strip().lower()
    ticker = (raw["ticker"] or "").strip().upper()

    # Nothing recognised as a company name
    if not company:
        return "Please make sure the company name is correct!"

    # Recognised, but outside the supported NASDAQ-100 list
    if company not in company_to_ticker:
        return "Sorry, we only have information on NASDAQ-100 companies!"

    # Supported list member that files a 20-F instead of a 10-K
    if company in companies_20f:
        return (
            f"Sorry, {company} is listed on NASDAQ-100 but does not provide a 10-K report. "
            f"Please check its 20-F report instead at {SEC_URL}."
        )

    return {"company": company, "ticker": ticker}

In [None]:
def handle_year_query(english_text):
    """
    Extract a year from `english_text` and validate it against the supported range.

    Returns:
        None: when `extract_year` yields no dict or the dict has no "year"
            (the year is optional in a query).
        str: a user-facing message when the year is earlier than
            MIN_YEAR_SUPPORTED or is MAX_YEAR_SUPPORTED or later.
        dict: the unchanged result of `extract_year` when the year is in range.
    """
    year_result = extract_year(english_text)

    # None or any unexpected non-dict result means "no year given"
    if not isinstance(year_result, dict):
        return None

    year = year_result.get("year")
    if year is None:
        return None

    # Year is too early — suggest checking the SEC website
    if year < MIN_YEAR_SUPPORTED:
        return f"We do not provide 10-K reports for {year}. Please check {SEC_URL}."

    # Year is too far in the future. The last supported year is derived from
    # the config constant so the message cannot drift from the actual check.
    if year >= MAX_YEAR_SUPPORTED:
        return f"We can only provide forecasts up to the year {MAX_YEAR_SUPPORTED - 1}."

    return year_result

In [None]:
def replace_company_with_ticker(query):
    """
    Use a NER model to detect a company name and replace it with its stock ticker in the query.

    The replacement is case-insensitive and limited to whole-word matches of
    the detected company name. If the extractor yields no ticker or no company
    name, the original query is returned unchanged.
    """
    company_result = extract_company_ticker(query) or {}  # Use NER to extract company info
    ticker = company_result.get("ticker")
    original_company = company_result.get("company")

    # Both parts are required: with an empty company name the pattern would
    # degenerate to r"\b\b" and insert the ticker at every word boundary,
    # and a None company name would make re.escape raise.
    if not ticker or not original_company:
        return query

    return re.sub(
        rf"\b{re.escape(original_company)}\b",
        lambda _match: ticker,  # callable so the ticker is inserted literally
        query,
        flags=re.IGNORECASE
    )

# **4. Determine User Intent and Select Appropriate Model**

In [None]:
def classify_intent(user_input):
    """
    Classify the user's query intent with the OpenAI "gpt-4o-mini" chat model.

    Categories:
    0 - Structured Financial Data
    1 - Unstructured Financial Analysis
    2 - Market Predictions
    3 - Other (not finance-related)

    Returns the category as an int. Any reply other than "0".."3", and any
    exception raised while calling the API, results in 3.
    """
    valid_labels = {"0", "1", "2", "3"}

    try:
        client = openai.OpenAI(api_key=openai.api_key)

        prompt = (
            "Classify the following user query into one of these categories:\n"
            "0 - Structured Financial Data (questions about revenue, profit, EPS, or other financial statement metrics)\n"
            "1 - Unstructured Financial Analysis (questions about business strategy, risks, or management discussions)\n"
            "2 - Market Predictions (any question related to stock prices, stock trends, or stock market performance)\n"
            "3 - Other (not finance-related)\n\n"
            f"User Query: {user_input}\n"
            "If the query is related to stock prices, stock trends, or stock market performance, classify it as 2.\n"
            "If the query asks about financial statement metrics (e.g., revenue, profit, EPS), classify it as 0, regardless of the year.\n"
            "If the query is about business strategy, risks, or management discussions, classify it as 1.\n"
            "If the query does not fit into any of the above categories, classify it as 3.\n"
            "Return only one number: 0, 1, 2, or 3."
        )

        conversation = [
            {"role": "system", "content": "You are a finance AI assistant."},
            {"role": "user", "content": prompt}
        ]
        # max_tokens=1: the reply is expected to be a single digit
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=conversation,
            max_tokens=1
        )

        label = completion.choices[0].message.content.strip()
        if label in valid_labels:
            return int(label)
        return 3  # unexpected reply -> "Other"

    except Exception as e:
        print(f"Error in classify_intent: {e}")
        return 3  # Fallback to "Other" on failure

# **5. Integrate Querying for Structured Data**

In [None]:
# Initialize ChromaDB persistent client backed by the directory at CHROMA_PATH
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)

# Create or retrieve the "financial_data" collection (queried by query_chromadb)
collection = chroma_client.get_or_create_collection(name="financial_data")

In [None]:
def query_chromadb(question, n_results=5):
    """
    Look up documents in the "financial_data" ChromaDB collection.

    The company name in `question` is first swapped for its ticker via
    `replace_company_with_ticker`; the rewritten text is then used as the
    single query. Returns the document list for that query, or an empty list
    when the result carries no documents.
    """
    ticker_query = replace_company_with_ticker(question)

    hits = collection.query(query_texts=[ticker_query], n_results=n_results)

    doc_batches = hits["documents"]
    if not doc_batches:
        return []
    # One query text was sent, so the first batch holds its matches.
    return doc_batches[0]


In [None]:
def get_structured_data(user_input):
    """
    Retrieve structured financial data from ChromaDB and generate a natural language response using OpenAI.

    Parameters:
        user_input (str): The user's question.

    Returns:
        str: The model's answer with surrounding whitespace trimmed, or a
            polite fallback message when ChromaDB returns no documents.
    """
    retrieved_data = query_chromadb(user_input)

    # Nothing to ground an answer on: return before creating an API client.
    if not retrieved_data:
        return "Sorry, I couldn't find any relevant financial data."

    client = openai.OpenAI(api_key=openai.api_key)

    # Format the retrieved documents
    structured_data = "\n".join(retrieved_data)

    # Compose the prompt for OpenAI response generation
    prompt = f"""
    You are an AI financial analyst. Based on the following financial data, answer the question:
    ---
    {structured_data}
    ---
    Question: {user_input}
    Remember you cannot say this is predicted data or say any word related to "future" or "project to".
    Please provide a professional yet easy-to-understand response.
    """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            # The system role matches the task; it previously said "translation AI",
            # which contradicted the analyst instructions in the user prompt.
            {"role": "system", "content": "You are an AI financial analyst."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=500
    )

    return response.choices[0].message.content.strip()

# **6. Integrate Querying for Unstructured Data**

In [None]:
# Copy the key into the process environment (presumably so the LangChain OpenAI
# components created below can read it — TODO confirm).
# Note: os.environ only accepts strings, so this raises TypeError if the key is None.
os.environ['OPENAI_API_KEY'] = openai.api_key

In [None]:
# Loading Chroma database for RAG
# The store at RAG_CHROMA_PATH is opened with the "text-embedding-ada-002" embedding function;
# query_rag runs its similarity search against `rag_db`.
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
rag_db = Chroma(persist_directory=RAG_CHROMA_PATH, embedding_function=embeddings)

  embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
  rag_db = Chroma(persist_directory=RAG_CHROMA_PATH, embedding_function=embeddings)


In [None]:
# Loading neo4j database for GraphRAG
# Connection settings are read from environment variables (load_dotenv was called earlier).
# NOTE(review): "USER" is commonly pre-set by the operating system, and python-dotenv does not
# override existing variables by default, so the .env value may be ignored — confirm, or
# consider a less generic key name.
URI = os.getenv("URI")
USER = os.getenv("USER")
PASSWORD = os.getenv("PASSWORD")
graph = Neo4jGraph(url=URI, username=USER, password=PASSWORD)

  graph = Neo4jGraph(url=URI, username=USER, password=PASSWORD)


In [None]:
# Initiate LLM
# No arguments are passed, so the model name and sampling settings are the library defaults.
# `model` is used for Cypher generation (cypher_chain) and for the final answer in query_rag.
model = ChatOpenAI()

In [None]:
# Incorporate prompt to use LLM to convert natural language query into a Cypher query
# The template has a single input variable, `query`; query_rag runs the chain with the user's text.
cypher_prompt = PromptTemplate(
    template="Convert the following natural language query into a Cypher query for a neo4j knowledge graph: {query}",
    input_variables=["query"],
)
cypher_chain = LLMChain(llm=model, prompt=cypher_prompt)

  cypher_chain = LLMChain(llm=model, prompt=cypher_prompt)


In [None]:
# Use cross_encoder to better rerank results from similarity search
# query_rag scores (query, passage) pairs with this model and keeps the highest-scoring passages.
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

In [None]:
def remove_numbers(user_input):
  """Return `user_input` with every run of digits deleted (nothing is inserted in their place)."""
  digit_runs = re.compile(r'\d+')
  return digit_runs.sub('', user_input)

In [None]:
def query_rag(user_input):
  """
  Answer an unstructured-data question from the RAG vector store, optionally
  enriched with rows returned by the Neo4j knowledge graph.

  Steps:
  - Strip digits from the query.
  - Fetch the 10 nearest passages; give up if there are none or the first
    one scores below 0.7.
  - Ask the LLM for a Cypher query and run it; any failure there is logged
    and the answer falls back to text context only.
  - Rerank the passages with the cross-encoder, keep the top 5, and ask the
    LLM to answer from that context.

  Returns:
    str: the model's answer, or 'Unable to find relevant results'.
  """
  # Remove numbers for unstructured query
  user_input = remove_numbers(user_input)

  PROMPT_TEMPLATE = """
  As a professional equity analyst, answer the question based only on the following context within 100 words:

  {context}

  ---

  Answer the question based on the above context: {question}
  """

  results = rag_db.similarity_search_with_relevance_scores(user_input, k=10)

  # Relevance gate comes first so no LLM or graph call is spent on a query that
  # will be rejected anyway. Assumes results are ordered best-first — TODO confirm.
  if len(results) == 0 or results[0][1] < 0.7:
    return 'Unable to find relevant results'

  # Generate a Cypher query and run it against the graph.
  # NOTE(review): the Cypher text is produced by the LLM from user input and executed
  # as-is; consider read-only database credentials for this connection.
  graph_text = ""
  try:
    generated_cypher_query = cypher_chain.run(user_input)
    graph_results = graph.query(generated_cypher_query)
    graph_text = "\n".join(
        ", ".join(f"{key}: {value}" for key, value in record.items())
        for record in graph_results
    )
  except Exception as e:
    # Generated Cypher may be invalid; the graph part is an optional enrichment.
    print(f"Graph query failed, continuing with text context only: {e}")

  # Rerank the retrieved passages with the cross-encoder and keep the best five.
  candidates = [doc for doc, _score in results]
  pairs = [[user_input, doc.page_content] for doc in candidates]
  rerank_scores = cross_encoder.predict(pairs)
  ranked = sorted(zip(candidates, rerank_scores), key=lambda item: item[1], reverse=True)
  top_docs = [doc for doc, _rerank_score in ranked[:5]]

  text_context = '\n\n---\n\n'.join(doc.page_content for doc in top_docs)
  if graph_text:
    full_context = text_context + "\n\nGraph-Based Knowledge:\n" + graph_text
  else:
    full_context = text_context

  prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
  prompt = prompt_template.format(context=full_context, question=user_input)

  response = model.predict(prompt)
  return f"{response}"

# **7. Integrate Querying for Prediction Module**

In [None]:
def fetch_annual_avg(year, indicator, max_lookback=5):
    """
    Fetch the annual average of a single FRED indicator.

    Starting at `year`, each year is tried in turn, going backwards, until one
    yields a usable average. The search is bounded so that a permanently
    failing fetch (network down, unknown series) cannot recurse without end.

    Parameters:
        year (int): First year to try.
        indicator (str): FRED series identifier.
        max_lookback (int): How many earlier years to try after `year`.

    Returns:
        The mean of the first column of the returned data for the first year
        that has data, or None if no year in the window produced a value.
    """
    last_year = max(year - max_lookback, datetime.MINYEAR)

    for candidate_year in range(year, last_year - 1, -1):
        start_date = datetime.datetime(candidate_year, 1, 1)
        end_date = datetime.datetime(candidate_year, 12, 31)

        try:
            data = web.DataReader(indicator, 'fred', start_date, end_date)
            if not data.empty:
                value = data.mean().iloc[0]
                # A frame holding only missing values averages to NaN; treat as "no data".
                if pd.notna(value):
                    return value
        except Exception as e:
            print(f"Error fetching data for {indicator} in {candidate_year}: {e}")

        # If no data, try previous year (while the window allows it)
        if candidate_year > last_year:
            print(f"No data for {indicator} in {candidate_year}, trying {candidate_year - 1}")

    print(f"No data for {indicator} between {last_year} and {year}")
    return None

def get_fred_annual_avg(year):
    """
    Fetch the annual average of selected FRED macroeconomic indicators for a given year.

    Each indicator is fetched with `fetch_annual_avg`, which falls back to
    earlier years when `year` has no data. An indicator for which nothing
    could be fetched is reported as None.

    Returns:
        dict: indicator name -> annual average, plus "GDP_Growth_Rate"
            (series A191RL1Q225SBEA divided by 100).
    """
    indicators = ["DGS10", "PCEPILFE", "INDPRO", "RBUSBIS", "M2SL", "UNRATE", "FEDFUNDS", "USEPUINDXD"]
    results = {indicator: fetch_annual_avg(year, indicator) for indicator in indicators}

    # Fetch GDP growth rate; convert percentage to decimal, keeping None when unavailable
    gdp_growth_pct = fetch_annual_avg(year, "A191RL1Q225SBEA")
    results["GDP_Growth_Rate"] = gdp_growth_pct / 100 if gdp_growth_pct is not None else None

    return results

# Example usage:
#print(get_fred_annual_avg(2023))

In [None]:
def get_last_friday(year):
    """Return the last Friday of December `year`, formatted as "YYYY-MM-DD"."""
    friday = 4  # date.weekday(): Monday is 0, so Friday is 4
    dec_31 = datetime.date(year, 12, 31)

    # Number of days to step back from Dec 31 to land on a Friday (0 if it already is one)
    days_back = (dec_31.weekday() - friday) % 7

    return (dec_31 - datetime.timedelta(days=days_back)).strftime("%Y-%m-%d")

In [None]:
def _statement_value(df, row_name, year_offset=0):
    """Return the `year_offset`-th non-null entry of row `row_name` in `df`, or None if unavailable."""
    # A negative offset means the requested year is newer than the latest statement;
    # without this guard iloc would silently index from the oldest end instead.
    if year_offset < 0 or row_name not in df.index:
        return None
    col_values = df.loc[row_name].dropna()
    if len(col_values) > year_offset:
        return col_values.iloc[year_offset]
    return None


def _annual_price_return(security, year):
    """Return (percent price change over `year`, closing price used as year end) for a yfinance Ticker."""
    start_data = security.history(start=f"{year}-01-01", end=f"{year}-01-10")
    end_data = security.history(start=f"{year}-12-20", end=f"{year + 1}-01-05")
    start_price = start_data["Close"].iloc[0] if not start_data.empty else None
    end_price = end_data["Close"].iloc[-1] if not end_data.empty else None
    pct_return = ((end_price - start_price) / start_price) * 100 if start_price and end_price else None
    return pct_return, end_price


def get_annual_financial_ratios(ticker, year):
    """
    Retrieve key financial ratios and performance metrics for a given stock and year.

    Statement values are located by position: the first column of
    `stock.financials` is taken as the latest year and `year` is reached by
    counting back from it. A `year` later than the latest statement yields
    None for every statement-based metric.
    NOTE(review): the position is applied after dropping missing values, so a
    gap in a row shifts later entries — confirm this is acceptable.

    "Trailing EPS" is read from `stock.info`, not from the statements for `year`.

    Parameters:
        ticker (str or list): Ticker symbol(s) of the stock(s).
        year (int): Target fiscal year for extracting financials.

    Returns:
        dict: ticker -> dict of metrics (a metric is None when an input is
            missing or falsy), or ticker -> {"Error": message} when retrieval
            for that ticker raised.
    """
    tickers = [ticker] if isinstance(ticker, str) else ticker

    data = {}
    ndx_return_cache = []  # benchmark return is the same for every ticker: fetch it once

    for t in tickers:
        stock = yf.Ticker(t)

        try:
            financials = stock.financials
            balance_sheet = stock.balance_sheet
            info = stock.info

            latest_year = financials.columns[0].year
            year_offset = latest_year - year

            net_income = _statement_value(financials, "Net Income", year_offset)
            total_assets = _statement_value(balance_sheet, "Total Assets", year_offset)
            total_equity = _statement_value(balance_sheet, "Common Stock Equity", year_offset)
            total_debt = _statement_value(balance_sheet, "Total Debt", year_offset)
            revenue = _statement_value(financials, "Total Revenue", year_offset)
            operating_income = _statement_value(financials, "Operating Income", year_offset)
            current_assets = _statement_value(balance_sheet, "Current Assets", year_offset)
            current_liabilities = _statement_value(balance_sheet, "Current Liabilities", year_offset)
            shares_outstanding = _statement_value(balance_sheet, "Ordinary Shares Number", year_offset)
            total_cash = _statement_value(balance_sheet, "Cash And Cash Equivalents", year_offset)

            # Previous year's revenue is the next position in the same row.
            # (The earlier `financials.shift(-1)` shifted rows, not years, so the
            # "Total Revenue" label ended up holding another line item's numbers.)
            revenue_previous = _statement_value(financials, "Total Revenue", year_offset + 1)

            revenue_growth = ((revenue - revenue_previous) / revenue_previous) * 100 \
                if revenue and revenue_previous else None

            trailing_eps = info.get("trailingEps")

            # Annual stock return
            return_on_stock, end_price = _annual_price_return(stock, year)

            # NDX benchmark return
            if not ndx_return_cache:
                ndx_return_cache.append(_annual_price_return(yf.Ticker("^NDX"), year)[0])
            ndx_annual_return = ndx_return_cache[0]

            last_price = end_price
            market_cap = last_price * shares_outstanding if last_price and shares_outstanding else None

            return_on_assets = (net_income / total_assets) * 100 if net_income and total_assets else None
            return_on_equity = (net_income / total_equity) * 100 if net_income and total_equity else None
            profit_margin = (net_income / revenue) * 100 if net_income and revenue else None
            current_ratio = current_assets / current_liabilities if current_assets and current_liabilities else None
            price_to_book = market_cap / total_equity if market_cap and total_equity else None
            price_to_sales = market_cap / revenue if market_cap and revenue else None
            enterprise_to_revenue = (market_cap + total_debt - total_cash) / revenue \
                if market_cap and total_debt and total_cash and revenue else None
            operating_margin = (operating_income / revenue) * 100 if operating_income and revenue else None

            data[t] = {
                "Price to Book Ratio": price_to_book,
                "Profit Margins": profit_margin,
                "Enterprise to Revenue": enterprise_to_revenue,
                "Market Cap": market_cap,
                "Revenue Growth (%)": revenue_growth,
                "Return on Assets (%)": return_on_assets,
                "Trailing EPS": trailing_eps,
                "Return on Equity (%)": return_on_equity,
                "Current Ratio": current_ratio,
                "Price to Sales Ratio": price_to_sales,
                "Operating Margin (%)": operating_margin,
                "Annual Stock Return (%)": return_on_stock,
                "NDX Annual Return (%)": ndx_annual_return
            }

        except Exception as e:
            # Best effort per ticker: record the failure and continue with the next one
            data[t] = {"Error": str(e)}

    return data

In [None]:
# Column order that data_preprocessing reindexes its output to.
# Presumably copied from the training matrix (X.columns.tolist()) — TODO confirm against the model.
# NOTE(review): several names here ('return_on_stock', 'Revenue Growth', 'operating margin', ...)
# differ from the keys produced by get_annual_financial_ratios ("Annual Stock Return (%)",
# "Revenue Growth (%)", "Operating Margin (%)", ...); reindex fills unmatched columns with NaN
# unless they are renamed elsewhere — verify.
feature_order = ['return_on_stock', 'NDX_annual_return', 'Price to Book Ratio',
       'Profit Margins', 'Enterprise to Revenue', 'Market Cap',
       'Revenue Growth', 'Return on Assets', 'Trailing EPS',
       'Return on Equity', 'Current Ratio', 'operating margin', 'DGS10',
       'PCEPILFE', 'INDPRO', 'RBUSBIS', 'M2SL', 'GDP_Growth_Rate', 'UNRATE',
       'FEDFUNDS', 'USEPUINDXD', 'Cluster_0', 'Cluster_1', 'Cluster_2',
       'Cluster_3', 'Cluster_4', 'Profit_Margin_Zscore_Market', 'PB_Zscore_Market', 'year_sin', 'year_cos']

In [None]:
def data_preprocessing(new_data, historical_data,
                       cluster_labels_path="/content/drive/My Drive/cluster_labels.csv",
                       feature_columns=None):
    """
    Preprocess new input data for prediction using prior cluster labels and historical statistics.

    Neither `new_data` nor `historical_data` is modified; all conversions are
    done on copies.

    Steps:
    - Load existing cluster labels from CSV and one-hot encode them (Cluster_0..Cluster_4).
    - Encode the year as sin/cos features with a period of 14.
    - Compute Z-scores of "Profit Margins" and "Price to Book Ratio" against the
      mean/std of the three years preceding the first row's year in `historical_data`.
    - Align feature columns to the requested order.

    Parameters:
        new_data (pd.DataFrame): New observation(s) to preprocess. Needs "ID", "year",
            "Profit Margins" and "Price to Book Ratio" columns. Only the first row's
            year is used to select the historical window.
        historical_data (pd.DataFrame): Historical panel data with "year",
            "Profit Margins" and "Price to Book Ratio" columns.
        cluster_labels_path (str): CSV with "ID" and "Cluster" columns.
        feature_columns (list[str] | None): Output column order; defaults to the
            module-level `feature_order`.

    Returns:
        pd.DataFrame: Preprocessed feature-aligned data ready for prediction.
    """
    if feature_columns is None:
        feature_columns = feature_order

    # Step 1: Load existing cluster labels
    cluster_labels = pd.read_csv(cluster_labels_path)

    # Step 2: Ensure ID format consistency (assign() returns new frames, inputs stay untouched)
    cluster_labels = cluster_labels.assign(ID=cluster_labels["ID"].astype(str))
    new_data = new_data.assign(ID=new_data["ID"].astype(str))

    # Step 3: Merge cluster labels and apply one-hot encoding
    merged = new_data.merge(cluster_labels, on="ID", how="left")
    features = pd.get_dummies(merged, columns=["Cluster"], prefix="Cluster", dtype=int)

    # Ensure Cluster_0 ~ Cluster_4 all exist
    for col in [f"Cluster_{i}" for i in range(5)]:
        if col not in features.columns:
            features[col] = 0

    # Step 4: Encode year as cyclic features
    features["year"] = features["year"].astype(int)
    features["year_sin"] = np.sin(2 * np.pi * features["year"] / 14)
    features["year_cos"] = np.cos(2 * np.pi * features["year"] / 14)

    # Step 5: Compute past 3 years' market average and std
    historical_years = historical_data["year"].astype(int)
    current_year = features["year"].values[0]
    past = historical_data[
        (historical_years >= current_year - 3) &
        (historical_years < current_year)
    ]
    market_avg = past[["Profit Margins", "Price to Book Ratio"]].mean()
    market_std = past[["Profit Margins", "Price to Book Ratio"]].std()

    # Step 6: Compute market-relative Z-scores
    features["Profit_Margin_Zscore_Market"] = (
        (features["Profit Margins"] - market_avg["Profit Margins"]) / market_std["Profit Margins"]
    )
    features["PB_Zscore_Market"] = (
        (features["Price to Book Ratio"] - market_avg["Price to Book Ratio"]) / market_std["Price to Book Ratio"]
    )

    # Step 7: Handle NaNs and infinities in Z-scores (e.g. zero or undefined std)
    for col in ["Profit_Margin_Zscore_Market", "PB_Zscore_Market"]:
        features[col] = features[col].replace([np.inf, -np.inf], 0).fillna(0)

    # Step 8: Reorder columns to match training set; columns not listed are dropped,
    # listed columns that are absent come out as NaN
    features = features.reindex(columns=feature_columns)

    print("🔍 Processed new data shape:", features.shape)
    return features

In [None]:
# Maps each model feature column name (the names listed in feature_order) to a
# human-readable label. Presumably used when presenting explanations to the user —
# the consumer is not visible in this part of the notebook, TODO confirm.
feature_name_map = {
    'return_on_stock': 'Stock Annual Return (%)',
    'NDX_annual_return': 'NASDAQ 100 Annual Return(%)',
    'Price to Book Ratio': 'Price-to-Book Ratio',
    'Profit Margins': 'Net Profit Margin (%)',
    'Enterprise to Revenue': 'Enterprise Value to Revenue',
    'Market Cap': 'Market Capitalization',
    'Revenue Growth': 'Revenue Growth Rate (%)',
    'Return on Assets': 'Return on Assets (%)',
    'Trailing EPS': 'Trailing Earnings per Share',
    'Return on Equity': 'Return on Equity (%)',
    'Current Ratio': 'Current Ratio',
    'operating margin': 'Operating Margin (%)',
    'DGS10': '10-Year Treasury Yield yoy',
    'PCEPILFE': 'Core Personal Consumption Expenditures yoy',
    'INDPRO': 'Industrial Production Index yoy',
    'RBUSBIS': 'Real Exchange Rate yoy',
    'M2SL': 'Money Supply M2 yoy',
    'GDP_Growth_Rate': 'GDP Growth Rate yoy',
    'UNRATE': 'Unemployment Rate yoy',
    'FEDFUNDS': 'Federal Funds Rate yoy',
    'USEPUINDXD': 'Economic Policy Uncertainty Index yoy',
    'Cluster_0': 'Cluster Group 0',
    'Cluster_1': 'Cluster Group 1',
    'Cluster_2': 'Cluster Group 2',
    'Cluster_3': 'Cluster Group 3',
    'Cluster_4': 'Cluster Group 4',
    'Profit_Margin_Zscore_Market':'Profit Margin Zscore transformation',
    'PB_Zscore_Market': 'Price to Book Zscore transformation',
    'year_sin': 'Year (sin encoded)',
    'year_cos': 'Year (cos encoded)'
}

In [None]:
def predictor(ticker, year):
    """
    Report whether a stock beats the market for a target year.

    Both the stored label and the model features are taken from the year
    before the target year. When that prior year lies in 2010-2023 the answer
    is read from the historical CSV; otherwise fresh data is fetched, run
    through the pickled model, and the three largest SHAP contributions are
    listed alongside the prediction.

    Parameters:
        ticker (str): Stock ticker symbol.
        year (int): Target prediction year.

    Returns:
        str: A formatted performance report, or a short notice when the prior
        year is inside the historical range but no row exists for the ticker.
    """
    history_df = pd.read_csv('/content/drive/My Drive/historical_data.csv')
    feature_year = year - 1
    rule = "─" * 30

    # ---- Historical branch: answer straight from the stored label ----
    if 2010 <= feature_year <= 2023:
        same_ticker = history_df['ID'] == ticker
        same_year = history_df['year'] == feature_year
        labelled_rows = history_df[same_ticker & same_year]

        if labelled_rows.empty:
            return "Unable to determine performance (no historical label)."

        if labelled_rows['Y'].iloc[0] == 1:
            outcome = "Outperformed the market 🚀"
        else:
            outcome = "Underperformed the market 📉"

        # The leading spaces are part of the emitted text and are kept as-is.
        pad = " " * 16
        return "\n".join([
            "",
            f"{pad}📢 Performance Report",
            f"{pad}{rule}",
            f"{pad}🏢 Company: {ticker}",
            f"{pad}📅 Year: {year}",
            f"{pad}📊 Result: {outcome}",
            f"{pad}{rule}",
            " " * 12,
        ])

    # ---- Prediction branch: build one feature row and run the model ----
    macro_features = get_fred_annual_avg(feature_year)
    company_features = get_annual_financial_ratios(ticker, feature_year)[ticker]
    raw_row = pd.DataFrame([
        {**macro_features, **company_features, "year": feature_year, "ID": ticker}
    ])
    model_ready = data_preprocessing(raw_row, history_df)

    # NOTE(review): pickle.load can execute arbitrary code; only load a model
    # file that comes from a trusted location.
    with open("/content/drive/My Drive/final_model.pkl", "rb") as model_file:
        model = pickle.load(model_file)

    predicted_label = int(np.round(model.predict(model_ready)[0]))

    # SHAP values for the single input row
    shap_row = shap.Explainer(model)(model_ready).values[0]
    column_names = model_ready.columns
    row_values = model_ready.iloc[0]

    # Indices of the three largest absolute contributions, strongest first
    strongest = np.argsort(np.abs(shap_row))[-3:][::-1]

    explanations = []
    for idx in strongest:
        column = column_names[idx]
        impact = shap_row[idx]
        observed = row_values[column]
        label = feature_name_map.get(column, column)
        if impact > 0:
            marker, leaning = "🟢", "outperformance"
        else:
            marker, leaning = "🔴", "underperformance"
        explanations.append(
            f"{marker} {label} = {observed:.2f}. \n\t"
            f"This value pushed the model’s decision toward {leaning}, "
            f"contributing a SHAP impact of {abs(impact):.4f}."
        )

    if predicted_label == 1:
        outcome = "Outperformed the market 🚀"
    else:
        outcome = "Underperformed the market 📉"

    # The leading spaces are part of the emitted text and are kept as-is.
    outer = " " * 8
    inner = " " * 10
    return "\n".join([
        "",
        f"{outer}📢 Performance Report",
        f"{outer}{rule}",
        f"{outer}🏢 Company: {ticker}",
        f"{outer}📅 Year: {year}",
        f"{outer}📊 Predicted Result: {outcome}",
        f"{outer}🔍 Key contributing factors (SHAP):",
        f"{inner}{explanations[0]}",
        f"{inner}{explanations[1]}",
        f"{inner}{explanations[2]}",
        f"{outer}{rule}",
        " " * 4,
    ])

# **8. Implement User Personalization**

In [None]:
def ask_for_missing_info(user_input, company, year, intent_category, history):
    """
    Ask the user for a missing company and/or year, if anything is missing.

    Missing values are first recovered from `history`, taking the most recent
    entry that carries a non-empty "company" / "year". Only if something is
    still missing is GPT asked to phrase a clarifying question.

    Parameters:
        user_input (str): The user's (English) message, sent as the user turn.
        company: Detected company; any falsy value means "missing".
        year: Detected year; any falsy value means "missing".
        intent_category (int): Intent code; a year is only required for 0 and 2.
        history (list[dict] | None): Earlier entries that may hold "company"
            and/or "year" keys.
            NOTE(review): the chat handler passes `state["internal_history"]`,
            whose entries only carry "role"/"content", so nothing is recovered
            from that caller — confirm whether that is intended.

    Returns:
        str | None: The clarifying question, or None when nothing is missing.
    """
    need_company = not company
    need_year = not year

    # Walk newest-to-oldest and keep the FIRST usable value for each field;
    # older entries must not overwrite a value that was already recovered.
    last_company = None
    last_year = None
    for entry in reversed(history or []):
        if need_company and not last_company and "company" in entry:
            last_company = entry["company"]
        if need_year and not last_year and "year" in entry:
            last_year = entry["year"]
        # Stop as soon as every field we were looking for has been found.
        if (not need_company or last_company) and (not need_year or last_year):
            break

    if not company and last_company:
        company = last_company
    if not year and last_year:
        year = last_year

    # Determine what is still missing
    missing_info = []
    if not company:
        missing_info.append("company")
    if not year and intent_category in [0, 2]:  # e.g., structured or predictive question
        missing_info.append("year")

    if not missing_info:
        return None  # Nothing missing

    # Ask user in natural way
    prompt = "You are a financial assistant. The user is asking about financial data, but some key information is missing.\n"
    if "company" in missing_info:
        prompt += "- Ask which company the user is referring to.\n"
    if "year" in missing_info:
        prompt += "- Ask which year's data they are looking for.\n"
    prompt += "Respond naturally and in a conversational way."

    # The client is only built once a question actually has to be generated.
    client = openai.OpenAI(api_key=openai.api_key)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": prompt},
            {"role": "user", "content": user_input}
        ],
        temperature=0.5
    )

    return response.choices[0].message.content.strip()

In [None]:
def detect_correction(user_input, history, company=None, year=None, intent_category=None):
    """
    Decide whether the latest message corrects or completes an earlier question.

    The last three user messages in `history` are tried newest-first. For each
    one GPT is asked either to rewrite the pair into a standalone question or
    to answer "NO CHANGE". The first rewrite wins and its company, year and
    intent are re-extracted.

    Parameters:
        user_input (str): The latest user message.
        history (list[dict] | None): Chat messages with "role" and "content".
        company (dict | None): Previously detected company context.
            NOTE(review): read here via `.get("company")`, while the chat
            handler reads `["ticker"]` from the same object — confirm the
            dict really has a "company" key.
        year (dict | None): Previously detected year context (read via "year").
        intent_category (int | None): Previously detected intent code.

    Returns:
        dict: Keys "query", "company", "year", "intent_category". When no
        rewrite happens the inputs are passed through unchanged.
    """
    # Get the last 3 user messages from the history as correction candidates
    user_messages = [m["content"] for m in (history or []) if m["role"] == "user"]
    previous_candidates = user_messages[-3:]

    unchanged = {
        "query": user_input,
        "company": company,
        "year": year,
        "intent_category": intent_category
    }

    # With no earlier user message there is nothing to correct, so skip
    # building the client and the prompt altogether.
    if not previous_candidates:
        return unchanged

    client = openai.OpenAI(api_key=openai.api_key)

    # Build the system prompt with context to guide the model's reasoning
    system_prompt = f"""You are an AI assistant that helps detect corrections or clarifications in follow-up user inputs.

    Use the provided context to help rewrite vague or incomplete queries.
    The user may refer to a company using pronouns like "its" or "their". Replace such pronouns with the correct company name if applicable.

    When the user's message is just a year, a pronoun, or otherwise too short to be complete,
    you should complete it based on the previously detected **intent category**:

    - If intent category is 0 (structured data), assume the user is asking about EPS.
    - If intent category is 1 (unstructured analysis), assume the user is asking about business strategies.
    - If intent category is 2 (market prediction), assume the user is asking for performance prediction.

    Context:
    - Previously detected company: {company.get("company") if company else "None"}
    - Previously detected year: {year.get("year") if year else "None"}
    - Previously detected intent category: {intent_category if intent_category is not None else "None"}

    If the user's latest message is intended to correct, add to, or clarify their previous question, rewrite a complete, standalone question using the provided context if helpful.

    If not, respond with "NO CHANGE".

    Examples:
    - Previous: "What was Apple's revenue?"
      User: "I meant 2021"
      Response: "What was Apple's revenue in 2021?"

    - Previous: "How did Netflix perform?"
      User: "2022"
      Response: "How did Netflix perform in 2022?"

    - Previous: "please tell me their EPS"
      Context company: Amazon
      Response: "Please tell me Amazon's EPS"

    If no correction is needed, respond with exactly: NO CHANGE
    """

    wrapping_quotes = "\"'“”"

    # Check each candidate message in reverse order to find possible corrections
    for prev_msg in reversed(previous_candidates):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f'Previous: "{prev_msg}"\nUser: "{user_input}"'}
            ],
            temperature=0.3
        )

        corrected_query = (response.choices[0].message.content or "").strip()

        # The prompt's own examples show quoted replies, so the model may wrap
        # its answer in quotes; unwrap only when both ends are quotes.
        if (
            len(corrected_query) >= 2
            and corrected_query[0] in wrapping_quotes
            and corrected_query[-1] in wrapping_quotes
        ):
            corrected_query = corrected_query[1:-1].strip()

        # Treat an empty reply and punctuation variants of the sentinel
        # ("NO CHANGE.", "no change!") as "no rewrite" for this candidate.
        if not corrected_query or corrected_query.rstrip(".!").lower() == "no change":
            continue

        # GPT rewrote the query: extract updated metadata. A str result from
        # the handlers means "not found", so the previous context is kept.
        new_company = handle_company_query(corrected_query)
        new_year = handle_year_query(corrected_query)
        new_intent = classify_intent(corrected_query)

        return {
            "query": corrected_query,
            "company": new_company if not isinstance(new_company, str) else company,
            "year": new_year if not isinstance(new_year, str) else year,
            "intent_category": new_intent if new_intent is not None else intent_category
        }

    # If no correction detected, return original input and context
    return unchanged

In [None]:
def ask_follow_up():
    """
    Produce a short, polite "anything else?" line to close an answer.

    The wording is generated by GPT from a fixed system prompt; no user
    message is sent with the request.

    Returns:
        str: The generated follow-up sentence, stripped of outer whitespace.
    """
    instructions = """You are an AI assistant. The user has just received an answer.
    Politely ask them if they have any other questions.

    Example responses:
    - "Do you have any other questions I can help with?"
    - "Let me know if you need anything else!"
    """

    completion = openai.OpenAI(api_key=openai.api_key).chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": instructions}],
        temperature=0.5,
    )

    generated = completion.choices[0].message.content
    return generated.strip()

In [None]:
def get_loading_text(user_input):
    """
    Build the update that reveals the loading message in the user's language.

    English input gets the fixed "Thinking..." text; for any other detected
    language that text is translated first.

    Returns:
        A `gr.update` that sets the loading text and makes it visible.
    """
    language = safe_detect_language(user_input)

    if language == "en":
        loading_message = "Thinking..."
    else:
        loading_message = translate_to_language("Thinking...", language)

    return gr.update(value=loading_message, visible=True)

# **9. Launch Gradio Interface**

In [None]:
def chatbot_response(user_input, history, state):
    """
    Handle one chat turn: resolve company, year and intent, run the matching
    module, and append the reply to the chat.

    Parameters:
        user_input (str): Raw text from the message box, in any language.
        history (list[dict] | None): Visible chat messages ("role"/"content").
        state (dict | None): Per-session memory with "last_company",
            "last_year", "last_intent" and "internal_history" (created here
            when absent).

    Returns:
        tuple: ("", history, state). The empty string is the new value of the
        message box; history and state are the updated inputs.
    """
    if history is None:
        history = []

    if state is None:
        state = {
            "last_company": None,
            "last_year": None,
            "last_intent": None,
            "internal_history": []
        }
    elif "internal_history" not in state:
        state["internal_history"] = []

    # A blank submission has nothing to answer: return before any language
    # detection or model call is made.
    if not user_input or not user_input.strip():
        return "", history, state

    user_lang = safe_detect_language(user_input)

    def localize(english_reply):
        """Translate an English reply into the user's language when needed."""
        return translate_to_language(english_reply, user_lang) if user_lang != "en" else english_reply

    def reply_and_stop(english_reply):
        """Show a reply in the visible chat only and end the turn early."""
        shown = localize(english_reply)
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": shown})
        return "", history, state

    # ---------- Correction / Clarification ----------
    correction_result = detect_correction(
        user_input,
        history,
        state.get("last_company"),
        state.get("last_year"),
        state.get("last_intent")
    )

    corrected_query = correction_result["query"]
    english_text = translate_to_english(corrected_query) if user_lang != "en" else corrected_query

    # Update state
    state["last_company"] = correction_result["company"]
    state["last_year"] = correction_result["year"]
    state["last_intent"] = correction_result["intent_category"]

    # Log correction for debug (optional)
    if corrected_query != user_input:
        print(f'🔁 Corrected/Rewritten query: "{corrected_query}"')
    else:
        print(f'✅ No correction applied, using original: "{user_input}"')

    print("📦 Detected company:", correction_result["company"])
    print("📆 Detected year:", correction_result["year"])
    print("📚 Detected intent category:", correction_result["intent_category"])

    # ---------- Company Processing ----------
    # A str result is a message for the user rather than a detected company;
    # fall back to the remembered company before giving up.
    company_response = handle_company_query(english_text)
    if not isinstance(company_response, str):
        company_result = company_response
        state["last_company"] = company_result
    elif state.get("last_company"):
        company_result = state["last_company"]
    else:
        return reply_and_stop(company_response)

    # ---------- Year Detection ----------
    year_response = handle_year_query(english_text)

    # ---------- Intent Classification ----------
    # Done before the year gate below, because only some intents need a year.
    intent_category = classify_intent(english_text)
    if intent_category == 3 and state.get("last_intent") in [0, 1, 2]:
        # Very short follow-ups (e.g. just a year) inherit the previous intent.
        if len(english_text.strip()) <= 15:
            intent_category = state["last_intent"]

    # ---------- Year Processing ----------
    if not isinstance(year_response, str):
        year_result = year_response
        state["last_year"] = year_result
    elif state.get("last_year"):
        year_result = state["last_year"]
    elif intent_category == 1:
        # Unstructured questions do not require a year (the checks further
        # down only demand one for intents 0 and 2), so continue without it.
        # NOTE(review): assumes a str from handle_year_query means "no year
        # found" — confirm against that helper.
        year_result = None
    else:
        return reply_and_stop(year_response)

    # ---------- Ask for Missing Info ----------
    missing_info_response = ask_for_missing_info(
        english_text, company_result, year_result, intent_category, state["internal_history"]
    )
    if missing_info_response:
        return reply_and_stop(missing_info_response)

    # ---------- Year Required But Missing ----------
    if intent_category in [0, 2] and not year_result:
        return reply_and_stop("Please specify a valid year for this request.")

    # ---------- Execute Query ----------
    # Reports for this year and later are treated as not yet published.
    first_unreleased_year = 2025

    if intent_category == 0:  # Structured
        year = year_result["year"]
        if year >= first_unreleased_year:
            # Name the year that was actually asked for.
            response = f"Sorry, the {year} financial report has not been released yet."
        else:
            response = get_structured_data(english_text)

    elif intent_category == 1:  # Unstructured
        year = year_result.get("year") if year_result else None
        if year and year >= first_unreleased_year:
            response = f"Sorry, the {year} financial report has not been released yet."
        else:
            response = query_rag(english_text)

    elif intent_category == 2:  # Prediction
        ticker = company_result["ticker"]
        year = year_result["year"]
        response = "English: " + predictor(ticker, year)

    elif intent_category == 3:
        response = "This question does not belong to financial data, financial analysis, or market predictions. Please check your query."
    else:
        response = "Unable to process your request. Please refine your question."

    # ---------- Follow-up ----------
    follow_up_response = ask_follow_up()
    if follow_up_response and follow_up_response not in response:
        response += "\n\n" + follow_up_response

    # ---------- Finalize ----------
    final_response = localize(response)

    # Append to user-visible chat history
    history.append({"role": "user", "content": user_input})
    history.append({"role": "assistant", "content": final_response})

    # Append corrected version to internal (invisible) history
    state["internal_history"].append({"role": "user", "content": corrected_query})
    state["internal_history"].append({"role": "assistant", "content": final_response})

    # Update intent tracking
    state["last_intent"] = intent_category

    return "", history, state

In [None]:
with gr.Blocks(theme="soft") as demo:
    # Header: title, scope, and the three supported question types
    for intro_markdown in (
        "# FinScope 3D",
        "Data Retrieval × Document Analysis × Data-Driven Prediction.<br><strong>For NASDAQ-100 companies within 2020–2025!</strong>",
        "Multilingual: Ask your question in English, Chinese, Japanese, or Korean.",
        "We support three types of financial questions:",
        "1. <strong>Data Retrieval:</strong> Ask about specific numbers in financial reports, e.g., revenue, EPS, or profit.",
        "2. <strong>Document Analysis:</strong> Ask about business strategies, risks, or management discussions.",
        "3. <strong>Data-Driven Prediction:</strong> Get a general forecast on whether a company's stock is likely to outperform or underperform the market — no exact prices provided.",
    ):
        gr.Markdown(intro_markdown)

    # Per-session memory carried between turns
    state = gr.State({
        "last_company": None,
        "last_year": None,
        "last_intent": None
    })

    # Conversation view
    chatbot = gr.Chatbot(height=500, label="Chatbot", type="messages")

    # Loading message, hidden until a request is in flight
    loading = gr.Markdown(visible=False)

    # Message box
    msg = gr.Textbox(
        placeholder="I can answer financial questions like 'What was Apple's EPS in 2021?'",
        label="Your Message",
        interactive=True
    )

    # Action buttons
    enter_button = gr.Button("Enter", variant="primary")
    clear = gr.Button("Clear", variant="secondary")

    # The Enter button and the Enter key run the same three-step chain:
    # show the loading text, answer the message, hide the loading text.
    for submit_trigger in (enter_button.click, msg.submit):
        submit_trigger(
            get_loading_text,
            inputs=msg,
            outputs=loading
        ).then(
            chatbot_response,
            inputs=[msg, chatbot, state],
            outputs=[msg, chatbot, state]
        ).then(
            lambda: gr.update(visible=False),
            outputs=loading
        )

    # Clear: empty the chat and the message box, and reset the session memory
    clear.click(
        lambda: ([], "", {
            "last_company": None,
            "last_year": None,
            "last_intent": None
        }),
        outputs=[chatbot, msg, state]
    )

In [None]:
# Start the app. share=True publishes a temporary public URL; debug=True keeps
# this cell running so errors and logs stay visible (set debug=False to stop that).
demo.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://58e7f23e7a3746c00a.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


✅ No correction applied, using original: "what was the profit rate in 2023 in apple"
📦 Detected company: None
📆 Detected year: None
📚 Detected intent category: None
Detected company by NER: apple
Exact match found: AAPL
Extracted year: 2023
No company detected by NER.


In [None]:
# Sample questions to type into the chat box, one per supported scenario.
# These are bare string literals: running the cell does nothing except echo
# the last one as the cell output.
"What was Apple's revenue in 2023?"
"please tell me apple's business model"
"can you tell me its eps?"
"2022"
"What was Apple's stock return in 2023?"
"How is Apple expected to perform in 2025?"

'How is Apple expected to perform in 2025?'