## Import Libraries

In [None]:
# --- Import Libraries ---

# Pandas to handle the data
import pandas as pd

# For checking URL and HTTP errors
from urllib.error import URLError, HTTPError

## Global Variables

In [None]:
# --- GLOBAL VARIABLES ---

# AMFI endpoints used as data sources by the cells that follow.
# Scheme master list, read later as a CSV.
AMFI_SCHEME_URL = "https://portal.amfiindia.com/DownloadSchemeData_Po.aspx?mf=0"
# Latest NAV text file, read later with ';' as the separator.
AMFI_SCHEME_LATEST_NAV_URL = "https://www.amfiindia.com/spages/NAVAll.txt"

## Fetch Mutual Fund Scheme List

In [None]:
# Uniform, descriptive column names that replace the header row of the AMFI scheme CSV
mf_col_names = [
    'AMC',
    'Scheme_Code',
    'Scheme_Name',
    'Scheme_Type',
    'Scheme_Category',
    'Scheme_NAV_Name',
    'Scheme_Min_Amt',
    'Launch_Date',
    'Closure_Date',
    'ISIN_Div_Payout/Growth/Div_Reinvestment',
]

# --- Read Data ---
try:
    # header=0 discards the file's own header line; names= supplies ours instead.
    # Column dtypes are left to pandas' inference here and converted in a later cell.
    mf_scheme_df = pd.read_csv(AMFI_SCHEME_URL, header=0, names=mf_col_names)
    print("📖 Successfully read scheme data from AMFI Website!")
    # Report (rows, columns) of the freshly loaded frame
    print(f"📋 Total Rows & Columns: {mf_scheme_df.shape}")
except (HTTPError, URLError) as url_err:
    # Network-level failures raised while fetching the URL
    print(f"🛜 Error reading CSV from URL: {url_err}.")
except Exception as other_err:
    # Anything else, e.g. a parsing problem
    print(f"❌ Error occurred while reading CSV: {other_err}.")

## Fetch Latest NAV Data From AMFI

In [None]:
# Uniform, descriptive names for the six ';'-separated columns of the AMFI NAV file
nav_col_names = [
    'Scheme_Code',
    'ISIN_Div_Payout/Growth',
    'ISIN_Div_Reinvestment',
    'Scheme_Name',
    'NAV',
    'Latest_NAV_Date',
]

# --- Read Data ---
try:
    # Latest NAV (Net Asset Value) per scheme; header=0 drops the file's own
    # header line and names= substitutes the list above.
    latest_nav_df = pd.read_csv(
        AMFI_SCHEME_LATEST_NAV_URL,
        sep=';',
        header=0,
        names=nav_col_names,
    )
    print("📊 Successfully read latest NAV data from AMFI Website!")
    # Report (rows, columns) of the freshly loaded frame
    print(f"📋 Total Rows & Columns: {latest_nav_df.shape}")
except (HTTPError, URLError) as url_err:
    # Network-level failures raised while fetching the URL
    print(f"🛜 Error reading CSV from URL: {url_err}.")
except Exception as other_err:
    # Anything else, e.g. a parsing problem
    print(f"❌ Error occurred while reading TXT: {other_err}.")

# --- Sort & Filter Data ---
try:
    # The AMFI text file interleaves AMC / fund-category heading lines with the
    # actual data rows. Those heading lines have no scheme name, NAV or NAV date,
    # so dropping rows where any of the three is NaN removes them.
    # Reassigning (instead of inplace=True) keeps this step free of partial,
    # in-place mutation if either operation fails.
    latest_nav_df = (
        latest_nav_df
        .sort_values('Scheme_Code', ascending=True)
        .dropna(subset=['Scheme_Name', 'NAV', 'Latest_NAV_Date'])
    )
    print("✅ Successfully removed NaN from latest_nav_df.")
    # Report (rows, columns) after filtering
    print(f"🆕 Updated Rows & Columns: {latest_nav_df.shape}")
except Exception as e:
    # Include the exception text: without it a failure here (for example a
    # NameError because the download above did not succeed) is undiagnosable.
    print(f"❌ Error occurred while sorting scheme code/dropping NaN: {e}.")

## Data Type Conversion Function

In [None]:
# Function to convert the dataframe column dtypes (not a dataframe copy)
# Need to import Pandas for this function to work!

print("⏳ Defining data type conversion function...")


def data_type_conversion (input_dict, input_df, df_name):
    """
    Converts dataframe columns in place to the requested data types.

    Supported dtype keywords: 'int' (nullable Int64), 'float', 'date' / 'datetime',
    'category' and 'string'. For the numeric and date keywords, values that cannot
    be parsed are coerced to NaN / NaT rather than raising.

    Args:
        input_dict(dict): Key-Value pairs of 'col': 'dtype' to convert.
        input_df(dataframe): Dataframe whose columns are converted (mutated in place).
        df_name(string): Name of the dataframe, used only in the log messages.

    Returns:
        None. Progress, warnings and per-column errors are printed; a failure on
        one column is reported and does not stop the remaining conversions.
    """

    print(f"🔀 Sarting data type conversion for the dataframe: {df_name}.")

    # --- Loop over the dict's items ---
    for col, dtype in input_dict.items():
        # --- Skip (with a warning) columns that are not present in the dataframe ---
        if col not in input_df.columns:
            print(f"🟨 Warning: '{col}' does not exists in '{df_name}' dataframe.")
            continue

        print(f"  🚩 Attempting to convert '{col}' to type: '{dtype}'.")
        try:
            # --- Apply conversion based on the dtype string from the dictionary ---
            if dtype == 'int':
                # to_numeric(errors='coerce') turns non-numeric values into NaN;
                # the nullable 'Int64' dtype can then hold those gaps as <NA>.
                input_df[col] = pd.to_numeric(input_df[col], errors = 'coerce').astype('Int64')
            elif dtype == 'float':
                # Coerce to numeric first, then ensure float type
                input_df[col] = pd.to_numeric(input_df[col], errors = 'coerce').astype('float')
            elif dtype == 'date' or dtype == 'datetime':
                input_df[col] = pd.to_datetime(input_df[col], errors = 'coerce')
            elif dtype == 'category':
                input_df[col] = input_df[col].astype('category')
            elif dtype ==  'string':
                input_df[col] = input_df[col].astype('string')
            else:
                print(f"    🟨 Warning: Unknown dtype '{dtype}' specified for column '{col}'. Could not attempt conversion.")
                # Nothing was converted, so do not fall through to the success log.
                continue

            print(f"  ✅ Successfully converted '{col}' to ''{input_df[col].dtype}''")
        except Exception as e:
            # Report and move on so one bad column does not abort the rest
            print(f"  ❌ Error converting column '{col}' to ''{dtype}'', Error: {e}")

    # --- Data conversion done ---
    print(f"✅ Finished data conversion for dataframe: {df_name}.")

print("✅ Defined data type conversion function.")

## Process Data (Data Type Conversion)
1. Convert categorical values to 'category' dtype.
2. Convert dated columns to 'datetime' dtype.
3. Convert numerical columns to 'Int64' and 'float' based on the data.

In [None]:
# --- Data Type Conversion ---

# Target dtype per column of mf_scheme_df.
# 'category' is used for the repetitive text columns to cut memory use and speed up queries.
mf_scheme_dtypes = dict(
    AMC='category',
    Scheme_Code='int',
    Scheme_Type='category',
    Scheme_Category='category',
    Launch_Date='datetime',
    Closure_Date='datetime',
)

# Convert the columns of mf_scheme_df in place
data_type_conversion(mf_scheme_dtypes, mf_scheme_df, 'mf_scheme_df')


# Target dtype per column of latest_nav_df
nav_dtypes = dict(
    Scheme_Code='int',
    NAV='float',
    Latest_NAV_Date='datetime',
)

# Convert the columns of latest_nav_df in place
data_type_conversion(nav_dtypes, latest_nav_df, 'latest_nav_df')


# TODO: dtype mapping for merged_df is still an empty placeholder
# merged_dtypes = {
# }

## Merge Scheme Data & Latest NAV Data

In [None]:
print("⏳ Merging scheme data & latest NAV data...")

# Combine the scheme master data with the latest NAV data.
# INNER JOIN on Scheme_Code: only codes present in both frames survive.
merged_df = mf_scheme_df.merge(latest_nav_df, how = 'inner', on = 'Scheme_Code')

print("✅ Sucessfull merged scheme and NAV data.")

# Report (rows, columns) of the merged frame
print(f"📋 Total Rows & Columns: {merged_df.shape}")

# --- Clean DataFrame ---

try:
    # Columns made redundant by the merge: the combined ISIN column from the
    # scheme file and the NAV file's copy of the scheme name (suffixed _y).
    cols_to_drop = ['ISIN_Div_Payout/Growth/Div_Reinvestment', 'Scheme_Name_y']
    merged_df = merged_df.drop(columns = cols_to_drop)

    # 'old_col': 'new_col' mapping that strips the merge suffix from the kept name column
    merged_mapper = {'Scheme_Name_x': 'Scheme_Name'}
    merged_df = merged_df.rename(columns = merged_mapper)

    # Index by Scheme_Code for faster lookups on the dataframe
    merged_df = merged_df.set_index('Scheme_Code')

    # Keep the index sorted in ascending order for better lookup performance
    merged_df = merged_df.sort_index(ascending = True)

    print("✅ Sucessfully dropped columns, renamed columns, set and sorted index.")

    # Report (rows, columns) after cleaning
    print(f"🆕 Updated Rows & Columns after cleaning: {merged_df.shape}")

except Exception as clean_err:
    print(f"❌ Unable to clean the merged dataframe: {clean_err}")

## Handling Data in merged_df

In [None]:
# How many rows of the merged frame (mf_scheme_df joined with latest_nav_df) have no Latest_NAV_Date
merged_df['Latest_NAV_Date'].isnull().sum()

In [None]:
# Keep only the rows that have a Latest_NAV_Date; schemes without one are not
# being updated daily, and removing them gives a smaller dataframe.
df = merged_df[merged_df['Latest_NAV_Date'].notna()]
df.info()

## Fetching Historical NAV (Work in progress)

In [None]:
# Installing mftool to fetch mutual fund data
!pip install mftool --quiet

# Importing Mftool, and calling API function calls with mf.function()
from mftool import Mftool
mf = Mftool()

In [None]:
# Fetch and tidy the historical NAV series of a single scheme
def get_historical_nav_data (scheme_code):
    """
    Fetches historical NAV data from the Mftool library into a cleaned dataframe.

    Args:
        scheme_code(int): Scheme code passed to mf.get_scheme_historical_nav().

    Returns:
        dataframe: Historical NAV data with
            - index named 'NAV_Date', parsed as datetime using the '%d-%m-%Y'
              format (values that do not match become NaT),
            - column 'nav' renamed to 'NAV' and converted to float,
            - column 'dayChange' renamed to 'Daily_Change'.

    Raises:
        ValueError: If Mftool returns no data for scheme_code.
    """

    # --- Fetch Data ---

    hist_nav_df = mf.get_scheme_historical_nav(scheme_code, as_Dataframe=True)

    # NOTE(review): mftool is understood to return None for a scheme code it does
    # not recognise - confirm against the installed version. Failing here gives a
    # clear message instead of an AttributeError on None further down.
    if hist_nav_df is None:
        raise ValueError(f"No historical NAV data returned for scheme code: {scheme_code}")

    # --- Clean DataFrame ---

    # Map the library's column names to this notebook's naming convention
    hist_mapper = {
        'nav': 'NAV',
        'dayChange': 'Daily_Change'
    }
    hist_nav_df = hist_nav_df.rename(columns = hist_mapper)

    # Rename dataframe index from 'date' to 'NAV_Date'
    hist_nav_df.index.name = 'NAV_Date'

    # --- Handle Data ---

    # Convert the index (NAV_Date) from object to datetime for better query performance
    hist_nav_df.index = pd.to_datetime(
        hist_nav_df.index,
        format = '%d-%m-%Y', # Format to expect
        errors = 'coerce')

    # Column -> dtype mapping handed to the shared conversion helper
    hist_nav_dtypes = {
        'NAV': 'float'
    }

    # The helper converts the columns of hist_nav_df in place; the string is the
    # dataframe's name for the log messages.
    data_type_conversion(hist_nav_dtypes, hist_nav_df, 'hist_nav_df')

    return hist_nav_df

### Testing Historical NAV Data

In [None]:
# Smoke test: fetch the NAV history for one scheme code and inspect the result
code = 101525

new_df = get_historical_nav_data(code)
new_df.info()

# For comparison, the untouched mftool output can be inspected with:
# hist_nav_df = mf.get_scheme_historical_nav(code, as_Dataframe=True)
# hist_nav_df.info()