In [None]:
pip install mstarpy

Collecting mstarpy
  Downloading mstarpy-2.0.0-py3-none-any.whl.metadata (18 kB)
Downloading mstarpy-2.0.0-py3-none-any.whl (29 kB)
Installing collected packages: mstarpy
Successfully installed mstarpy-2.0.0


In [None]:
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any
import mstarpy
from collections import defaultdict, deque

# Function to calculate the XIRR
def calculate_xirr(cash_flows: List[float], dates: List[datetime], guess=0.1, max_iterations=100) -> float:
    def xirr_func(rate):
        return sum(cf / (1 + rate) ** ((date - dates[0]).days / 365.0) for cf, date in zip(cash_flows, dates))

    tolerance = 1e-6
    iteration = 0

    while iteration < max_iterations:
        rate = guess
        f_val = xirr_func(rate)
        f_deriv = sum(-cf * (date - dates[0]).days / 365.0 / (1 + rate) ** ((date - dates[0]).days / 365.0 + 1)
                      for cf, date in zip(cash_flows, dates))
        new_guess = rate - f_val / f_deriv
        if abs(new_guess - rate) < tolerance:
            return rate * 100  # Return as percentage
        if abs(new_guess) > 1000:
            raise ValueError("XIRR rate calculation is diverging.")
        guess = new_guess
        iteration += 1

    return None

# Load data from JSON
def load_data_from_json(json_file_path: str) -> Dict[str, Any]:
    with open(json_file_path) as file:
        return json.load(file)

# Optimized Function to get current NAV using mstarpy with caching
nav_cache = {}

def get_current_nav(isin: str) -> float:
    # Return from cache if already fetched
    if isin in nav_cache:
        return nav_cache[isin]

    # Fetch NAV if not in cache
    fund = mstarpy.Funds(term=isin, country="in")
    end_date = datetime.now()
    start_date = end_date - timedelta(days=1)

    nav_data = fund.nav(start_date=start_date, end_date=end_date, frequency="daily")
    if len(nav_data) > 0:
        nav = nav_data[-1]['nav']
        nav_cache[isin] = nav  # Cache the result
        return nav
    return None

# Function to calculate total portfolio value, gain, and XIRR using DTtransaction
def calculate_portfolio_values(data: Dict[str, Any]) -> None:
    total_investment = 0
    total_current_value = 0
    total_gain = 0
    cash_flows = []
    cash_flow_dates = []

    # Track units and acquisition price per scheme-broker combination (FIFO)
    holdings = defaultdict(deque)  # deque to store unit batches with acquisition price

    # Process each scheme in the DTtransaction
    for scheme in data["data"]:
        for transaction in scheme["dtTransaction"]:
            trxn_units = float(transaction["trxnUnits"])
            trxn_amount = float(transaction["trxnAmount"])
            trxn_date = datetime.strptime(transaction["trxnDate"], "%d-%b-%Y")
            isin = transaction["isin"]
            folio = transaction["folio"]

            # Create a unique key for each scheme-broker (folio) combination
            folio_key = (isin, folio)

            # Fetch the current NAV dynamically using mstarpy
            nav = get_current_nav(isin)
            if nav is None:
                print(f"Failed to retrieve NAV for ISIN: {isin}")
                continue  # Skip this scheme if NAV retrieval failed

            # For buying units
            if trxn_units > 0:
                # Track holdings with units and purchase price using FIFO
                holdings[folio_key].append({
                    'units': trxn_units,
                    'price': trxn_amount / trxn_units,  # Acquisition price per unit
                    'date': trxn_date
                })
                total_investment += trxn_amount  # Track total investment

                # Prepare cash flows for XIRR calculation
                cash_flows.append(-trxn_amount)  # Outflow (investment)
                cash_flow_dates.append(trxn_date)

            # For selling units, apply FIFO
            elif trxn_units < 0:
                units_to_sell = abs(trxn_units)
                total_selling_value = 0
                acquisition_cost = 0
                units_sold = 0

                # Apply FIFO - Sell from oldest holdings
                while units_to_sell > 0 and holdings[folio_key]:
                    batch = holdings[folio_key][0]  # Get the oldest batch
                    available_units = batch['units']
                    if available_units <= units_to_sell:
                        # Sell all units from this batch
                        units_sold += available_units
                        acquisition_cost += available_units * batch['price']
                        total_selling_value += available_units * nav
                        units_to_sell -= available_units
                        holdings[folio_key].popleft()  # Remove the sold batch
                    else:
                        # Sell partial units from this batch
                        units_sold += units_to_sell
                        acquisition_cost += units_to_sell * batch['price']
                        total_selling_value += units_to_sell * nav
                        batch['units'] -= units_to_sell  # Update remaining units in batch
                        units_to_sell = 0  # All units sold

                total_current_value += total_selling_value
                gain = total_selling_value - acquisition_cost  # Calculate gain
                total_gain += gain

    # Calculate current value for remaining units
    for folio_key, batches in holdings.items():
        isin = folio_key[0]
        for batch in batches:
            if batch['units'] > 0:
                current_value = batch['units'] * get_current_nav(isin)
                total_current_value += current_value

    # Add final current value as inflow for XIRR
    cash_flows.append(total_current_value)
    cash_flow_dates.append(datetime.now())

    # Calculate XIRR
    try:
        xirr_value = calculate_xirr(cash_flows, cash_flow_dates)
    except ValueError as e:
        print(f"Error in XIRR calculation: {e}")
        xirr_value = "N/A"

    # Output results
    print(f"Total Investment: {total_investment:.2f}")
    print(f"Current Portfolio Value: {total_current_value:.2f}")
    print(f"Portfolio Gain: {total_gain:.2f}")
    print(f"XIRR: {xirr_value:.2f}%" if xirr_value != "N/A" else "XIRR: N/A")

# Main function to run the calculations
def main():
    json_file_path = "/content/input.json"  # Update with your actual path
    data = load_data_from_json(json_file_path)
    calculate_portfolio_values(data)

if __name__ == "__main__":
    main()


Total Investment: 3577231.33
Current Portfolio Value: 4866831.31
Portfolio Gain: 138394.55
XIRR: 22.17%


In [None]:
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any
import mstarpy
from collections import defaultdict, deque

# Function to calculate the XIRR
def calculate_xirr(cash_flows: List[float], dates: List[datetime], guess=0.1, max_iterations=100) -> float:
    def xirr_func(rate):
        return sum(cf / (1 + rate) ** ((date - dates[0]).days / 365.0) for cf, date in zip(cash_flows, dates))

    tolerance = 1e-6
    iteration = 0

    while iteration < max_iterations:
        rate = guess
        f_val = xirr_func(rate)
        f_deriv = sum(-cf * (date - dates[0]).days / 365.0 / (1 + rate) ** ((date - dates[0]).days / 365.0 + 1)
                      for cf, date in zip(cash_flows, dates))
        new_guess = rate - f_val / f_deriv
        if abs(new_guess - rate) < tolerance:
            return rate * 100  # Return as percentage
        if abs(new_guess) > 1000:
            raise ValueError("XIRR rate calculation is diverging.")
        guess = new_guess
        iteration += 1

    return None

# Load data from JSON
def load_data_from_json(json_file_path: str) -> Dict[str, Any]:
    with open(json_file_path) as file:
        return json.load(file)

# Function to get current NAV using mstarpy
def get_current_nav(isin: str) -> float:
    fund = mstarpy.Funds(term=isin, country="in")
    end_date = datetime.now()
    start_date = end_date - timedelta(days=1)

    nav_data = fund.nav(start_date=start_date, end_date=end_date, frequency="daily")
    if len(nav_data) > 0:
        return nav_data[-1]['nav']
    return None

# Function to calculate total portfolio value, net units, net value, and total gain for each fund
def calculate_portfolio_values(data: Dict[str, Any]) -> None:
    total_investment = 0
    total_current_value = 0
    total_gain = 0
    cash_flows = []
    cash_flow_dates = []

    # Track units and acquisition price per scheme-broker combination (FIFO)
    holdings = defaultdict(deque)  # deque to store unit batches with acquisition price
    net_units = defaultdict(float)  # Store net units for each ISIN

    # Process each scheme in the DTtransaction
    for scheme in data["data"]:
        for transaction in scheme["dtTransaction"]:
            trxn_units = float(transaction["trxnUnits"])
            trxn_amount = float(transaction["trxnAmount"])
            trxn_date = datetime.strptime(transaction["trxnDate"], "%d-%b-%Y")
            isin = transaction["isin"]
            folio = transaction["folio"]

            # Create a unique key for each scheme-broker (folio) combination
            folio_key = (isin, folio)

            # Fetch the current NAV dynamically using mstarpy
            nav = get_current_nav(isin)
            if nav is None:
                print(f"Failed to retrieve NAV for ISIN: {isin}")
                continue  # Skip this scheme if NAV retrieval failed

            # For buying units
            if trxn_units > 0:
                # Track holdings with units and purchase price using FIFO
                holdings[folio_key].append({
                    'units': trxn_units,
                    'price': trxn_amount / trxn_units,  # Acquisition price per unit
                    'date': trxn_date
                })
                total_investment += trxn_amount  # Track total investment
                net_units[isin] += trxn_units  # Update net units for the ISIN

                # Prepare cash flows for XIRR calculation
                cash_flows.append(-trxn_amount)  # Outflow (investment)
                cash_flow_dates.append(trxn_date)

            # For selling units, apply FIFO
            elif trxn_units < 0:
                units_to_sell = abs(trxn_units)
                total_selling_value = 0
                acquisition_cost = 0
                units_sold = 0

                # Apply FIFO - Sell from oldest holdings
                while units_to_sell > 0 and holdings[folio_key]:
                    batch = holdings[folio_key][0]  # Get the oldest batch
                    available_units = batch['units']
                    if available_units <= units_to_sell:
                        # Sell all units from this batch
                        units_sold += available_units
                        acquisition_cost += available_units * batch['price']
                        total_selling_value += available_units * nav
                        units_to_sell -= available_units
                        holdings[folio_key].popleft()  # Remove the sold batch
                    else:
                        # Sell partial units from this batch
                        units_sold += units_to_sell
                        acquisition_cost += units_to_sell * batch['price']
                        total_selling_value += units_to_sell * nav
                        batch['units'] -= units_to_sell  # Update remaining units in batch
                        units_to_sell = 0  # All units sold

                net_units[isin] -= trxn_units  # Update net units for the ISIN
                total_current_value += total_selling_value
                gain = total_selling_value - acquisition_cost  # Calculate gain
                total_gain += gain

    # Calculate current value for remaining units and display per fund
    fund_values = {}
    for folio_key, batches in holdings.items():
        isin = folio_key[0]
        current_value = sum(batch['units'] * get_current_nav(isin) for batch in batches)
        total_current_value += current_value

        # Unrealized gain for remaining units
        unrealized_gain = sum((batch['units'] * get_current_nav(isin)) - (batch['units'] * batch['price']) for batch in batches)
        total_gain += unrealized_gain

        fund_values[isin] = {
            'net_units': net_units[isin],
            'net_value': current_value,
            'unrealized_gain': unrealized_gain
        }

    # Add final current value as inflow for XIRR
    cash_flows.append(total_current_value)
    cash_flow_dates.append(datetime.now())

    # Calculate XIRR
    try:
        xirr_value = calculate_xirr(cash_flows, cash_flow_dates)
    except ValueError as e:
        print(f"Error in XIRR calculation: {e}")
        xirr_value = "N/A"

    # Output results
    print("Fund-wise details:")
    for isin, values in fund_values.items():
        print(f"ISIN: {isin}, Net Units: {values['net_units']:.2f}, Net Value: {values['net_value']:.2f}, Unrealized Gain: {values['unrealized_gain']:.2f}")

    print(f"\nTotal Portfolio Value: {total_current_value:.2f}")
    print(f"Total Portfolio Gain: {total_gain:.2f}")
    print(f"XIRR: {xirr_value:.2f}%" if xirr_value != "N/A" else "XIRR: N/A")

# Main function to run the calculations
def main():
    json_file_path = "/content/input.json"  # Update with your actual path
    data = load_data_from_json(json_file_path)
    calculate_portfolio_values(data)

if __name__ == "__main__":
    main()


Fund-wise details:
ISIN: INF209K01UN8, Net Units: 291.05, Net Value: 19572.91, Unrealized Gain: 9572.91
ISIN: INF090I01JR0, Net Units: 296.10, Net Value: 0.00, Unrealized Gain: 0.00
ISIN: INF194K01Y29, Net Units: 147.12, Net Value: 26176.91, Unrealized Gain: 14677.43
ISIN: INF179K01WN9, Net Units: 989.85, Net Value: 760664.88, Unrealized Gain: 180693.88
ISIN: INF174KA1FQ7, Net Units: 5114.31, Net Value: 86712.63, Unrealized Gain: 36715.13
ISIN: INF174K01LT0, Net Units: 176.11, Net Value: 26880.00, Unrealized Gain: 18880.00
ISIN: INF200KA1507, Net Units: 4826.98, Net Value: 0.00, Unrealized Gain: 0.00
ISIN: INF200K01TP4, Net Units: 1515.70, Net Value: 409385.55, Unrealized Gain: 79402.05
ISIN: INF200K01T51, Net Units: 1930.06, Net Value: 406556.86, Unrealized Gain: 94572.46
ISIN: INF109K01VQ1, Net Units: 3.48, Net Value: 0.00, Unrealized Gain: 0.00
ISIN: INF109K01Q49, Net Units: 1.73, Net Value: 642.24, Unrealized Gain: 139.33
ISIN: INF109K01Z14, Net Units: 131.58, Net Value: 0.00, Unre