In [None]:
# Import packages
import duckdb
import pandas as pd
import json
from typing import Any, Dict, List




In [None]:
# connect to in-memory DuckDB
conn = duckdb.connect(':memory:')

trades_df = pd.read_csv('trades.csv')
conn.register('trades', trades_df)

holdings_df = pd.read_csv('holdings.csv')
conn.register('holdings', holdings_df)

print("Data loaded ")


In [None]:
# Define schema metadata
SCHEMA_METADATA = {
    "trades": {
        "description": "Transaction records of buy/sell/cover trades executed in various funds",
        "columns": {
            "id": "Unique trade identifier",
            "RevisionId": "Trade revision number",
            "TradeTypeName": "Type of trade (Buy, Sell, Sell Short, Buy Fixed/Floating Rate, Cover Short, Buy Protection)",
            "SecurityId": "Unique identifier for security/asset",
            "SecurityType": "Type of security (Equity, Bond, etc.)",
            "Name": "Security name/description",
            "Ticker": "Stock/Security ticker symbol",
            "TradeDate": "Date trade was executed",
            "SettleDate": "Date trade settled",
            "Quantity": "Number of shares/units traded",
            "Price": "Price per unit at trade",
            "Principal": "Total transaction value (Quantity × Price)",
            "Interest": "Interest accrued/paid on trade",
            "TotalCash": "Total cash flow for trade",
            "PortfolioName": "Fund/Portfolio name",
            "CustodianName": "Custodian/broker handling the trade",
            "StrategyName": "Investment strategy",
            "Counterparty": "Entity on other side of trade",
            "AllocationRule": "How trade is allocated across sub-portfolios"
        },
        "key_filters": ["PortfolioName", "TradeTypeName", "SecurityType", "Counterparty"]
    },
    "holdings": {
        "description": "Current positions held in each fund as of a specific date",
        "columns": {
            "AsOfDate": "Date for which holdings are reported",
            "PortfolioName": "Fund/Portfolio name",
            "SecurityId": "Unique identifier for security/asset",
            "SecurityTypeName": "Type of security (Bond, Equity, etc.)",
            "SecName": "Security identifier/ISIN",
            "Qty": "Current quantity held",
            "Price": "Current price per unit",
            "FXRate": "Foreign exchange rate for currency conversion",
            "MV_Local": "Market value in local currency (Qty × Price)",
            "MV_Base": "Market value in base currency (MV_Local × FXRate)",
            "PL_DTD": "Profit/Loss Day-To-Date",
            "PL_QTD": "Profit/Loss Quarter-To-Date",
            "PL_MTD": "Profit/Loss Month-To-Date",
            "PL_YTD": "Profit/Loss Year-To-Date (cumulative gain/loss from start of year)",
            "StartQty": "Quantity at start of year",
            "StartPrice": "Price at start of year",
            "StartFXRate": "FX rate at start of year"
        },
        "key_filters": ["PortfolioName", "SecurityTypeName", "AsOfDate"]
    }
}

trades_funds = conn.execute("SELECT DISTINCT PortfolioName FROM trades ORDER BY PortfolioName").fetchall()
holdings_funds = conn.execute("SELECT DISTINCT PortfolioName FROM holdings ORDER BY PortfolioName").fetchall()

print(f" Funds in TRADES data ({len(trades_funds)}):")
for fund in trades_funds:
    print(f"  - {fund[0]}")
    
print(f"\n Funds in HOLDINGS data ({len(holdings_funds)}):")
for fund in holdings_funds:
    print(f"  - {fund[0]}")


In [None]:
# making openai client
from openai import OpenAI
import os
from dotenv import load_dotenv

load_dotenv()

openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))



In [None]:
# decide wich data source to use with help of llm
def decide_data_source(user_question: str) -> Dict[str, Any]:
    prompt = f"""You are a financial data expert. Analyze this question and decide which data source(s) to use.

Question: {user_question}

Available sources:
1. TRADES - Contains buy/sell transaction history (649 records)
2. HOLDINGS - Contains current positions and P&L data (1,022 records)
3. BOTH - If you need information from both tables
4. you have a list of funds:{trades_funds} and you have list of holdings funds:{holdings_funds}.
     always try to match user input with these fund names or holding names.
5. dont forget to handle case sensitivity and incorrect wordings in user question.
6. you should know which table to choose based on user question.
7. if user question is related to trades, trade types, trade dates, quantities, prices, counterparties, etc then choose "trades" table.
8. if user question is related to holdings, quantities held, market values, profit/loss, as of dates, etc then choose "holdings" table.
Respond in JSON format ONLY:
{{
    "sources": ["trades"] or ["holdings"] or ["trades", "holdings"],
    "reasoning": "Brief explanation of why these sources"
}}

Be concise. Do not include any text outside the JSON."""
    
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        max_tokens=500,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    raw = response.choices[0].message.content
    try:
        result = json.loads(raw)
        return result
    except json.JSONDecodeError:
        print(f"Failed to parse LLM response: {response.content[0].text}")
        return {"sources": ["trades", "holdings"], "reasoning": "Default to both sources"}


# Test the function
test_question = "How many trades did HoldCo 1 execute?"
print(f"\nTest Question: {test_question}")
result = decide_data_source(test_question)
print(f"\nDecision:")
print(f"  Sources: {result['sources']}")
print(f"  Reasoning: {result['reasoning']}")

In [None]:
# genrate sql query 
def generate_sql(user_question: str, sources: List[str]) -> Dict[str, Any]:

    schema_context = ""

    if "trades" in sources:
        schema_context += f"""
TRADES TABLE:
{json.dumps(SCHEMA_METADATA['trades'], indent=2)}
"""
    
    if "holdings" in sources:
        schema_context += f"""
HOLDINGS TABLE:
{json.dumps(SCHEMA_METADATA['holdings'], indent=2)}
"""
    
    prompt = f"""You are a SQL expert for financial data. Generate a SQL query to answer this question.

{schema_context}

Question: {user_question}

VERY IMPORTANT RULES (HARD):
RULES:
1. Use DuckDB SQL syntax - NO PostgreSQL syntax
2. NEVER use ILIKE, use LOWER() instead
3. NEVER use ARRAY or ANY - just use WHERE LOWER(col) = LOWER('value')
4. For top/best funds by profit: SELECT PortfolioName, SUM(PL_YTD) FROM holdings GROUP BY PortfolioName ORDER BY 2 DESC LIMIT 3
5. when asked about profit do not give answer in negative values. profit always should be positive. when claculating profit always make sure total profit is positive.
6. Return JSON: {{"sql": "...", "explanation": "..."}}


IMPORTANT RULES:
1. Only use tables and columns from the schema above
2. Use DuckDB SQL syntax
3. For aggregations, use appropriate GROUP BY
4. For date filtering, handle NULL values gracefully
5. Return results in a logical order
6. Include column names that make sense to business users
7. If multiple tables are used, join on relevant keys
8. Always check if user has entered a word incorrectly or incorrect format or there is LOWER() UPEER() ISSUE.
   example: 1) if user entered "ytd" instead of "YTD", handle it in SQL,
            2) if user entered "Holdco 1" instead of "HoldCo 1", handle it in SQL.
            3) if user entered "sell short" instead of "Sell Short", handle it in SQL.
            4) if user enterd "platpot" instead of "Platpot", handle it in SQL.
     in short you should take care of case sensitivity and incorrect wordings in user question.        
     you have a list of funds:{trades_funds} and you have list of holdings funds:{holdings_funds}.
     always try to match user input with these fund names or holding names.
     dont forget to handle these issues in SQL query.
9.always genrate sql based on user question and chosen data sources.do not answer user question directly on you own.
10.Use standard SQL syntax compatible with Postgres or your DB.
11.Avoid ANY/ALL subquery with `~~*`.
12. Use simple IN or ILIKE filters for fund names.
13.Return valid SQL only.

Respond in JSON format ONLY:
{{
    "sql": "SELECT ... FROM ...",
    "explanation": "What this query does"
}}

Do not include any text outside the JSON. Do not use markdown code blocks."""
    
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        max_tokens=500,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    
    raw = response.choices[0].message.content
    try:
        result = json.loads(raw)
        return result
    except json.JSONDecodeError as e:
        print(f"Failed to parse SQL generation response: {raw}")
        return {"sql": "", "explanation": "Failed to generate SQL"}


# Test the function
test_sources = ["trades"]
print(f"\nGenerating SQL for: {test_question}")
print(f"Using sources: {test_sources}")
sql_result = generate_sql(test_question, test_sources)
print(f"\nGenerated SQL:")
print(sql_result['sql'])
print(f"\nExplanation: {sql_result['explanation']}")

In [None]:
def execute_query(sql: str) -> Dict[str, Any]:

    try:
        result = conn.execute(sql).fetchall()
        columns = [desc[0] for desc in conn.execute(sql).description] if result else []
        
      
        data = []
        if columns:
            conn_for_desc = duckdb.connect(':memory:')
            conn_for_desc.register('trades', trades_df)
            conn_for_desc.register('holdings', holdings_df)
            result_with_desc = conn_for_desc.execute(sql).fetchall()
            columns = [desc[0] for desc in conn_for_desc.execute(sql).description]
            data = [dict(zip(columns, row)) for row in result_with_desc]
        
        return {
            "success": True,
            "data": data,
            "row_count": len(data),
            "error": None
        }
    except Exception as e:
        return {
            "success": False,
            "data": [],
            "row_count": 0,
            "error": str(e)
        }


# Test execution

exec_result = execute_query(sql_result['sql'])

if exec_result['success']:
    print(f"Query executed successfully")
    print(f"Rows returned: {exec_result['row_count']}")
    if exec_result['data']:
        print(f"\nSample results:")
        for row in exec_result['data'][:5]:
            print(f"  {row}")
else:
    print(f"Query failed: {exec_result['error']}")

In [None]:
def explain_results(user_question: str, query_data: Dict[str, Any]) -> str:


    if not query_data['success']:
        return f"Sorry, I couldn't find the answer. Error: {query_data['error']}"

    if query_data['row_count'] == 0:
        return "Sorry, I cannot find the answer to your question. No matching data found."

    data_str = json.dumps(query_data['data'], indent=2)

    prompt = f"""You are a financial analyst explaining query results to business users.


Original Question: {user_question}

Query Results ({query_data['row_count']} rows):
{data_str}

Provide a clear, concise explanation:
1. Answer the question directly
2. Include specific numbers and fund names
3. Highlight significant findings
4. Keep it under 150 words
5. Use business-friendly language
6.Provide a clear natural language answer with fund names and their profits or losses.
7.Do NOT use any asterisks, bold, italics, or other markdown.
8. Keep it human-readable, as plain text.
9. Include numbers exactly from the query results.

Do NOT mention SQL queries or data structures.
"""

    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=500,
        temperature=0
    )

    raw = response.choices[0].message.content
    return raw


# Test explanation
print(f"\nGenerating natural language explanation...")
explanation = explain_results(test_question, exec_result)
print(f"\nAnswer:")
print(explanation)

In [None]:
# genrate natural response.
def financial_chatbot(user_question: str) -> str:
    
    
    print(f"QUESTION: {user_question}")
   
    #  Decide data source
    source_decision = decide_data_source(user_question)
    
    #  Gen. SQL
    sql_gen = generate_sql(user_question, source_decision['sources'])
    if not sql_gen['sql']:
        return " Failed to generate SQL query"
    
    # Execute query
    exec_result = execute_query(sql_gen['sql'])
    if not exec_result['success']:
        print(f"   Query execution failed: {exec_result['error']}")
        return f" Sorry, I cannot find the answer. (Error: {exec_result['error']})"
   
    # explanation
    final_answer = explain_results(user_question, exec_result)
    print("\n ANSWER:")
   
    print(final_answer)
    
    return final_answer


print("\n Chatbot function ready!")


In [None]:
# Run test questions
test_questions = [
    "How many trades did HoldCo 1 execute?",
    "Which fund has the best YTD performance?",
    "How many holdings does ytum have?",
    "give me best 3 funds by profit to year to date",
]


print(" test \n")

for i, question in enumerate(test_questions, 1):
    print(f"\n\n TEST {i}/{len(test_questions)}")
    financial_chatbot(question)
    print("\n")

In [None]:
import ipywidgets as widgets
from IPython.display import display

input_box = widgets.Text(
    placeholder='Ask a question about the funds...',
    layout=widgets.Layout(width='80%')
)

output = widgets.Output()

def on_submit(sender):
    with output:
        print(f"> {sender.value}")
        financial_chatbot(sender.value)
        sender.value = ""

input_box.on_submit(on_submit)

display(input_box, output)
