In [None]:
# ========================
# 🧠 DISTRIBUTED VERSION WITH MPI FOR KUBEFLOW
# ========================

from mpi4py import MPI
import os
import time
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

# MPI Init
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# CONFIG
# Prefer the FMP_API_KEY environment variable so the key never has to be
# written into (and committed with) the notebook; the placeholder is only a fallback.
API_KEY = os.environ.get("FMP_API_KEY", "your_api_key_here")
DATA_DIR = "historical_data"
PLOTS_DIR = "plots"
N_SIMULATIONS = 10000  # Monte Carlo paths per symbol
N_DAYS = 252  # simulation horizon in trading days (~1 year)

# Every rank creates the output folders. With a shared volume this is a
# harmless no-op (exist_ok=True); without one it is required, because each
# rank writes its own downloads into DATA_DIR on its own filesystem.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)

comm.Barrier()  # keep all ranks in step before any work starts

# Only rank 0 fetches the list of NASDAQ companies
def fetch_nasdaq_companies(api_key, timeout=30):
    """Fetch the NASDAQ constituent list from Financial Modeling Prep.

    Called on rank 0 only; the symbols are then broadcast to the other ranks.
    As a side effect the raw list is written to ``nasdaq_companies.csv`` in
    the current working directory.

    Args:
        api_key: Financial Modeling Prep API key.
        timeout: seconds to wait for the HTTP response before giving up.

    Returns:
        DataFrame with one row per company in the JSON list returned by the API.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        requests.RequestException: on connection problems or timeout.
        ValueError: if the body is not JSON, or is not a JSON list.
    """
    url = f"https://financialmodelingprep.com/api/v3/nasdaq_constituent?apikey={api_key}"
    # Without a timeout a stalled connection blocks rank 0 forever, and with it
    # every other rank waiting for the broadcast of this list.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    companies = response.json()
    # Guard against non-list payloads (e.g. an error object): pd.DataFrame on a
    # dict of scalars fails with an unhelpful "must pass an index" message.
    if not isinstance(companies, list):
        raise ValueError(f"Unexpected response from NASDAQ constituent endpoint: {companies!r}")
    df = pd.DataFrame(companies)
    df.to_csv("nasdaq_companies.csv", index=False)
    return df

# Download historical data (each rank downloads only the symbols assigned to it)
def download_historical_data(symbol, api_key, timeout=30, data_dir=None):
    """Download historical close prices for ``symbol`` into ``<data_dir>/<symbol>.csv``.

    The request asks the API for ``timeseries=1000``. Symbols whose CSV already
    exists are skipped, so re-runs only fetch what is missing.

    Every failure (network error, non-200 status, non-JSON body, empty or
    malformed ``historical`` payload) is printed and skipped, never raised.
    An uncaught exception on one MPI rank would leave the others blocked
    forever in the collective ``gather``.

    Args:
        symbol: ticker symbol; used in the URL and as the file name.
        api_key: Financial Modeling Prep API key (redacted from log messages).
        timeout: seconds to wait for the HTTP response.
        data_dir: output directory; defaults to the module-level ``DATA_DIR``.

    Returns:
        None. Success is observable as the CSV file existing afterwards.
    """
    if data_dir is None:
        data_dir = DATA_DIR
    path = os.path.join(data_dir, f"{symbol}.csv")
    if os.path.exists(path):
        return  # cached by an earlier run
    url = f"https://financialmodelingprep.com/api/v3/historical-price-full/{symbol}?serietype=line&timeseries=1000&apikey={api_key}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # requests error messages may echo the URL, which contains the key.
        message = str(exc).replace(api_key, "***") if api_key else str(exc)
        print(f"[WARN] {symbol}: request failed: {message}")
        return
    if response.status_code != 200:
        print(f"[WARN] {symbol}: HTTP {response.status_code}")
        return
    try:
        data = response.json()
    except ValueError as exc:
        print(f"[WARN] {symbol}: response is not valid JSON: {exc}")
        return
    history = data.get("historical") if isinstance(data, dict) else None
    if not history:
        # An empty list would otherwise yield a frame with no 'date' column
        # and crash below with KeyError.
        print(f"[WARN] {symbol}: no historical prices in response")
        return
    df = pd.DataFrame(history)
    missing = {"date", "close"} - set(df.columns)
    if missing:
        print(f"[WARN] {symbol}: historical data lacks columns {sorted(missing)}")
        return
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values('date')
    # Write-then-rename: an interrupted run never leaves a truncated CSV that
    # the exists() cache check above would accept as complete.
    tmp_path = f"{path}.tmp"
    df.to_csv(tmp_path, index=False)
    os.replace(tmp_path, path)

# Simulation for a single symbol
def run_simulation(symbol, n_days=None, n_simulations=None, data_dir=None):
    """Simulate Monte Carlo price paths for ``symbol`` from its historical daily returns.

    Daily simple returns are drawn i.i.d. from a normal distribution. Its mean
    and standard deviation are estimated from the ``close`` column of
    ``<data_dir>/<symbol>.csv``. Each path compounds those returns starting
    from the last close in the file.

    Args:
        symbol: ticker whose CSV (columns ``date`` and ``close``) is read.
        n_days: path length; defaults to the module-level ``N_DAYS``.
        n_simulations: number of paths; defaults to ``N_SIMULATIONS``.
        data_dir: directory holding the CSV; defaults to ``DATA_DIR``.

    Returns:
        ndarray of shape ``(n_days, n_simulations)``. Entry ``[t, i]`` is the
        price of path ``i`` after ``t + 1`` simulated days; the starting
        (last observed) price itself is not included. Returns None, after
        printing the error, if the file cannot be read or has fewer than two
        returns to estimate volatility from.
    """
    n_days = N_DAYS if n_days is None else n_days
    n_simulations = N_SIMULATIONS if n_simulations is None else n_simulations
    data_dir = DATA_DIR if data_dir is None else data_dir
    try:
        df = pd.read_csv(os.path.join(data_dir, f"{symbol}.csv"), parse_dates=["date"])
        prices = df['close']
        returns = prices.pct_change().dropna()
        # std() of fewer than two values is NaN, which would silently turn
        # every simulated path into NaN.
        if len(returns) < 2:
            raise ValueError(f"need at least 2 daily returns to estimate volatility, got {len(returns)}")
        mu = returns.mean()
        sigma = returns.std()
        last_price = prices.iloc[-1]

        # Vectorised form of the per-path, per-day compounding loop:
        # price[t, i] = last_price * prod_{k <= t} (1 + shock[k, i]).
        shocks = np.random.normal(mu, sigma, size=(n_days, n_simulations))
        return last_price * np.cumprod(1.0 + shocks, axis=0)
    except Exception as e:
        print(f"[ERROR] {symbol}: {e}")
        return None

# ------------------------ MAIN ------------------------

# Number of simulated paths drawn in the index fan chart. Only these columns are
# sent to rank 0. Gathering every full (N_DAYS x N_SIMULATIONS) float64 array
# would put about 20 MB per symbol (252 x 10000 values) on the root rank for nothing.
N_PLOT_PATHS = 100


def _redact(text):
    """Mask the API key in messages that may echo a request URL."""
    return text.replace(API_KEY, "***") if API_KEY else text


def _last_close(symbol):
    """Return the most recent close price stored for ``symbol`` in DATA_DIR."""
    return float(pd.read_csv(os.path.join(DATA_DIR, f"{symbol}.csv"))["close"].iloc[-1])


start_time = datetime.now()

if rank == 0:
    print("📥 Fetching NASDAQ list on Rank 0...")
    try:
        nasdaq_df = fetch_nasdaq_companies(API_KEY)
        symbols = nasdaq_df['symbol'].tolist()
    except Exception as exc:
        # The other ranks wait in comm.bcast for this list; abort the whole MPI
        # job rather than leave them blocked forever.
        print(f"[FATAL] Could not fetch the NASDAQ list: {_redact(str(exc))}", flush=True)
        comm.Abort(1)
else:
    symbols = None

# Broadcast list to all ranks
symbols = comm.bcast(symbols, root=0)

# Round-robin partition: rank r takes symbols r, r + size, r + 2*size, ...
my_symbols = symbols[rank::size]

partial_results = []  # [symbol, initial_price, expected_price, expected_return, std_dev]
partial_growth = []   # per symbol: final simulated prices / initial price
partial_paths = []    # per symbol: first N_PLOT_PATHS paths / initial price

for symbol in my_symbols:
    print(f"Rank {rank} processing {symbol}", flush=True)
    try:
        download_historical_data(symbol, API_KEY)
        sims = run_simulation(symbol)
        if sims is None:
            continue
        # sims[0] is already one simulated day after the last observed close,
        # so the true starting price must come from the data, not sims[0, 0].
        initial_price = _last_close(symbol)
    except Exception as exc:
        # One bad symbol must not kill this rank: every other rank would then
        # wait forever in comm.gather below.
        print(f"[ERROR] Rank {rank} skipping {symbol}: {_redact(str(exc))}", flush=True)
        continue

    ending_prices = sims[-1, :]
    expected_price = np.mean(ending_prices)
    expected_return = (expected_price - initial_price) / initial_price
    std_dev = np.std(ending_prices)

    partial_results.append([symbol, initial_price, expected_price, expected_return, std_dev])
    # Normalising by the starting price makes each stock contribute equally to
    # the portfolio/index averages below. Averaging raw prices would weight
    # stocks by share price instead.
    partial_growth.append(ending_prices / initial_price)
    partial_paths.append(sims[:, :N_PLOT_PATHS] / initial_price)

# Gather everything at rank 0 in a single collective
gathered = comm.gather((partial_results, partial_growth, partial_paths), root=0)

if rank == 0:
    print("🧩 Aggregating results on Rank 0...")
    flat_results = [row for results, _, _ in gathered for row in results]
    flat_growth = [growth for _, growths, _ in gathered for growth in growths]
    flat_paths = [path for _, _, paths in gathered for path in paths]

    summary_df = (
        pd.DataFrame(flat_results, columns=["symbol", "initial_price", "expected_price", "expected_return_1yr", "std_dev_1yr"])
        .sort_values(by="expected_return_1yr", ascending=False)
    )
    summary_df.to_csv("nasdaq_simulation_summary.csv", index=False)

    # NOTE: path i of different stocks is averaged as if it were one joint
    # scenario. Each stock is simulated independently, so cross-stock
    # correlation is ignored in both charts.
    if flat_growth:
        portfolio_distribution = np.mean(flat_growth, axis=0)
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.hist(portfolio_distribution, bins=60, color='lightgreen', edgecolor='black', alpha=0.8)
        ax.set_title("Equal-Weighted NASDAQ Portfolio – Final Price Distribution (1 Year)")
        ax.set_xlabel("Portfolio Value (growth of 1.0 invested, equal weights)")
        ax.set_ylabel("Frequency")
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(f"{PLOTS_DIR}/nasdaq_portfolio_distribution.png")
        plt.close(fig)

    if flat_paths:
        aggregate_index = np.mean(flat_paths, axis=0)
        fig, ax = plt.subplots(figsize=(12, 6))
        # Plotting a 2-D array draws one line per column (path).
        ax.plot(aggregate_index, linewidth=0.5, alpha=0.6)
        ax.set_title("Simulated NASDAQ Index (Equal-Weighted, 1 Year)")
        ax.set_xlabel("Trading Days")
        ax.set_ylabel("Index Value (1.0 = last observed close)")
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(f"{PLOTS_DIR}/nasdaq_simulated_index_paths.png")
        plt.close(fig)

    duration = datetime.now() - start_time
    print(f"\n✅ All distributed simulations complete in {duration}.")


In [None]:
%%writefile Dockerfile
FROM python:3.11-slim

# gcc: if pip builds mpi4py from source it compiles against the system Open MPI,
# and the slim base image does not include a C compiler.
# Removing the apt lists in the same layer keeps them out of the image.
RUN apt-get update && apt-get install -y \
    gcc libopenmpi-dev openmpi-bin \
    && rm -rf /var/lib/apt/lists/* \
    && pip install --no-cache-dir pandas numpy matplotlib requests mpi4py

WORKDIR /app
# NOTE(review): add a .dockerignore so cached historical_data/ and plots/ are not baked in.
COPY . /app

# Open MPI refuses to run as root unless --allow-run-as-root is given.
# -np 10 must equal the MPIJob's Worker replicas (10) x slotsPerWorker (1).
ENTRYPOINT ["mpirun", "--allow-run-as-root", "-np", "10", "python", "main.py"]


In [None]:
%%writefile mpijob.yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: monte-carlo-nasdaq
spec:
  slotsPerWorker: 1
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
            - image: your-repo/monte-carlo:latest
              name: mpi-launcher
              # The code stays at /app inside the image. The PVC is mounted at
              # /data so it no longer hides /app/main.py.
              # -wdir /data makes every rank run in the shared volume, so the
              # script's relative outputs (CSVs, plots) land there.
              # -np must equal Worker replicas x slotsPerWorker.
              command: ["mpirun", "--allow-run-as-root", "-np", "10", "-wdir", "/data", "python", "/app/main.py"]
              volumeMounts:
                - name: shared-volume
                  mountPath: /data
          volumes:
            - name: shared-volume
              persistentVolumeClaim:
                claimName: your-pvc
    Worker:
      replicas: 10
      template:
        spec:
          containers:
            - image: your-repo/monte-carlo:latest
              name: mpi-worker
              volumeMounts:
                - name: shared-volume
                  mountPath: /data
          # Every pod spec must declare the volumes its containers mount;
          # without this block the worker pods are rejected.
          # NOTE(review): all pods mount the claim at once, so it presumably
          # needs ReadWriteMany access — confirm the storage class supports it.
          volumes:
            - name: shared-volume
              persistentVolumeClaim:
                claimName: your-pvc
