In [12]:
%%writefile requirements.txt
pandas>=1.3.0
numpy>=1.21.0
requests>=2.26.0
sqlalchemy>=1.4.0
pydantic-settings>=2.0.0
pydantic>=2.0.0
python-dotenv>=0.19.0
pandera>=0.6.0
pyarrow>=6.0.0
# For the download progress bar (build_full_history_optimized.py)
tqdm
# For the analysis notebook (plots and hierarchical clustering)
matplotlib
seaborn
scipy
# For the forecasting notebook
prophet
statsmodels
# For database connection
pymysql

Overwriting requirements.txt


In [13]:
%%writefile config.py
"""
Configuration management with environment variables
"""
from pydantic_settings import BaseSettings
from pydantic import Field

class BaseConfig(BaseSettings):
    """Defines shared configuration for all settings classes.

    Values are read from the process environment and from a UTF-8 encoded
    `.env` file; settings that do not correspond to a declared field are
    ignored rather than rejected.
    """
    # pydantic v2 style configuration. The nested `class Config` form is
    # deprecated in v2 and triggers a deprecation warning when the class is
    # created. A plain dict is used so no additional import is needed.
    model_config = {
        'env_file': '.env',
        'env_file_encoding': 'utf-8',
        'extra': 'ignore',
    }

class DatabaseSettings(BaseConfig):
    """Database connection settings.

    Each field is populated from the setting named by its alias (for example
    `DB_USER`). `user`, `password` and `database` are required; the other
    fields fall back to the defaults shown below.
    """
    user: str = Field(..., alias='DB_USER')
    password: str = Field(..., alias='DB_PASSWORD')
    host: str = Field('localhost', alias='DB_HOST')
    port: int = Field(3306, alias='DB_PORT')
    database: str = Field(..., alias='DB_NAME')
    pool_size: int = Field(10, alias='DB_POOL_SIZE')
    max_overflow: int = Field(5, alias='DB_MAX_OVERFLOW')

    @property
    def url(self) -> str:
        """Constructs the database connection URL.

        Returns:
            A `mysql+pymysql://user:password@host:port/database` URL. The user
            name and password are percent-encoded so that characters such as
            `@`, `:` or `/` inside the credentials cannot be mistaken for URL
            delimiters.
        """
        # Local import keeps the module's top-level import block untouched.
        from urllib.parse import quote

        # safe='' also encodes '/', and spaces become %20 (not '+').
        user = quote(self.user, safe='')
        password = quote(self.password, safe='')
        return f"mysql+pymysql://{user}:{password}@{self.host}:{self.port}/{self.database}"

# Module-level settings instance shared by importers (`from config import db_config`).
# It is created at import time, so importing this module raises a validation
# error when a required setting (DB_USER, DB_PASSWORD, DB_NAME) has no value.
db_config = DatabaseSettings()

Overwriting config.py


In [None]:
%%writefile db_loader.py
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import pandas as pd
from config import db_config
import logging
from pandera import Check, Column, DataFrameSchema
from pandera.errors import SchemaError

# Module-scoped logger; handlers and levels are configured by the calling script.
logger = logging.getLogger(__name__)

class DBLoader:
    """Validates NAV DataFrames and writes them to the configured database.

    The SQLAlchemy engine is created once per instance from `config.db_config`.
    """

    # Upper bound on the number of (scheme_code, date) pairs sent in a single
    # DELETE statement, to keep individual statements a manageable size.
    _DELETE_CHUNK_SIZE = 1000

    def __init__(self):
        """Create the engine used by every subsequent load."""
        self.engine = self._create_engine()

    def _create_engine(self):
        """Build a pooled engine from `db_config`.

        `pool_pre_ping` tests connections before use and `pool_recycle`
        replaces connections older than one hour.
        """
        return create_engine(
            db_config.url,
            pool_size=db_config.pool_size,
            max_overflow=db_config.max_overflow,
            pool_pre_ping=True,
            pool_recycle=3600
        )

    def validate_data(self, df: pd.DataFrame) -> bool:
        """Validates the DataFrame against a predefined schema.

        Required columns: `date` (timestamp), `nav` (float, zero or greater),
        `scheme_code` (str) and `scheme_name` (str).

        Returns:
            True when the frame satisfies the schema; False otherwise (the
            failure details are logged).
        """
        # Local import: lazy validation reports failures through SchemaErrors.
        from pandera.errors import SchemaErrors

        schema = DataFrameSchema({
            "date": Column(pd.Timestamp),
            # A NAV of exactly zero is accepted; only negative values fail.
            "nav": Column(float, checks=[Check.greater_than_or_equal_to(0)]),
            "scheme_code": Column(str),
            "scheme_name": Column(str)
        })
        try:
            schema.validate(df, lazy=True)
            return True
        except (SchemaError, SchemaErrors) as e:
            # With lazy=True pandera raises SchemaErrors (plural), which is a
            # separate exception class; catching only SchemaError would let
            # validation failures escape as unhandled exceptions.
            logger.error(f"Data validation failed: {e}")
            return False

    def _delete_existing_rows(self, conn, df: pd.DataFrame, table_name: str) -> int:
        """Delete stored rows whose (scheme_code, date) key appears in `df`.

        Matching on the full key, rather than on the date alone, leaves other
        schemes' rows for the same dates untouched. Values are sent as bound
        parameters instead of being interpolated into the SQL text.

        Returns:
            The total number of rows reported as deleted.
        """
        from sqlalchemy import column, delete, table, tuple_

        target = table(table_name, column('scheme_code'), column('date'))
        keys = df[['scheme_code', 'date']].drop_duplicates()
        pairs = list(zip(
            keys['scheme_code'].astype(str),
            keys['date'].dt.strftime('%Y-%m-%d'),
        ))

        removed = 0
        for start in range(0, len(pairs), self._DELETE_CHUNK_SIZE):
            chunk = pairs[start:start + self._DELETE_CHUNK_SIZE]
            stmt = delete(target).where(
                tuple_(target.c.scheme_code, target.c.date).in_(chunk)
            )
            removed += conn.execute(stmt).rowcount
        return removed

    def load_to_db(self, df: pd.DataFrame, table_name: str, if_exists: str = 'append') -> bool:
        """Validate `df` and write it to `table_name` inside one transaction.

        Args:
            df: Frame to load; must pass `validate_data`.
            table_name: Destination table.
            if_exists: Passed to `DataFrame.to_sql`. In `'append'` mode, rows
                already stored for the same (scheme_code, date) keys are
                deleted first so that re-running a load does not duplicate data.

        Returns:
            True on success; False when validation fails or the database
            raises a SQLAlchemy error (the transaction is then rolled back).
        """
        from sqlalchemy import inspect as sa_inspect

        if not self.validate_data(df):
            return False

        try:
            with self.engine.begin() as conn:
                # Only de-duplicate when the table already exists; on the very
                # first append there is nothing to delete and a DELETE against
                # a missing table would fail the whole load.
                if (
                    if_exists == 'append'
                    and not df.empty
                    and sa_inspect(conn).has_table(table_name)
                ):
                    removed = self._delete_existing_rows(conn, df, table_name)
                    logger.info(f"Removed {removed} existing records for scheme/date pairs being loaded.")

                df.to_sql(
                    name=table_name, con=conn, if_exists=if_exists,
                    index=False, method='multi', chunksize=1000
                )
                logger.info(f"Successfully loaded {len(df)} records to table '{table_name}' with mode '{if_exists}'.")
                return True
        except SQLAlchemyError as e:
            logger.error(f"DB Error during load to '{table_name}': {e}", exc_info=True)
            return False

Overwriting db_loader.py


In [23]:
%%writefile build_full_history_optimized.py
import requests
import pandas as pd
import logging
from tqdm import tqdm
from io import StringIO
import time
import os
import pickle
from db_loader import DBLoader
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- Configuration ---
# Root logger setup (INFO level, timestamped messages); runs at import time.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Number of download threads; also used to size the HTTP connection pool in main().
MAX_WORKERS = 20
# Minimum number of seconds between progress checkpoints during the download loop.
CHECKPOINT_INTERVAL_SECONDS = 300
# Pickle file holding the list of per-fund DataFrames downloaded so far (used to resume).
PROGRESS_FILE = "_progress.pkl"
# Pickle cache of the master scheme list written by fetch_all_scheme_codes().
ALL_FUNDS_FILE = "_all_funds.pkl"
# Parquet copy of the cleaned, combined NAV data; when present, main() loads it instead of downloading.
BACKUP_FILE = "final_nav_data.parquet"

# --- Data fetching helpers ---
def fetch_all_scheme_codes() -> pd.DataFrame:
    """Return the master list of fund schemes, preferring the local cache.

    The list is read from ALL_FUNDS_FILE when that file exists and is
    non-empty; otherwise it is requested from the mfapi.in API and cached.

    Returns:
        A DataFrame built from the API's JSON response, or an empty DataFrame
        when the list could not be fetched or the API returned no schemes.
    """
    if os.path.exists(ALL_FUNDS_FILE):
        logging.info("Loading master fund list from local cache...")
        # NOTE(security): read_pickle can execute arbitrary code; only use it
        # on cache files written by this script.
        cached_df = pd.read_pickle(ALL_FUNDS_FILE)
        if not cached_df.empty:
            return cached_df
        # An empty cache would make every later run exit immediately.
        logging.warning("Cached master fund list is empty; refetching from API.")

    logging.info("Fetching master list of all fund schemes from API...")
    try:
        response = requests.get("https://api.mfapi.in/mf", timeout=30)
        response.raise_for_status()
        funds_df = pd.DataFrame(response.json())
        if funds_df.empty:
            # Do not cache an empty result, so the next run retries the API.
            logging.error("Could not fetch the master fund list: API returned no schemes")
            return funds_df
        funds_df.to_pickle(ALL_FUNDS_FILE)
        logging.info(f"Found and cached {len(funds_df)} total schemes.")
        return funds_df
    except Exception as e:
        logging.error(f"Could not fetch the master fund list: {e}")
        return pd.DataFrame()

def fetch_one_fund_history(session: requests.Session, fund_info: dict) -> pd.DataFrame:
    """Download the NAV history for one scheme.

    Args:
        session: Session used to issue the GET request.
        fund_info: Mapping with at least `schemeCode` and `schemeName`.

    Returns:
        A DataFrame with columns `date`, `scheme_code`, `scheme_name`, `nav`
        (values exactly as returned by the API), or an empty DataFrame when the
        request fails, the status is not 200, the body is not the expected
        JSON, or no records are present. Failures are logged rather than
        raised so that one bad scheme cannot abort a concurrent download run.
    """
    scheme_code = fund_info['schemeCode']
    scheme_name = fund_info['schemeName']
    url = f"https://api.mfapi.in/mf/{scheme_code}"

    try:
        response = session.get(url, timeout=20)
    except requests.RequestException as exc:
        logging.warning(f"Request failed for scheme {scheme_code}: {exc}")
        return pd.DataFrame()

    if response.status_code != 200:
        logging.warning(f"HTTP {response.status_code} for scheme {scheme_code}; skipping.")
        return pd.DataFrame()

    try:
        payload = response.json()
    except ValueError as exc:
        # JSON decode errors are ValueError subclasses; handled separately
        # because older requests versions do not raise them as RequestException.
        logging.warning(f"Invalid JSON for scheme {scheme_code}: {exc}")
        return pd.DataFrame()

    if not isinstance(payload, dict):
        logging.warning(f"Unexpected payload type for scheme {scheme_code}; skipping.")
        return pd.DataFrame()

    records = payload.get("data")
    if payload.get("status") != "SUCCESS" or not records:
        return pd.DataFrame()

    df = pd.DataFrame(records)
    if not {'date', 'nav'}.issubset(df.columns):
        logging.warning(f"Records for scheme {scheme_code} lack 'date'/'nav' fields; skipping.")
        return pd.DataFrame()

    df['scheme_code'] = scheme_code
    df['scheme_name'] = scheme_name
    return df[['date', 'scheme_code', 'scheme_name', 'nav']]


def main():
    """Build the full NAV history and load it into the `nav_data` table.

    Steps:
        1. If BACKUP_FILE exists, load the cleaned data from it and skip the
           download entirely.
        2. Otherwise download every scheme's history concurrently (resuming
           from PROGRESS_FILE when present and checkpointing periodically),
           then clean, de-duplicate and save the result to BACKUP_FILE.
        3. Drop negative NAVs and load the frame with `if_exists='replace'`.
        4. On success, delete the progress and master-list cache files.
    """
    if os.path.exists(BACKUP_FILE):
        logging.warning(f"--- Found existing backup file '{BACKUP_FILE}'. Loading from it instead of re-downloading. ---")
        final_df = pd.read_parquet(BACKUP_FILE)
    else:
        all_funds = fetch_all_scheme_codes()
        if all_funds.empty:
            return
        all_data_frames = []
        processed_codes = set()

        if os.path.exists(PROGRESS_FILE):
            # NOTE(security): pickle.load can execute arbitrary code; only
            # resume from checkpoint files written by this script.
            with open(PROGRESS_FILE, 'rb') as f:
                all_data_frames = pickle.load(f)
            for df in all_data_frames:
                processed_codes.update(df['scheme_code'].unique())
            logging.info(f"Resuming with {len(processed_codes)} funds already downloaded.")

        remaining_funds = all_funds[~all_funds['schemeCode'].isin(processed_codes)]

        if not remaining_funds.empty:
            fund_list = remaining_funds.to_dict('records')
            last_checkpoint_time = time.time()

            # The context manager closes the session's pooled connections once
            # every download has finished.
            with requests.Session() as session:
                adapter = requests.adapters.HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS)
                session.mount('https://', adapter)

                with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                    future_to_fund = {executor.submit(fetch_one_fund_history, session, fund): fund for fund in fund_list}
                    for future in tqdm(as_completed(future_to_fund), total=len(fund_list), desc="Fetching Full History"):
                        try:
                            result_df = future.result()
                        except Exception as exc:
                            # One failing scheme must not abort the whole run;
                            # it is simply absent from the results and will be
                            # retried on a later resume.
                            failed_fund = future_to_fund[future]
                            tqdm.write(f"Skipping scheme {failed_fund.get('schemeCode')}: {exc}")
                            result_df = pd.DataFrame()
                        if not result_df.empty:
                            all_data_frames.append(result_df)
                        if time.time() - last_checkpoint_time > CHECKPOINT_INTERVAL_SECONDS:
                            with open(PROGRESS_FILE, 'wb') as f:
                                pickle.dump(all_data_frames, f)
                            tqdm.write("Checkpoint saved.")
                            last_checkpoint_time = time.time()

        # Final checkpoint so a failure in the later steps does not lose the download.
        with open(PROGRESS_FILE, 'wb') as f:
            pickle.dump(all_data_frames, f)
        if not all_data_frames:
            logging.error("CRITICAL: No data could be downloaded. Halting.")
            return

        logging.info("Combining all data sources and cleaning...")
        final_df = pd.concat(all_data_frames, ignore_index=True)
        final_df['nav'] = pd.to_numeric(final_df['nav'], errors='coerce')
        final_df['date'] = pd.to_datetime(final_df['date'], dayfirst=True, errors='coerce')
        final_df.dropna(subset=['nav', 'date', 'scheme_code', 'scheme_name'], inplace=True)
        final_df['scheme_code'] = final_df['scheme_code'].astype(str).str.strip()
        # Sort by date so that keep='last' retains the last-listed row for
        # each (date, scheme_code) key.
        final_df.sort_values('date', ascending=True, inplace=True)
        final_df.drop_duplicates(subset=['date', 'scheme_code'], keep='last', inplace=True)

        logging.info(f"--- Backup --- Saving final combined data to '{BACKUP_FILE}'...")
        final_df.to_parquet(BACKUP_FILE, index=False)

    # --- FINAL PROCESSING ---
    logging.info("Final data processing and validation...")

    # Drop negative NAVs so the frame satisfies the loader's `nav >= 0` schema check.
    final_df = final_df[final_df['nav'] >= 0].copy()

    logging.info(f"Preparing to load {len(final_df)} valid records into the database.")
    loader = DBLoader()
    success = loader.load_to_db(final_df, 'nav_data', if_exists='replace')

    if success:
        logging.info("✅ Full historical database has been built successfully!")
        if os.path.exists(PROGRESS_FILE):
            os.remove(PROGRESS_FILE)
        if os.path.exists(ALL_FUNDS_FILE):
            os.remove(ALL_FUNDS_FILE)
    else:
        logging.error("❌ Database loading failed.")

if __name__ == "__main__":
    main()

Writing build_full_history_optimized.py


In [27]:
%%writefile update_daily.py
import pandas as pd
import logging
from io import StringIO
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from db_loader import DBLoader

# --- Configuration ---
# Root logger setup (INFO level, "timestamp - level - message" format); runs at import time.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def create_resilient_session():
    """
    Build a requests session whose HTTPS requests are retried automatically.

    The retry policy allows up to 3 retries with a backoff factor of 1 and
    retries responses with status 429, 500, 502, 503 or 504. Only the
    "https://" prefix is mounted with this policy.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=3,            # Maximum number of retries
            backoff_factor=1,   # Growing delay between retries (exact delays are set by urllib3)
            status_forcelist=[429, 500, 502, 503, 504],  # Status codes that trigger a retry
        )
    )
    resilient = requests.Session()
    resilient.mount("https://", retrying_adapter)
    return resilient

def _parse_amfi_nav_text(raw_text: str) -> pd.DataFrame:
    """Parse the semicolon-delimited AMFI NAV text into a cleaned DataFrame.

    Lines after the header row (the first line containing "Scheme Name") that
    have more than four semicolons are treated as data rows. Fields 0, 3, 4
    and 5 are read as `scheme_code`, `scheme_name`, `nav` and `date`.

    Returns:
        A frame with a string `scheme_code`, numeric non-negative `nav` and
        parsed `date`; empty when no header or no data rows are found.
    """
    all_lines = raw_text.strip().splitlines()
    header_index = next((i for i, line in enumerate(all_lines) if "Scheme Name" in line), -1)
    if header_index == -1:
        logging.error("Could not find the header row in the AMFI data.")
        return pd.DataFrame()

    data_lines = [line for line in all_lines[header_index + 1:] if line.count(';') > 4]
    if not data_lines:
        logging.warning("No valid data lines found after the header.")
        return pd.DataFrame()

    df = pd.read_csv(
        StringIO("\n".join(data_lines)), sep=';', header=None, usecols=[0, 3, 4, 5],
        names=['scheme_code', 'scheme_name', 'nav', 'date'],
        # Read identifiers as text: a numeric parse would turn the codes into
        # floats ("119551.0") whenever any row has a blank code, and those
        # would no longer match the codes stored by the history build.
        dtype={'scheme_code': str, 'scheme_name': str},
    )

    # Clean the data
    df['nav'] = pd.to_numeric(df['nav'], errors='coerce')
    df['date'] = pd.to_datetime(df['date'], format='%d-%b-%Y', errors='coerce')
    # scheme_name is included because the loader's schema requires a
    # non-null string there; one blank name would fail the whole load.
    df = df.dropna(subset=['nav', 'date', 'scheme_code', 'scheme_name'])
    df = df[df['nav'] >= 0].copy()  # Filter out invalid negative NAVs
    df['scheme_code'] = df['scheme_code'].astype(str).str.strip()

    logging.info(f"Successfully parsed {len(df)} records from AMFI.")
    return df


def fetch_latest_daily_data() -> pd.DataFrame:
    """
    Fetches the latest daily NAV data for all funds from AMFI using a resilient session.

    Returns:
        The cleaned NAV DataFrame, or an empty DataFrame when the download or
        the parsing fails (the error is logged).
    """
    logging.info("Fetching latest daily data from AMFI...")
    amfi_url = "https://www.amfiindia.com/spages/NAVAll.txt"
    session = create_resilient_session()

    try:
        response = session.get(amfi_url, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
        return _parse_amfi_nav_text(response.text)

    except requests.RequestException as e:
        logging.error(f"Failed to fetch data from AMFI after retries: {e}")
        return pd.DataFrame()
    except Exception as e:
        logging.error(f"An error occurred during data parsing: {e}")
        return pd.DataFrame()
    finally:
        # Release the session's pooled connections on every path.
        session.close()

def main():
    """Fetch the latest daily NAV data and append it to the `nav_data` table."""
    daily_df = fetch_latest_daily_data()
    if daily_df.empty:
        logging.warning("No daily data fetched. Exiting.")
        return

    logging.info(f"Fetched {len(daily_df)} new records. Loading to database...")
    # 'append' mode: the loader first deletes stored rows that the new data
    # replaces, then inserts the new rows.
    loaded = DBLoader().load_to_db(daily_df, 'nav_data', if_exists='append')

    if not loaded:
        logging.error("❌ Daily update failed.")
        return
    logging.info("✅ Daily update completed successfully.")

if __name__ == "__main__":
    main()

Overwriting update_daily.py


In [25]:
%%writefile varify_data.py 
import pandas as pd
from sqlalchemy import create_engine, text
from config import db_config
import logging

# Root logger setup (INFO level, "timestamp - level - message" format); runs at import time.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def view_data():
    """Connects to the database and displays the most recent data.

    Prints the 10 rows of `nav_data` with the latest dates, or logs a warning
    when the table is empty. Any connection or query error is logged rather
    than raised. The engine is disposed before returning so no pooled
    connections are left open.
    """
    logging.info(f"Connecting to database '{db_config.database}'...")

    engine = None
    try:
        engine = create_engine(db_config.url)
        with engine.connect() as conn:
            logging.info("✅ Connection successful!")

            query = text("SELECT * FROM nav_data ORDER BY date DESC LIMIT 10")
            df = pd.read_sql(query, conn)

            if df.empty:
                logging.warning("Database connected, but the 'nav_data' table is empty.")
            else:
                print("\n--- 10 Most Recent Records in Your Database ---")
                print(df.to_string())

    except Exception as e:
        logging.error(f"❌ Failed to connect or query the database: {e}")
    finally:
        # engine stays None when create_engine itself failed.
        if engine is not None:
            engine.dispose()

if __name__ == "__main__":
    view_data()

Writing varify_data.py


In [2]:
%%writefile Analysis_Optimized.ipynb
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Mutual Fund Analysis & Recommendation Engine\n",
    "\n",
    "This notebook provides a three-part, in-depth analysis of mutual funds:\n",
    "1.  **Comprehensive Analysis Engine:** Calculates a full statistical profile for every fund.\n",
    "2.  **Dynamic Recommendation Engine:** Recommends the best funds based on your specific investment timeline.\n",
    "3.  **Supporting Visual Analysis:** Tools for diversification and deep-diving into individual funds."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Part 1: The Analysis Engine (The \"Backend\")\n",
    "\n",
    "### 🔍 Step 1: Setup & Memory-Safe SQL Data Loading\n",
    "\n",
    "**Objective:** \n",
    "Establish a robust and memory-efficient connection to the mutual fund database. The logic first filters out non-relevant or structurally unsuitable mutual funds (e.g., closed-ended funds, FMPs, ETFs, and cash-equivalent or low-risk debt categories such as liquid, overnight, and gilt funds) using a comprehensive keyword-based exclusion list.\n",
    "\n",
    "**Why:** \n",
    "This filtering ensures we're analyzing only **open-ended, investable funds** meant for long-term investors and avoids skewed analysis from temporary or fixed-structure funds. Memory efficiency is critical for large-scale databases, especially in Jupyter environments."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Part 1: Setup and SQL-Only Memory-Safe Data Loading (Corrected Logic) ---\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "from sqlalchemy import create_engine\n",
    "from config import db_config # Assuming config.py is in the same directory\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "from scipy.cluster import hierarchy\n",
    "\n",
    "# --- Setup plotting and display styles ---\n",
    "sns.set_style('whitegrid')\n",
    "plt.rcParams['figure.figsize'] = (18, 9)\n",
    "pd.set_option('display.float_format', lambda x: f'{x:.2f}')\n",
    "pd.set_option('display.width', 1000)\n",
    "\n",
    "# --- MEMORY-SAFE DATA LOADING FROM SQL ---\n",
    "print(\"--- Step 1: Connecting to SQL Server to identify unsuitable funds to exclude ---\")\n",
    "try:\n",
    "    engine = create_engine(db_config.url)\n",
    "\n",
    "    # Query 1: Get the small, unique list of all fund schemes\n",
    "    all_funds_query = \"SELECT DISTINCT scheme_code, scheme_name FROM nav_data\"\n",
    "    all_funds_df = pd.read_sql(all_funds_query, engine)\n",
    "    \n",
    "    # Use the comprehensive keyword list to identify unsuitable funds\n",
    "    filter_keywords = [\n",
    "    # Closed-Ended / Fixed Term / Un-investable\n",
    "    'FMP', 'FIXED MATURITY', 'FIXED TERM', 'SERIES', 'INTERVAL FUND', \n",
    "    'CAPITAL PROTECTION', 'CLOSED ENDED', 'CLOSE ENDED', 'CLOSE-ENDED', \n",
    "    'CAP PROTECTION', 'LIMITED OFFER', 'NFO', 'MATURITY', 'TARGET MATURITY',\n",
    "    'SEGREGATED PORTFOLIO', 'LOCK-IN', 'LIMITED PERIOD',\n",
    "    # Cash-Equivalent / Extreme Low-Risk Debt (not for growth comparison)\n",
    "    'OVERNIGHT', 'LIQUID', 'ULTRA SHORT', 'ULTRA-SHORT', 'MONEY MARKET', \n",
    "    'GILT', 'ARBITRAGE', 'SHORT DURATION', 'LOW DURATION', 'CORPORATE BOND',\n",
    "    'CREDIT RISK', 'DYNAMIC BOND', 'BANKING AND PSU',\n",
    "    \n",
    "    # Other Structures\n",
    "    'ETF'\n",
    "    ]\n",
    "    keyword_pattern = '|'.join(filter_keywords)\n",
    "    \n",
    "    all_funds_df['scheme_name'] = all_funds_df['scheme_name'].astype(str)\n",
    "    unsuitable_funds = all_funds_df[all_funds_df['scheme_name'].str.contains(keyword_pattern, case=False, na=False)]\n",
    "    schemes_to_exclude = unsuitable_funds['scheme_code'].tolist()\n",
    "    \n",
    "    print(f\"Identified {len(schemes_to_exclude)} unsuitable schemes (FMPs, Fixed Term, etc.) to exclude from loading.\")\n",
    "\n",
    "    # --- Step 2: Build a specific SQL query to load ONLY the required historical data ---\n",
    "    print(\"\\n--- Step 2: Loading historical data for suitable, open-ended funds from SQL ---\")\n",
    "    if schemes_to_exclude:\n",
    "        # Create a string of scheme codes for the SQL 'NOT IN' clause\n",
    "        exclude_list_str = \", \".join([f\"'{code}'\" for code in schemes_to_exclude])\n",
    "        \n",
    "        # Query 2: The smart query that prevents MemoryErrors by excluding unsuitable funds\n",
    "        data_query = f\"SELECT * FROM nav_data WHERE scheme_code NOT IN ({exclude_list_str})\"\n",
    "        \n",
    "        df = pd.read_sql(data_query, engine, parse_dates=['date'])\n",
    "        print(f\"Successfully loaded a manageable subset of {len(df)} records.\")\n",
    "    else:\n",
    "        # If no funds are excluded, load everything (might cause MemoryError on large DBs)\n",
    "        print(\"No unsuitable funds found to exclude. Loading the entire dataset.\")\n",
    "        df = pd.read_sql(\"SELECT * FROM nav_data\", engine, parse_dates=['date'])\n",
    "\n",
    "except Exception as e:\n",
    "    print(f\"An error occurred while connecting to the database: {e}\")\n",
    "    df = pd.DataFrame(columns=['scheme_code', 'scheme_name', 'date', 'nav'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🧠 Step 2: Comprehensive Statistical Engine\n",
    "\n",
    "**Objective:** \n",
    "Build a complete historical and statistical profile for each mutual fund. We calculate historical NAV returns (1Y, 3Y, 5Y, 10Y), CAGR, fund age, and since-inception returns.\n",
    "\n",
    "**Innovations:** \n",
    "- A memory-efficient `.loc` and `.get_indexer()` based custom function is used to fetch historical NAVs—avoiding `merge_asof` which is error-prone on large data.  \n",
    "- Returns are normalized to **CAGR** format to make funds across different timeframes comparable.\n",
    "\n",
    "**Why:** \n",
    "These metrics are essential for investors to assess consistency, longevity, and performance expectations."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 📊 Step 3: Risk and Return Statistical Profile\n",
    "\n",
    "**Objective:** \n",
    "Add a quantitative risk layer to the fund analysis by calculating:\n",
    "\n",
    "- **Annual Expected Return**\n",
    "- **Annualized Volatility**\n",
    "- **Sharpe Ratio**\n",
    "\n",
    "**Why:** \n",
    "This moves us beyond raw returns into **risk-adjusted performance**—critical for understanding whether high returns came with disproportionately high volatility. Sharpe Ratio, in particular, helps to compare \"return per unit of risk\"."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 📋 Step 4: Curated Display Table\n",
    "\n",
    "**Objective:** \n",
    "Produce a **final investor-friendly table** that summarizes all key statistics and ranks funds by long-term performance (5Y returns).\n",
    "\n",
    "**Why:** \n",
    "This acts as the \"backend summary\" that feeds into both the visual layer and the recommendation engine. It’s the first high-level insight view for fund analysts or end-users."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Part 2: The Main Analysis Engine (New, Robust Method) ---\n",
    "\n",
    "if not df.empty:\n",
    "    print(\"\\n--- Step 3: Calculating comprehensive metrics for all funds ---\")\n",
    "    df['daily_return'] = df.groupby('scheme_code')['nav'].pct_change()\n",
    "    df_sorted = df.sort_values(['scheme_code', 'date'])\n",
    "    \n",
    "    # Prepare clean, sorted base dataframes\n",
    "    first_navs = df_sorted.loc[df_sorted.groupby('scheme_code')['date'].idxmin()]\n",
    "    latest_navs = df_sorted.loc[df_sorted.groupby('scheme_code')['date'].idxmax()]\n",
    "\n",
    "    master_analysis_df = latest_navs[['scheme_code', 'scheme_name', 'date', 'nav']].rename(columns={'date': 'latest_date', 'nav': 'latest_nav'})\n",
    "    master_analysis_df = master_analysis_df.merge(first_navs[['scheme_code', 'date', 'nav']], on='scheme_code', how='inner')\n",
    "    master_analysis_df.rename(columns={'date': 'inception_date', 'nav': 'inception_nav'}, inplace=True)\n",
    "\n",
    "    # Activity Filter (remove funds that haven't reported recently)\n",
    "    last_valid_date = pd.Timestamp.now() - pd.DateOffset(days=30)\n",
    "    activity_mask = master_analysis_df['latest_date'] >= last_valid_date\n",
    "    master_analysis_df = master_analysis_df[activity_mask].copy()\n",
    "    print(f\"Final number of active, open-ended funds for analysis: {len(master_analysis_df)}\")\n",
    "\n",
    "    # --- NEW METHOD: Using an indexed lookup to avoid merge_asof ---\n",
    "    print(\"\\n--- Step 4: Calculating all historical returns (new robust method) ---\")\n",
    "    \n",
    "    # 1. Create a fast, indexed version of the historical data for lookups\n",
    "    df_indexed = df_sorted.set_index(['scheme_code', 'date'])\n",
    "\n",
    "    # 2. Define a function to get the nearest NAV for a given date\n",
    "    def get_historical_nav(scheme_code, target_date, indexed_df):\n",
    "        try:\n",
    "            # Get all dates for the specific fund\n",
    "            fund_dates = indexed_df.loc[scheme_code].index\n",
    "            # Find the index position of the nearest date\n",
    "            nearest_index_pos = fund_dates.get_indexer([target_date], method='nearest')[0]\n",
    "            # Get the actual nearest date from that position\n",
    "            nearest_date = fund_dates[nearest_index_pos]\n",
    "            # Return the NAV for that specific scheme and date\n",
    "            return indexed_df.loc[(scheme_code, nearest_date), 'nav']\n",
    "        except KeyError:\n",
    "            # This handles cases where a fund might not be in the indexed data\n",
    "            return np.nan\n",
    "\n",
    "    # 3. Apply this function for each period\n",
    "    for years in [1, 3, 5, 10]:\n",
    "        target_date_col = f'date_{years}y_ago'\n",
    "        nav_col = f'nav_{years}y_ago'\n",
    "        master_analysis_df[target_date_col] = master_analysis_df['latest_date'] - pd.DateOffset(years=years)\n",
    "        \n",
    "        master_analysis_df[nav_col] = master_analysis_df.apply(\n",
    "            lambda row: get_historical_nav(row['scheme_code'], row[target_date_col], df_indexed),\n",
    "            axis=1\n",
    "        )\n",
    "        \n",
    "    # --- Calculate CAGR using the new columns ---\n",
    "    def calculate_cagr(start_value, end_value, years):\n",
    "        if pd.isna(start_value) or pd.isna(end_value) or start_value <= 0 or years <= 0.25:\n",
    "            return np.nan\n",
    "        return ((end_value / start_value) ** (1 / years) - 1) * 100\n",
    "\n",
    "    for p in [1, 3, 5, 10]:\n",
    "        master_analysis_df[f'{p}Y Return (%)'] = master_analysis_df.apply(\n",
    "            lambda row: calculate_cagr(row[f'nav_{p}y_ago'], row['latest_nav'], p),\n",
    "            axis=1\n",
    "        )\n",
    "        \n",
    "    master_analysis_df['Fund Age (Yrs)'] = (master_analysis_df['latest_date'] - master_analysis_df['inception_date']).dt.days / 365.25\n",
    "    master_analysis_df['Since Inception Return (%)'] = master_analysis_df.apply(\n",
    "        lambda row: calculate_cagr(row['inception_nav'], row['latest_nav'], row['Fund Age (Yrs)']),\n",
    "        axis=1\n",
    "    )\n",
    "\n",
    "    # --- Statistical Profile (Risk and Expectation) ---\n",
    "    print(\"\\n--- Step 5: Calculating statistical profile ---\")\n",
    "    daily_returns_grouped = df.groupby('scheme_code')['daily_return']\n",
    "    mean_returns = daily_returns_grouped.mean().to_frame(name='mean')\n",
    "    std_returns = daily_returns_grouped.std().to_frame(name='std')\n",
    "    stats_df = mean_returns.join(std_returns).reset_index()\n",
    "    stats_df['Expected Annual Return (%)'] = stats_df['mean'] * 252 * 100\n",
    "    stats_df['Annualized Volatility (%)'] = stats_df['std'] * np.sqrt(252) * 100\n",
    "    risk_free_rate_daily = (1.04 ** (1/252)) - 1\n",
    "    stats_df['Sharpe Ratio'] = (stats_df['mean'] - risk_free_rate_daily) / stats_df['std'] * np.sqrt(252)\n",
    "    master_analysis_df = master_analysis_df.merge(stats_df[['scheme_code', 'Expected Annual Return (%)', 'Annualized Volatility (%)', 'Sharpe Ratio']], on='scheme_code', how='left')\n",
    "\n",
    "    # --- Final Display Table ---\n",
    "    print(\"\\n--- Step 6: Generating Final Report ---\")\n",
    "    display_cols = [\n",
    "        'scheme_name', 'Fund Age (Yrs)', 'Since Inception Return (%)', '10Y Return (%)', '5Y Return (%)',\n",
    "        '3Y Return (%)', '1Y Return (%)', 'Expected Annual Return (%)', 'Annualized Volatility (%)', 'Sharpe Ratio'\n",
    "    ]\n",
    "    display_df = master_analysis_df[display_cols].sort_values('5Y Return (%)', ascending=False).copy()\n",
    "    for col in ['10Y Return (%)', '5Y Return (%)', '3Y Return (%)', '1Y Return (%)', 'Since Inception Return (%)']:\n",
    "        display_df[col] = display_df[col].apply(lambda x: f\"{x:.2f}\" if pd.notna(x) else '-')\n",
    "\n",
    "    print(\"\\n--- Master Analysis Table: Curated for Investors ---\")\n",
    "    display(display_df.head(20))\n",
    "else:\n",
    "    print(\"\\nAnalysis skipped because no data was loaded in Part 1.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🧠 Part 2: The Recommendation Engine (Frontend)\n",
    "\n",
    "**Objective:** \n",
    "Based on the user's investment horizon and risk tolerance, score and rank mutual funds using a **custom suitability algorithm**.\n",
    "\n",
    "**Logic:**\n",
    "- Risk-normalized scoring weights based on user profile.\n",
    "- Funds older than the chosen investment horizon receive a 10% score bonus; funds less than one year old are excluded.\n",
    "- Split results into **Core (diversified)** and **Specialized (thematic)**.\n",
    "\n",
    "**Why:** \n",
    "This system generates **personalized, goal-aligned fund recommendations** while distinguishing between diversified vs. sector-specific options."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### --- Set Your Investment Goal Below ---"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Part 3: The Recommendation Engine (with Core vs. Specialized Funds) ---\n",
    "\n",
    "# ======================================================\n",
    "# ===> SET YOUR INVESTMENT GOALS HERE <===\n",
    "INVESTMENT_HORIZON_YEARS = 1\n",
    "RISK_TOLERANCE = 'High'  # Options: 'Low', 'Medium', 'High'\n",
    "# ======================================================\n",
    "\n",
    "def calculate_suitability_score(row, risk_profile):\n",
    "    # --- Robust Normalization ---\n",
    "    exp_return_norm = max(0, min(1, row['Expected Annual Return (%)'] / 50))\n",
    "    sharpe_norm = max(0, min(1, row['Sharpe Ratio'] / 3))\n",
    "    volatility_norm = max(0, min(1, 1 - (row['Annualized Volatility (%)'] / 50)))\n",
    "    \n",
    "    # --- Scoring Logic based on Your Risk Tolerance ---\n",
    "    if risk_profile == 'Low':\n",
    "        score = (sharpe_norm * 0.6) + (volatility_norm * 0.3) + (exp_return_norm * 0.1)\n",
    "    elif risk_profile == 'Medium':\n",
    "        score = (exp_return_norm * 0.4) + (sharpe_norm * 0.4) + (volatility_norm * 0.2)\n",
    "    elif risk_profile == 'High':\n",
    "        score = (exp_return_norm * 0.7) + (sharpe_norm * 0.2) + (volatility_norm * 0.1)\n",
    "    else: # Default to medium\n",
    "        score = (exp_return_norm * 0.4) + (sharpe_norm * 0.4) + (volatility_norm * 0.2)\n",
    "        \n",
    "    if row['Fund Age (Yrs)'] > INVESTMENT_HORIZON_YEARS:\n",
    "        score *= 1.1\n",
    "    return score\n",
    "\n",
    "# --- Execution ---\n",
    "recommendation_df = master_analysis_df.copy()\n",
    "recommendation_df = recommendation_df[recommendation_df['Fund Age (Yrs)'] >= 1].dropna(subset=['Sharpe Ratio'])\n",
    "recommendation_df['Suitability Score'] = recommendation_df.apply(\n",
    "    calculate_suitability_score, axis=1, risk_profile=RISK_TOLERANCE\n",
    ")\n",
    "\n",
    "# --- Split recommendations into Core and Specialized/Thematic ---\n",
    "# Define keywords for specialized funds\n",
    "specialized_keywords = [\n",
    "    'DEFENCE', 'PHARMA', 'HEALTHCARE', 'TECHNOLOGY', 'INFRASTRUCTURE',\n",
    "    'BANKING', 'FINANCIAL SERVICES', 'PSU', 'COMMODITIES', 'CONSUMPTION',\n",
    "    'ENERGY', 'AUTO', 'CHILDREN', 'BENEFIT', 'RETIREMENT', 'SAVER'\n",
    "]\n",
    "specialized_pattern = '|'.join(specialized_keywords)\n",
    "\n",
    "# Create a boolean mask to identify specialized funds\n",
    "is_specialized_mask = recommendation_df['scheme_name'].str.contains(specialized_pattern, case=False, na=False)\n",
    "\n",
    "# Create two separate dataframes\n",
    "core_recommendations = recommendation_df[~is_specialized_mask]\n",
    "specialized_recommendations = recommendation_df[is_specialized_mask]\n",
    "\n",
    "# Sort both dataframes by Suitability Score\n",
    "core_recommendations = core_recommendations.sort_values('Suitability Score', ascending=False)\n",
    "specialized_recommendations = specialized_recommendations.sort_values('Suitability Score', ascending=False)\n",
    "\n",
    "\n",
    "# --- Display the Final Recommendations ---\n",
    "recommendation_cols = [\n",
    "    'scheme_name', 'Suitability Score', 'Expected Annual Return (%)', \n",
    "    'Annualized Volatility (%)', 'Sharpe Ratio', 'Fund Age (Yrs)'\n",
    "]\n",
    "\n",
    "print(f'\\n--- Recommendations for a {INVESTMENT_HORIZON_YEARS}-Year Horizon with {RISK_TOLERANCE} Risk Tolerance ---\\n')\n",
    "\n",
    "# Display Top Core Funds\n",
    "print(\"\\n--- Top 15 Core Diversified Funds ---\")\n",
    "print(\"These are generally suitable as the main part of a portfolio.\")\n",
    "display(core_recommendations[recommendation_cols].head(15))\n",
    "\n",
    "# Display Top Specialized Funds\n",
    "if not specialized_recommendations.empty:\n",
    "    print(\"\\n\\n--- Top 5 Specialized & Thematic Funds ---\")\n",
    "    print(\"These funds focus on specific sectors. They have performed exceptionally well but carry higher concentration risk.\")\n",
    "    display(specialized_recommendations[recommendation_cols].head(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Part 3: Diversification Analysis\n",
    "\n",
    "A well-diversified portfolio contains funds that don't all move together. Use the **Dendrogram** to pick funds from different color clusters. Use the **Correlation Heatmap** to see the exact relationship strength (lower is better for diversification)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 📈 Part 3a: Diversification Analysis via Clustering and Correlation\n",
    "\n",
    "**Objective:** \n",
    "Enable portfolio construction that avoids over-concentration by analyzing **co-movement between top fund returns**.\n",
    "\n",
    "**Tools Used:**\n",
    "- **Hierarchical Clustering (Dendrogram):** Visual groupings of similar funds\n",
    "- **Correlation Heatmap:** Precise relationship quantification\n",
    "\n",
    "**Why:** \n",
    "Helps investors select funds that do not all rise and fall together, increasing portfolio stability through **uncorrelated assets**."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🔬 Part 3b: Technical Deep Dive into the Top Fund\n",
    "\n",
    "**Objective:** \n",
    "Zoom in on the **top recommended core fund** to assess its NAV behavior, momentum, volatility trends, and potential entry signals.\n",
    "\n",
    "**Techniques Used:**\n",
    "- SMAs (20, 50, 200-day)\n",
    "- Bollinger Bands\n",
    "- RSI (Relative Strength Index)\n",
    "\n",
    "**Why:** \n",
    "These indicators help detect overbought/oversold signals and general trend direction—useful for tactical timing decisions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Part 3a: Diversification Analysis for Your Recommended Funds ---\n",
    "#\n",
    "# Uses `df`, `core_recommendations` and `specialized_recommendations` from the earlier cells.\n",
    "# Produces `correlation_matrix`; it is left EMPTY when the analysis cannot be run, so the\n",
    "# portfolio cell can detect that case instead of failing on an undefined name.\n",
    "\n",
    "# Local import: converts the square distance matrix into the condensed form linkage() expects.\n",
    "from scipy.spatial.distance import squareform\n",
    "\n",
    "# Combine the top core and specialized funds for a holistic diversification view\n",
    "top_core = core_recommendations.head(10)['scheme_name']\n",
    "top_specialized = specialized_recommendations.head(5)['scheme_name']\n",
    "top_funds_to_analyze = pd.concat([top_core, top_specialized]).unique()\n",
    "\n",
    "print(f\"\\n--- Analyzing Diversification for Top Recommended Funds ---\")\n",
    "print(f\"This analysis helps you pick funds that don't all move in the same direction.\")\n",
    "\n",
    "# Pivot to one column of daily returns per fund (NaN on days a fund has no data)\n",
    "returns_pivot = df[df['scheme_name'].isin(top_funds_to_analyze)].pivot_table(\n",
    "    index='date', columns='scheme_name', values='daily_return'\n",
    ")\n",
    "\n",
    "# --- Data Check ---\n",
    "min_observations = 60  # Require at least ~3 months of common data\n",
    "min_funds = 3          # Minimum number of funds to analyze\n",
    "\n",
    "# Keep only the days where at least `min_funds` have data\n",
    "valid_days = returns_pivot.dropna(thresh=min_funds)\n",
    "\n",
    "correlation_matrix = pd.DataFrame()\n",
    "if len(valid_days) >= min_observations:\n",
    "    # Pairwise correlations; pairs sharing fewer than `min_observations` days come back as NaN\n",
    "    pairwise_corr = valid_days.corr(min_periods=min_observations)\n",
    "    # Clustering cannot handle NaN distances, so drop the fund with the most undefined\n",
    "    # correlations until the matrix is complete.\n",
    "    while pairwise_corr.isna().any().any():\n",
    "        sparsest_fund = pairwise_corr.isna().sum().idxmax()\n",
    "        pairwise_corr = pairwise_corr.drop(index=sparsest_fund, columns=sparsest_fund)\n",
    "    if len(pairwise_corr) >= min_funds:\n",
    "        correlation_matrix = pairwise_corr\n",
    "\n",
    "if correlation_matrix.empty:\n",
    "    print(f\"\\nWarning: Not enough overlapping historical data ({len(valid_days)} days) found for the top recommended funds.\")\n",
    "    print(f\"A reliable diversification analysis could not be performed.\")\n",
    "else:\n",
    "    # --- 1. Dendrogram (The 'Family Tree' of Your Funds) ---\n",
    "    print(\"\\n--- Fund Cluster Analysis (Dendrogram) ---\")\n",
    "    print(\"Tip: For good diversification, try to pick funds from different main branches (colors).\")\n",
    "    \n",
    "    plt.figure(figsize=(20, 8))\n",
    "    plt.title(f'Hierarchical Clustering of Recommended Funds', fontsize=18, pad=20)\n",
    "    \n",
    "    # Turn correlation into a distance (0 = identical co-movement, 2 = exact opposite).\n",
    "    # linkage() treats a 2-D array as raw observation vectors, so the square matrix must be\n",
    "    # condensed first; otherwise the tree is built on the wrong distances.\n",
    "    distance_matrix = np.clip(1 - correlation_matrix.values, 0, 2)\n",
    "    condensed_distances = squareform(distance_matrix, checks=False)\n",
    "    # 'average' linkage is valid for arbitrary precomputed distances ('ward' assumes Euclidean)\n",
    "    linked = hierarchy.linkage(condensed_distances, method='average')\n",
    "    \n",
    "    hierarchy.dendrogram(linked, \n",
    "                       labels=correlation_matrix.columns.tolist(), \n",
    "                       leaf_rotation=90, \n",
    "                       leaf_font_size=10)\n",
    "    plt.ylabel('Cluster Distance (Higher means more different)', fontsize=12)\n",
    "    plt.tight_layout()\n",
    "    plt.show()\n",
    "\n",
    "    # --- 2. Correlation Heatmap (The Detailed View) ---\n",
    "    print(\"\\n--- Correlation Matrix (Heatmap) ---\")\n",
    "    print(\"This shows the exact relationship strength. Lower numbers (blue/cooler colors) are better for diversification.\")\n",
    "    \n",
    "    plt.figure(figsize=(16, 14))\n",
    "    sns.heatmap(correlation_matrix, \n",
    "              annot=True, \n",
    "              cmap='coolwarm', \n",
    "              center=0,\n",
    "              vmin=-1, \n",
    "              vmax=1,\n",
    "              fmt='.2f', \n",
    "              linewidths=.5)\n",
    "    plt.title(f'Correlation Matrix of Daily Returns', fontsize=18, pad=20)\n",
    "    plt.show()\n",
    "    \n",
    "    # --- 3. Actionable Summary ---\n",
    "    print(\"\\n--- Diversification Summary ---\")\n",
    "    # Average over the strict upper triangle so each fund pair is counted once and the diagonal is ignored\n",
    "    upper_triangle = np.triu_indices_from(correlation_matrix.values, k=1)\n",
    "    avg_corr = correlation_matrix.values[upper_triangle].mean()\n",
    "    print(f\"The average correlation among your selected funds is: {avg_corr:.2f}\")\n",
    "    if avg_corr > 0.7:\n",
    "        print(\"Suggestion: Your selected funds are quite similar. Consider replacing one with a fund from a different cluster.\")\n",
    "    else:\n",
    "        print(\"Suggestion: This looks like a well-diversified selection.\")\n",
    "\n",
    "# --- Part 3b: Individual Fund Deep-Dive ---\n",
    "print(\"\\n\\n--- Individual Fund Deep-Dive ---\")\n",
    "\n",
    "# Select the top recommended CORE fund for a detailed look\n",
    "if not core_recommendations.empty:\n",
    "    selected_fund_name = core_recommendations.iloc[0]['scheme_name']\n",
    "    print(f\"Analyzing the top recommended core fund: {selected_fund_name}\")\n",
    "\n",
    "    # Rolling windows are positional, so the NAV series must be in date order first\n",
    "    fund_df = df[df['scheme_name'] == selected_fund_name].set_index('date').sort_index().copy()\n",
    "\n",
    "    # Trend: simple moving averages of the NAV\n",
    "    fund_df['50_day_sma'] = fund_df['nav'].rolling(window=50).mean()\n",
    "    fund_df['200_day_sma'] = fund_df['nav'].rolling(window=200).mean()\n",
    "    # Volatility: Bollinger Bands = 20-day SMA +/- 2 rolling standard deviations\n",
    "    fund_df['20_day_sma'] = fund_df['nav'].rolling(window=20).mean()\n",
    "    fund_df['20_day_std'] = fund_df['nav'].rolling(window=20).std()\n",
    "    fund_df['bollinger_upper'] = fund_df['20_day_sma'] + (fund_df['20_day_std'] * 2)\n",
    "    fund_df['bollinger_lower'] = fund_df['20_day_sma'] - (fund_df['20_day_std'] * 2)\n",
    "    # Momentum: 14-day RSI from the simple rolling mean of gains vs. losses\n",
    "    delta = fund_df['nav'].diff()\n",
    "    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()\n",
    "    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()\n",
    "    rs = gain / loss\n",
    "    fund_df['rsi'] = 100 - (100 / (1 + rs))\n",
    "\n",
    "    # Create Subplots for visualization\n",
    "    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 12), sharex=True, gridspec_kw={'height_ratios': [3, 1]})\n",
    "    fig.suptitle(f'Technical Analysis for {selected_fund_name}', fontsize=20)\n",
    "\n",
    "    # Plot 1: NAV, SMAs, and Bollinger Bands\n",
    "    ax1.plot(fund_df.index, fund_df['nav'], label='NAV', color='blue', alpha=0.8)\n",
    "    ax1.plot(fund_df.index, fund_df['50_day_sma'], label='50-Day SMA', color='orange', linestyle='--')\n",
    "    ax1.plot(fund_df.index, fund_df['200_day_sma'], label='200-Day SMA', color='red', linestyle='--')\n",
    "    ax1.fill_between(fund_df.index, fund_df['bollinger_upper'], fund_df['bollinger_lower'], color='gray', alpha=0.2, label='Bollinger Bands')\n",
    "    ax1.set_ylabel('NAV (₹)')\n",
    "    ax1.set_title('Price Trend & Volatility')\n",
    "    ax1.legend()\n",
    "    ax1.grid(True)\n",
    "\n",
    "    # Plot 2: RSI\n",
    "    ax2.plot(fund_df.index, fund_df['rsi'], label='RSI', color='purple')\n",
    "    ax2.axhline(70, linestyle='--', color='red', alpha=0.5, label='Overbought (70)')\n",
    "    ax2.axhline(30, linestyle='--', color='green', alpha=0.5, label='Oversold (30)')\n",
    "    ax2.set_ylabel('RSI')\n",
    "    ax2.set_title('Relative Strength Index (Momentum)')\n",
    "    ax2.set_ylim(0, 100)\n",
    "    ax2.legend()\n",
    "    ax2.grid(True)\n",
    "\n",
    "    ax2.set_xlabel('Date')\n",
    "    plt.tight_layout(rect=[0, 0, 1, 0.97])\n",
    "    plt.show()\n",
    "else:\n",
    "    print(\"\\nDeep-dive analysis skipped as no core funds were recommended.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 💼 Part 4: Personalized Final Portfolio Recommendation\n",
    "\n",
    "**Objective:** \n",
    "Construct a **final diversified investment portfolio** of high-suitability mutual funds that also **minimize internal correlation**.\n",
    "\n",
    "**Steps:**\n",
    "- Start with highest scoring fund\n",
    "- Iteratively add low-correlation funds\n",
    "- Optionally include a thematic/specialized satellite fund\n",
    "\n",
    "**Why:** \n",
    "This creates a **smart portfolio**—well-performing but not over-exposed to a single trend or asset behavior. We also ensure that recommendations are practical (i.e., limited to funds with enough historical data)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- Part 4: Your Personalized Diversified Portfolio Recommendation ---\n",
    "\n",
    "print(\"\\n\" + \"=\"*80)\n",
    "print(\"== YOUR FINAL DIVERSIFIED PORTFOLIO RECOMMENDATION ==\")\n",
    "print(\"=\"*80)\n",
    "\n",
    "# Needs 'core_recommendations' and 'specialized_recommendations' from the recommendation cell.\n",
    "# 'correlation_matrix' comes from the diversification cell, which does not define it when it\n",
    "# finds too little overlapping history. Fall back to an empty matrix so this cell reports\n",
    "# \"no portfolio\" instead of raising NameError.\n",
    "if 'correlation_matrix' not in globals():\n",
    "    correlation_matrix = pd.DataFrame()\n",
    "\n",
    "# Only consider funds that are actually present in the correlation matrix; a recommended fund\n",
    "# that was too new for the diversification analysis has no correlations to look up.\n",
    "portfolio_candidates = core_recommendations[core_recommendations['scheme_name'].isin(correlation_matrix.columns)]\n",
    "specialized_candidates = specialized_recommendations[specialized_recommendations['scheme_name'].isin(correlation_matrix.columns)]\n",
    "\n",
    "# Set the desired size of your core portfolio\n",
    "CORE_PORTFOLIO_SIZE = 3\n",
    "DIVERSIFICATION_THRESHOLD = 0.85 # We want funds with correlation less than this\n",
    "\n",
    "# Greedy selection in score order: the first candidate becomes the anchor (nothing to compare\n",
    "# against yet); each later candidate is kept only if it is not too correlated with any fund\n",
    "# already selected.\n",
    "diversified_portfolio = []\n",
    "for _, candidate_fund in portfolio_candidates.iterrows():\n",
    "    if len(diversified_portfolio) >= CORE_PORTFOLIO_SIZE:\n",
    "        break\n",
    "\n",
    "    too_similar = any(\n",
    "        correlation_matrix.loc[selected_fund['scheme_name'], candidate_fund['scheme_name']] > DIVERSIFICATION_THRESHOLD\n",
    "        for selected_fund in diversified_portfolio\n",
    "    )\n",
    "    if not too_similar:\n",
    "        diversified_portfolio.append(candidate_fund)\n",
    "\n",
    "# Convert the list of funds into a clean DataFrame.\n",
    "# reset_index gives positions 0, 1, 2, ...; the selected rows otherwise keep their labels from\n",
    "# the recommendation table, which breaks the anchor check and numbering in the report below.\n",
    "portfolio_df = pd.DataFrame(diversified_portfolio).reset_index(drop=True)\n",
    "\n",
    "# --- The Report (in Simple Terms) ---\n",
    "\n",
    "print(\"\\nHello! Based on your goal of a \"\n",
    "      f\"{INVESTMENT_HORIZON_YEARS}-year investment with {RISK_TOLERANCE} risk, here is a sample portfolio designed for you.\")\n",
    "print(\"\\nThe main goal of this portfolio is **diversification**. This simply means not putting all your eggs in one basket. We've selected funds that have performed well but don't always move in the same direction, which helps to lower your overall risk.\")\n",
    "\n",
    "if not portfolio_df.empty:\n",
    "    print(\"\\n--- Your Recommended Core Portfolio ---\\n\")\n",
    "    display(portfolio_df[['scheme_name', 'Suitability Score', 'Expected Annual Return (%)', 'Annualized Volatility (%)']])\n",
    "\n",
    "    print(\"\\n--- Why These Funds Were Chosen ---\\n\")\n",
    "    for position, fund in portfolio_df.iterrows():\n",
    "        if position == 0:\n",
    "            print(f\"1. **{fund['scheme_name']}**: This is your 'Anchor' fund. It's the top-ranked fund based on your goals and forms the foundation of your investment.\")\n",
    "        else:\n",
    "            # Average correlation of this fund to the ones selected before it (itself excluded)\n",
    "            earlier_funds = portfolio_df['scheme_name'].iloc[:position].tolist()\n",
    "            avg_corr = correlation_matrix.loc[fund['scheme_name'], earlier_funds].mean()\n",
    "            print(f\"\\n{position + 1}. **{fund['scheme_name']}**: This fund was added because it's a strong performer that is also quite different from the others already in the portfolio (average correlation of only {avg_corr:.2f}). This adds a good layer of diversification.\")\n",
    "\n",
    "    # --- Optional: Add a 'Satellite' fund for extra growth ---\n",
    "    if not specialized_candidates.empty:\n",
    "        top_specialized_fund = specialized_candidates.iloc[0]\n",
    "        # Check if the specialized fund is different enough from the core portfolio\n",
    "        core_fund_names = portfolio_df['scheme_name'].tolist()\n",
    "        avg_corr_specialized = correlation_matrix.loc[top_specialized_fund['scheme_name'], core_fund_names].mean()\n",
    "        \n",
    "        if avg_corr_specialized < DIVERSIFICATION_THRESHOLD:\n",
    "            print(\"\\n\\n--- Optional 'Satellite' Fund for Higher Growth ---\\n\")\n",
    "            print(\"For investors comfortable with more risk, you can add a small portion of your investment to a specialized fund. These are less diversified but have very high growth potential.\")\n",
    "            display(pd.DataFrame([top_specialized_fund])[['scheme_name', 'Suitability Score', 'Expected Annual Return (%)']])\n",
    "            print(f\"This fund focuses on a specific theme and has a low correlation ({avg_corr_specialized:.2f}) to your core portfolio, making it a good high-risk, high-reward addition.\")\n",
    "else:\n",
    "    print(\"\\nCould not generate a diversified portfolio. This may be due to a lack of funds with sufficient overlapping history.\")\n",
    "\n",
    "\n",
    "print(\"\\n\\n**Disclaimer:** This is a statistically generated recommendation based on historical data. Past performance is not indicative of future results. Please consider consulting with a financial advisor.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ✅ Final Thoughts & Summary\n",
    "\n",
    "This notebook presents a fully functional **Mutual Fund Analysis & Recommendation Engine**. Here's what we achieved:\n",
    "\n",
    "- 🧮 **Back-End Analysis Engine:** Calculated detailed historical and statistical performance profiles using efficient SQL and pandas logic.\n",
    "- 🧠 **Custom Recommendation Engine:** Personalized fund recommendations based on user-defined risk and timeline.\n",
    "- 📊 **Visual Diversification Tools:** Used clustering and correlation to enable smarter fund selection.\n",
    "- 🔬 **Technical Deep-Dive:** Fund-specific indicators helped investors interpret timing and performance signals.\n",
    "- 💼 **Portfolio Builder:** Automated selection of low-correlation, high-return mutual funds for a diversified investment basket.\n",
    "\n",
    "**This project demonstrates how data science can drive real-world, investor-facing financial intelligence.**"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}

Writing Analysis_Optimized.ipynb


In [36]:
%%writefile dont_know.ipynb
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Part 1: Setup and Memory-Safe Data Loading\n",
    "\n",
    "**Goal:** Load all necessary libraries and use a smart SQL query to load only data for open-ended, accessible funds, preventing memory errors and ensuring data relevance."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "from sqlalchemy import create_engine\n",
    "from config import db_config # Assuming config.py is in the same directory\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "from scipy.cluster import hierarchy\n",
    "\n",
    "# --- Setup plotting and display styles ---\n",
    "sns.set_style('whitegrid')\n",
    "plt.rcParams['figure.figsize'] = (18, 9)\n",
    "pd.set_option('display.float_format', lambda x: f'{x:.2f}')\n",
    "pd.set_option('display.width', 1000)\n",
    "\n",
    "# --- MEMORY-SAFE DATA LOADING FROM SQL ---\n",
    "# Two-step load: first fetch only the (small) list of scheme names to decide which funds to\n",
    "# drop, then pull NAV history for the remaining funds. On any failure `df` is set to an\n",
    "# empty DataFrame so the analysis cell can skip itself.\n",
    "print(\"--- Step 1: Connecting to SQL Server to identify unsuitable funds to exclude ---\")\n",
    "try:\n",
    "    engine = create_engine(db_config.url)\n",
    "\n",
    "    # Query 1: Get the small, unique list of all fund schemes\n",
    "    all_funds_query = \"SELECT DISTINCT scheme_code, scheme_name FROM nav_data\"\n",
    "    all_funds_df = pd.read_sql(all_funds_query, engine)\n",
    "    \n",
    "    # Comprehensive keyword list to filter funds not for regular investors.\n",
    "    # Entries are joined into one case-insensitive regex, so each is a regex fragment.\n",
    "    # 'NFO' is anchored with word boundaries: as a bare substring it also matches\n",
    "    # 'Information', which would wrongly exclude Information Technology funds.\n",
    "    filter_keywords = [\n",
    "        'FMP', 'FIXED MATURITY', 'FIXED TERM', 'SERIES',\n",
    "        'INTERVAL FUND', 'CAPITAL PROTECTION', 'CLOSED ENDED',\n",
    "        'CLOSE ENDED', 'CLOSE-ENDED', 'CAP PROTECTION', 'LIMITED OFFER',\n",
    "        r'\\bNFO\\b', 'OPPORTUNITY FUND', 'MATURITY', 'TARGET MATURITY',\n",
    "        'SEGREGATED PORTFOLIO', 'LOCK-IN', 'LIMITED PERIOD'\n",
    "    ]\n",
    "    keyword_pattern = '|'.join(filter_keywords)\n",
    "    \n",
    "    all_funds_df['scheme_name'] = all_funds_df['scheme_name'].astype(str)\n",
    "    unsuitable_funds = all_funds_df[all_funds_df['scheme_name'].str.contains(keyword_pattern, case=False, na=False)]\n",
    "    schemes_to_exclude = unsuitable_funds['scheme_code'].tolist()\n",
    "    \n",
    "    print(f\"Identified {len(schemes_to_exclude)} unsuitable schemes to exclude.\")\n",
    "\n",
    "    # --- Step 2: Build a specific SQL query to load ONLY the required historical data ---\n",
    "    print(\"\\n--- Step 2: Loading historical data for suitable, open-ended funds from SQL ---\")\n",
    "    if schemes_to_exclude:\n",
    "        # NOTE(review): the codes are interpolated into the SQL text. They come from the same\n",
    "        # table, but a bound/expanding parameter would be safer if codes are ever non-numeric.\n",
    "        exclude_list_str = \", \".join([f\"'{code}'\" for code in schemes_to_exclude])\n",
    "        data_query = f\"SELECT * FROM nav_data WHERE scheme_code NOT IN ({exclude_list_str})\"\n",
    "        df = pd.read_sql(data_query, engine, parse_dates=['date'])\n",
    "        print(f\"Successfully loaded a manageable subset of {len(df)} records.\")\n",
    "    else:\n",
    "        print(\"No funds to exclude. Loading entire dataset.\")\n",
    "        df = pd.read_sql(\"SELECT * FROM nav_data\", engine, parse_dates=['date'])\n",
    "\n",
    "except Exception as e:\n",
    "    print(f\"An error occurred during data loading: {e}\")\n",
    "    df = pd.DataFrame()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Part 2: The Main Analysis Engine\n",
    "\n",
    "**Goal:** Take the filtered data from Part 1 and perform the complete, robust analysis. This version contains the definitive fix for the `ValueError: left keys must be sorted`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not df.empty:\n",
    "    print(\"\\n--- Step 3: Calculating comprehensive metrics for all funds ---\")\n",
    "    # Sort chronologically within each fund BEFORE computing returns: pct_change() compares\n",
    "    # each row with the previous one, so it is a daily return only if rows are in date order.\n",
    "    df_sorted = df.sort_values(['scheme_code', 'date'])\n",
    "    df_sorted['daily_return'] = df_sorted.groupby('scheme_code')['nav'].pct_change()\n",
    "    # Assignment aligns on the row index, so `df` keeps its original row order\n",
    "    df['daily_return'] = df_sorted['daily_return']\n",
    "    \n",
    "    # First and latest NAV row per fund\n",
    "    first_navs = df_sorted.loc[df_sorted.groupby('scheme_code')['date'].idxmin()]\n",
    "    latest_navs = df_sorted.loc[df_sorted.groupby('scheme_code')['date'].idxmax()].sort_values('scheme_code').reset_index(drop=True)\n",
    "\n",
    "    master_analysis_df = latest_navs[['scheme_code', 'scheme_name', 'date', 'nav']].rename(columns={'date': 'latest_date', 'nav': 'latest_nav'})\n",
    "    master_analysis_df = master_analysis_df.merge(first_navs[['scheme_code', 'date', 'nav']], on='scheme_code', how='inner')\n",
    "    master_analysis_df.rename(columns={'date': 'inception_date', 'nav': 'inception_nav'}, inplace=True)\n",
    "\n",
    "    # Activity Filter (remove funds that haven't reported recently)\n",
    "    last_valid_date = pd.Timestamp.now() - pd.DateOffset(days=30)\n",
    "    activity_mask = master_analysis_df['latest_date'] >= last_valid_date\n",
    "    master_analysis_df = master_analysis_df[activity_mask].copy()\n",
    "    print(f\"Final number of active, open-ended funds for analysis: {len(master_analysis_df)}\")\n",
    "\n",
    "    # --- Historical Performance (CAGR) ---\n",
    "    def calculate_cagr(start_value, end_value, years):\n",
    "        \"\"\"Return the compound annual growth rate in percent, row by row.\n",
    "\n",
    "        start_value / end_value: Series of NAVs sharing one index.\n",
    "        years: holding period, either a Series on the same index or a single number\n",
    "            applied to every row.\n",
    "        Rows with a non-positive or missing NAV, or a period of 0.25 years or less,\n",
    "        yield NaN.\n",
    "        \"\"\"\n",
    "        start_value = pd.to_numeric(start_value, errors='coerce')\n",
    "        end_value = pd.to_numeric(end_value, errors='coerce')\n",
    "        if np.isscalar(years):\n",
    "            # Broadcast so the masking below can index `years` like the NAV series\n",
    "            years = pd.Series(years, index=start_value.index)\n",
    "        years = pd.to_numeric(years, errors='coerce')\n",
    "        result = pd.Series(np.nan, index=start_value.index)\n",
    "        valid_mask = (start_value > 0) & (end_value > 0) & (years > 0.25)\n",
    "        s, e, y = start_value[valid_mask], end_value[valid_mask], years[valid_mask]\n",
    "        result.loc[valid_mask] = ((e / s) ** (1 / y) - 1) * 100\n",
    "        return result\n",
    "\n",
    "    # merge_asof needs BOTH frames sorted by the date key itself (not by scheme_code first);\n",
    "    # the per-fund grouping is handled by its `by=` argument.\n",
    "    right_df_for_merge = df_sorted[['date', 'scheme_code', 'nav']].dropna(subset=['date']).sort_values('date')\n",
    "    # Without a tolerance, 'nearest' would hand a 2-year-old fund its inception NAV as the\n",
    "    # \"10 years ago\" NAV and produce a bogus 10Y return; unmatched rows become NaN instead.\n",
    "    NAV_LOOKUP_TOLERANCE = pd.Timedelta(days=15)\n",
    "\n",
    "    for years in [1, 3, 5, 10]:\n",
    "        target_date_col = f'date_{years}y_ago'\n",
    "        nav_col = f'nav_{years}y_ago'\n",
    "        master_analysis_df[target_date_col] = master_analysis_df['latest_date'] - pd.DateOffset(years=years)\n",
    "        left_df = master_analysis_df[['scheme_code', target_date_col]].sort_values(target_date_col)\n",
    "        period_navs = pd.merge_asof(\n",
    "            left_df, right_df_for_merge,\n",
    "            left_on=target_date_col, right_on='date', by='scheme_code',\n",
    "            direction='nearest', tolerance=NAV_LOOKUP_TOLERANCE\n",
    "        )\n",
    "        master_analysis_df = master_analysis_df.merge(period_navs[['scheme_code', 'nav']].rename(columns={'nav': nav_col}), on='scheme_code', how='left')\n",
    "        master_analysis_df[f'{years}Y Return (%)'] = calculate_cagr(master_analysis_df[nav_col], master_analysis_df['latest_nav'], years)\n",
    "\n",
    "    master_analysis_df['Fund Age (Yrs)'] = (master_analysis_df['latest_date'] - master_analysis_df['inception_date']).dt.days / 365.25\n",
    "    master_analysis_df['Since Inception Return (%)'] = calculate_cagr(master_analysis_df['inception_nav'], master_analysis_df['latest_nav'], master_analysis_df['Fund Age (Yrs)'])\n",
    "\n",
    "    # --- Statistical Profile (Risk and Expectation) ---\n",
    "    # Daily mean/std are annualised with 252 trading days; Sharpe uses a 4% annual risk-free rate\n",
    "    daily_returns_grouped = df.groupby('scheme_code')['daily_return']\n",
    "    mean_returns = daily_returns_grouped.mean().to_frame(name='mean')\n",
    "    std_returns = daily_returns_grouped.std().to_frame(name='std')\n",
    "    stats_df = mean_returns.join(std_returns).reset_index()\n",
    "    stats_df['Expected Annual Return (%)'] = stats_df['mean'] * 252 * 100\n",
    "    stats_df['Annualized Volatility (%)'] = stats_df['std'] * np.sqrt(252) * 100\n",
    "    risk_free_rate_daily = (1.04 ** (1/252)) - 1\n",
    "    stats_df['Sharpe Ratio'] = (stats_df['mean'] - risk_free_rate_daily) / stats_df['std'] * np.sqrt(252)\n",
    "    master_analysis_df = master_analysis_df.merge(stats_df[['scheme_code', 'Expected Annual Return (%)', 'Annualized Volatility (%)', 'Sharpe Ratio']], on='scheme_code', how='left')\n",
    "\n",
    "    # --- Final Display Table ---\n",
    "    display_cols = [\n",
    "        'scheme_name', 'Fund Age (Yrs)', 'Since Inception Return (%)', '10Y Return (%)', '5Y Return (%)',\n",
    "        '3Y Return (%)', '1Y Return (%)', 'Expected Annual Return (%)', 'Annualized Volatility (%)', 'Sharpe Ratio'\n",
    "    ]\n",
    "    display_df = master_analysis_df[display_cols].sort_values('5Y Return (%)', ascending=False).copy()\n",
    "    # Render missing returns (fund too young for the period) as '-'\n",
    "    for col in ['10Y Return (%)', '5Y Return (%)', '3Y Return (%)', '1Y Return (%)', 'Since Inception Return (%)']:\n",
    "        display_df[col] = display_df[col].apply(lambda x: f\"{x:.2f}\" if pd.notna(x) else '-')\n",
    "\n",
    "    print(\"\\n--- Master Analysis Table: Curated for Investors ---\")\n",
    "    display(display_df.head(20))\n",
    "else:\n",
    "    print(\"\\nAnalysis skipped because no data was loaded.\")\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}



Writing dont_know.ipynb


In [31]:
%%writefile forecasting_final.ipynb
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Predictive Forecasting with Growth Analysis"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Setup and Data Loading"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from sqlalchemy import create_engine\n",
    "from config import db_config\n",
    "from prophet import Prophet\n",
    "from statsmodels.tsa.holtwinters import ExponentialSmoothing\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "import warnings\n",
    "\n",
    "warnings.filterwarnings('ignore')\n",
    "sns.set_style('whitegrid')\n",
    "plt.rcParams['figure.figsize'] = (15, 7)\n",
    "\n",
    "print(\"Libraries loaded.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the whole nav_data table and make sure 'date' is a real datetime column\n",
    "query = \"SELECT * FROM nav_data\"\n",
    "engine = create_engine(db_config.url)\n",
    "df = pd.read_sql(query, engine)\n",
    "df['date'] = pd.to_datetime(df['date'])\n",
    "\n",
    "record_count = len(df)\n",
    "fund_count = df['scheme_code'].nunique()\n",
    "print(f\"Loaded {record_count} records and {fund_count} unique funds from the database.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Select a Fund to Forecast"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pick the fund to forecast; fall back to the first fund alphabetically if the preferred\n",
    "# name is not present in the loaded data.\n",
    "fund_list = sorted(df['scheme_name'].unique())\n",
    "selected_fund_name = 'HDFC Flexi Cap Fund - Direct Plan - Growth Option'\n",
    "if selected_fund_name not in fund_list and fund_list:\n",
    "    selected_fund_name = fund_list[0]\n",
    "\n",
    "print(f\"\\nAnalyzing fund: {selected_fund_name}\")\n",
    "\n",
    "# One NAV per date for the chosen fund\n",
    "fund_df_raw = df[df['scheme_name'] == selected_fund_name][['date', 'nav']].copy()\n",
    "fund_df_raw.drop_duplicates(subset=['date'], keep='last', inplace=True)\n",
    "\n",
    "MIN_DATA_POINTS = 365 # We need at least a year of data for yearly analysis\n",
    "has_enough_data = len(fund_df_raw) >= MIN_DATA_POINTS\n",
    "\n",
    "if not has_enough_data:\n",
    "    print(f\"🛑 ANALYSIS HALTED: Not enough data for the selected fund. Need at least {MIN_DATA_POINTS} days of data for yearly analysis.\")\n",
    "else:\n",
    "    print(f\"✅ Sufficient data found. Proceeding with forecast...\")\n",
    "    # Reindex to a gap-free daily calendar and carry the last known NAV over non-trading days.\n",
    "    # .ffill() replaces fillna(method='ffill'), which is deprecated in recent pandas.\n",
    "    fund_df = fund_df_raw.sort_values('date').set_index('date').asfreq('D').ffill().reset_index()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Forecasting Models & Detailed Visualization\n",
    "This section will only run if the selected fund has enough data. It now includes the main forecast plot, a bar chart for historical growth, and a table with specific future NAV predictions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if has_enough_data:\n",
    "    FORECAST_DAYS = 365    # forecast horizon in calendar days\n",
    "    SEASONAL_PERIOD = 365  # yearly cycle on the daily series\n",
    "\n",
    "    # --- Run Forecasting Models (Prophet & Holt-Winters) ---\n",
    "    print(\"\\nFitting forecasting models...\")\n",
    "    prophet_df = fund_df.rename(columns={'date': 'ds', 'nav': 'y'})\n",
    "    model_prophet = Prophet(daily_seasonality=True).fit(prophet_df)\n",
    "    future_prophet = model_prophet.make_future_dataframe(periods=FORECAST_DAYS)\n",
    "    forecast_prophet = model_prophet.predict(future_prophet)\n",
    "    \n",
    "    hw_df = fund_df.set_index('date')\n",
    "    # Holt-Winters needs at least two full seasonal cycles to initialise the seasonal terms\n",
    "    # and raises otherwise, so shorter histories fall back to a trend-only model.\n",
    "    if len(hw_df) >= 2 * SEASONAL_PERIOD:\n",
    "        hw_seasonal_args = {'seasonal': 'add', 'seasonal_periods': SEASONAL_PERIOD}\n",
    "    else:\n",
    "        print(\"Less than two years of history: fitting Holt-Winters without a seasonal component.\")\n",
    "        hw_seasonal_args = {}\n",
    "    model_hw = ExponentialSmoothing(hw_df['nav'], trend='add', **hw_seasonal_args).fit()\n",
    "    forecast_hw = model_hw.forecast(FORECAST_DAYS)\n",
    "    \n",
    "    # --- Create Ensemble Forecast ---\n",
    "    # Simple average of the two models over the forecast horizon\n",
    "    print(\"Creating ensemble forecast...\")\n",
    "    prophet_future_values = forecast_prophet[-FORECAST_DAYS:]['yhat']\n",
    "    hw_future_values = forecast_hw\n",
    "    ensemble_forecast = (prophet_future_values.values + hw_future_values.values) / 2\n",
    "    # Dates start the day after the last historical observation\n",
    "    forecast_dates = pd.date_range(start=fund_df['date'].iloc[-1], periods=FORECAST_DAYS + 1)[1:]\n",
    "\n",
    "    # --- Summary Table of Forecasted Values ---\n",
    "    print(\"\\n--- Key Forecasted NAV Values ---\")\n",
    "    forecast_summary = pd.DataFrame({\n",
    "        'Timeframe': ['1 Month', '3 Months', '6 Months', '1 Year'],\n",
    "        'Date': [\n",
    "            forecast_dates[29].date(),\n",
    "            forecast_dates[89].date(),\n",
    "            forecast_dates[179].date(),\n",
    "            forecast_dates[364].date()\n",
    "        ],\n",
    "        'Forecasted NAV (₹)': [\n",
    "            ensemble_forecast[29],\n",
    "            ensemble_forecast[89],\n",
    "            ensemble_forecast[179],\n",
    "            ensemble_forecast[364]\n",
    "        ]\n",
    "    })\n",
    "    forecast_summary['Forecasted NAV (₹)'] = forecast_summary['Forecasted NAV (₹)'].round(2)\n",
    "    display(forecast_summary.set_index('Timeframe'))\n",
    "\n",
    "    # --- Create Figure with Two Subplots ---\n",
    "    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 14), gridspec_kw={'height_ratios': [3, 2]})\n",
    "    fig.suptitle(f'In-Depth Analysis for: {selected_fund_name}', fontsize=20)\n",
    "\n",
    "    # --- Plot 1: The Main Forecast ---\n",
    "    ax1.plot(fund_df['date'], fund_df['nav'], label='Historical NAV')\n",
    "    ax1.plot(forecast_dates, ensemble_forecast, color='purple', linestyle='--', label='Ensemble Forecast (1-Year)')\n",
    "    ax1.set_title('Historical NAV and Future Forecast')\n",
    "    ax1.set_ylabel('NAV (₹)')\n",
    "    ax1.legend()\n",
    "\n",
    "    # --- Plot 2: Year-over-Year Growth Bar Chart ---\n",
    "    print(\"\\nCalculating year-over-year growth...\")\n",
    "    # Last NAV of each calendar year, then the percentage change between consecutive years\n",
    "    yearly_df = fund_df.set_index('date').resample('A-DEC').last()\n",
    "    yearly_df['yearly_growth_pct'] = yearly_df['nav'].pct_change() * 100\n",
    "    yearly_df.dropna(inplace=True)\n",
    "    yearly_df.index = yearly_df.index.year\n",
    "    \n",
    "    # One explicit colour per bar; plain matplotlib bars avoid seaborn's deprecated\n",
    "    # palette-without-hue usage.\n",
    "    colors = ['green' if x > 0 else 'red' for x in yearly_df['yearly_growth_pct']]\n",
    "    ax2.bar(yearly_df.index.astype(str), yearly_df['yearly_growth_pct'], color=colors)\n",
    "    ax2.set_title('Historical Year-over-Year Growth')\n",
    "    ax2.set_xlabel('Year')\n",
    "    ax2.set_ylabel('Growth (%)')\n",
    "    for p in ax2.patches:\n",
    "        ax2.annotate(f'{p.get_height():.1f}%', (p.get_x() + p.get_width() / 2., p.get_height()), ha='center', va='center', xytext=(0, 9), textcoords='offset points')\n",
    "\n",
    "    plt.tight_layout(rect=[0, 0.03, 1, 0.96])\n",
    "    plt.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}


Overwriting forecasting_final.ipynb


In [19]:
%%writefile data_inspection.ipynb
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data Inspection and Diagnostics"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook traces the data for a single fund through the entire pipeline to find any discrepancies."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from sqlalchemy import create_engine\n",
    "from config import db_config\n",
    "import requests\n",
    "\n",
    "# The fund name we are investigating\n",
    "TARGET_FUND_NAME = 'Axis Bluechip Fund - Direct Plan - Growth'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 1: Inspect the Raw Source Data\n",
    "**Goal:** Let's get the master list of all funds directly from the API to find the **exact** `schemeName` and `schemeCode` for our target fund. A tiny difference in the name can cause it to be missed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Fetching master fund list from API...\")\n",
    "all_funds_url = \"https://api.mfapi.in/mf\"\n",
    "# Bound the wait and fail loudly on an HTTP error instead of parsing an error body as fund data.\n",
    "response = requests.get(all_funds_url, timeout=30)\n",
    "response.raise_for_status()\n",
    "all_funds_df = pd.DataFrame(response.json())\n",
    "\n",
    "# Search the raw list for TARGET_FUND_NAME as a literal, case-insensitive substring.\n",
    "# Using the constant keeps this step in sync with the comparison made in Step 3.\n",
    "# regex=False stops characters such as '(' or '+' in a fund name being read as a pattern;\n",
    "# na=False treats rows with a missing schemeName as non-matches.\n",
    "name_matches = all_funds_df['schemeName'].str.contains(TARGET_FUND_NAME, case=False, regex=False, na=False)\n",
    "target_fund_info = all_funds_df[name_matches]\n",
    "\n",
    "print(\"--- Raw Fund Information from API Source ---\")\n",
    "if not target_fund_info.empty:\n",
    "    print(\"Fund found at the source!\")\n",
    "    display(target_fund_info)\n",
    "    # Store the exact code and name of the first match for the next steps\n",
    "    exact_scheme_code = target_fund_info.iloc[0]['schemeCode']\n",
    "    exact_scheme_name = target_fund_info.iloc[0]['schemeName']\n",
    "else:\n",
    "    print(\"🛑 CRITICAL: Fund not found in the master list from the API.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 2: Inspect the Data Loaded into the Database\n",
    "**Goal:** Now let's check our SQL database. Did the data for this fund, using the **exact name** we found in Step 1, actually get saved correctly?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sqlalchemy import text\n",
    "\n",
    "if 'exact_scheme_name' in locals():\n",
    "    print(f\"Querying the database for: '{exact_scheme_name}'\")\n",
    "    engine = create_engine(db_config.url)\n",
    "    # Bind the name as a parameter rather than formatting it into the SQL string:\n",
    "    # it comes from an external API and may contain quotes, which would break\n",
    "    # (or inject into) a string-built statement.\n",
    "    query = text(\"SELECT * FROM nav_data WHERE scheme_name = :scheme_name\")\n",
    "    \n",
    "    try:\n",
    "        db_df = pd.read_sql(query, engine, params={'scheme_name': exact_scheme_name})\n",
    "        print(\"--- Data Found in SQL Database ---\")\n",
    "        if not db_df.empty:\n",
    "            print(f\"Success! Found {len(db_df)} records in the database.\")\n",
    "            print(\"Sample data from DB:\")\n",
    "            display(db_df.head())\n",
    "        else:\n",
    "            print(\"🛑 CRITICAL: Fund data was NOT found in the database, even though it exists at the source.\")\n",
    "            print(\"This suggests a problem during the data extraction or loading phase in your pipeline.\")\n",
    "    except Exception as e:\n",
    "        print(f\"An error occurred while querying the database: {e}\")\n",
    "else:\n",
    "    print(\"Skipping database check because fund was not found at the source.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 3: Replicate the Analysis Notebook's Loading Process\n",
    "**Goal:** Finally, let's replicate exactly what the forecasting notebook does to see why it's getting 0 records."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"--- Simulating the Forecasting Notebook --- \")\n",
    "engine = create_engine(db_config.url)\n",
    "full_query = \"SELECT * FROM nav_data\"\n",
    "full_df_from_db = pd.read_sql(full_query, engine)\n",
    "\n",
    "# This is the exact filter that was failing\n",
    "analysis_df = full_df_from_db[full_df_from_db['scheme_name'] == TARGET_FUND_NAME]\n",
    "\n",
    "print(f\"Attempting to filter for: '{TARGET_FUND_NAME}'\")\n",
    "print(f\"Number of records found: {len(analysis_df)}\")\n",
    "\n",
    "if len(analysis_df) == 0 and 'exact_scheme_name' in locals() and TARGET_FUND_NAME != exact_scheme_name:\n",
    "    print(\"\\n--- DIAGNOSIS ---\")\n",
    "    print(\"The problem is a name mismatch!\")\n",
    "    print(f\"You are filtering for: '{TARGET_FUND_NAME}'\")\n",
    "    print(f\"But the name in the database is: '{exact_scheme_name}'\")\n",
    "    print(\"Please use the exact name from the database in your analysis notebooks.\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}

Overwriting data_inspection.ipynb


In [20]:
%%writefile run_local_pipeline.py
"""
This script runs the local data ingestion pipeline to populate the database.
"""
import logging
from data_extractor import MFDataExtractor
from db_loader import DBLoader

# Root logger setup: emit INFO and above as "<timestamp> - <level> - <message>"
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

def run_pipeline():
    """Run the data extraction and loading pipeline once.

    Fetches NAV data via ``MFDataExtractor.get_all_nav_data()`` and appends it
    to the ``nav_data`` table via ``DBLoader.load_to_db``.

    Returns:
        bool: ``True`` if data was extracted and the loader reported success,
        ``False`` if any step failed. Failures are logged with their traceback
        instead of being raised, so callers must check the return value.
    """
    logging.info("--- Starting Local Data Pipeline Run ---")
    try:
        # 1. Initialize components
        extractor = MFDataExtractor()
        loader = DBLoader()

        # 2. Extraction from the AMFI source
        logging.info("Extracting data from AMFI source...")
        nav_df = extractor.get_all_nav_data()

        # A missing result is handled the same way as an empty one, so the
        # log shows the intended message rather than an AttributeError.
        if nav_df is None or nav_df.empty:
            raise ValueError("No data was extracted from AMFI. Halting pipeline.")

        # 3. Load into the database; 'append' adds to existing rows rather
        #    than replacing the table.
        logging.info("Loading %d records to the database...", len(nav_df))
        success = loader.load_to_db(nav_df, 'nav_data', if_exists='append')

        if not success:
            raise ValueError("Data loading failed during database operation.")

    except Exception as e:
        # Top-level boundary of the script: record the traceback and report
        # failure to the caller instead of crashing.
        logging.exception("Pipeline failed: %s", e)
        return False

    logging.info("--- Data Pipeline Completed Successfully ---")
    return True


if __name__ == "__main__":
    # Exit non-zero on failure so shells and schedulers can detect a bad run.
    raise SystemExit(0 if run_pipeline() else 1)

Overwriting run_local_pipeline.py
