In [0]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
result = spark.sql("select * from global_temp.filled_raw_stock_price")

In [0]:
# result.display()

In [0]:
# ============================================================
# 1. Calculate daily returns 
# ============================================================

window = Window.partitionBy("ticker").orderBy("as_of_date")
df_with_prev = result.withColumn("prev_close", F.lag("close").over(window))

In [0]:
# Calculate daily return
df_returns = df_with_prev.withColumn(
    "daily_return",
    F.when(
        (F.col("prev_close").isNull()) | (F.col("prev_close") == 0), 0.0
    ).otherwise(
        (F.col("close") - F.col("prev_close")) / F.col("prev_close")
    )
)

In [0]:
# Replace inf with 0
df_returns = df_returns.withColumn(
    "daily_return",
    F.when(F.abs(F.col("daily_return")) == float("inf"), 0.0).otherwise(F.col("daily_return"))
)

In [0]:
# ============================================================
# 2. Compute mean returns per ticker 
# ============================================================
mean_returns_df = df_returns.groupBy("ticker").agg(
    F.mean("daily_return").alias("mean_return")
).orderBy("ticker")

tickers = [row["ticker"] for row in mean_returns_df.collect()]
mean_returns = np.array([row["mean_return"] for row in mean_returns_df.collect()])
num_assets = len(tickers)

In [0]:
# Annualize
annual_returns = mean_returns * 252

In [0]:
# ============================================================
# 3. Compute covariance matrix 
# ============================================================
# Pivot returns to wide format: each ticker as a column
returns_pivot = df_returns.select("as_of_date", "ticker", "daily_return") \
    .groupBy("as_of_date").pivot("ticker", tickers) \
    .agg(F.first("daily_return")) \
    .na.fill(0) \
    .orderBy("as_of_date")

# Collect as numpy array for covariance calculation
returns_cols = [row.asDict() for row in returns_pivot.collect()]
returns_matrix = np.array([[row[t] for t in tickers] for row in returns_cols])

# Covariance matrix (annualized)
cov_matrix = np.cov(returns_matrix, rowvar=False) * 252

In [0]:
# ============================================================
# 4. Portfolio performance helper
# ============================================================
def portfolio_performance(weights, annual_returns, cov_matrix):
    ret = np.dot(weights, annual_returns)
    vol = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
    return ret, vol

In [0]:
# ============================================================
# 5. Minimum Variance Portfolio
# ============================================================
def min_variance(annual_returns, cov_matrix):
    num = len(annual_returns)
    constraints = {"type": "eq", "fun": lambda w: np.sum(w) - 1}
    bounds = tuple((0, 1) for _ in range(num))
    initial = np.array([1 / num] * num)
    res = minimize(
        lambda w: np.sqrt(np.dot(w.T, np.dot(cov_matrix, w))),
        initial, method="SLSQP", bounds=bounds, constraints=constraints
    )
    return res

In [0]:
# ============================================================
# 6. Maximum Sharpe Ratio Portfolio
# ============================================================
def max_sharpe(annual_returns, cov_matrix, risk_free_rate=0.02):
    num = len(annual_returns)
    constraints = {"type": "eq", "fun": lambda w: np.sum(w) - 1}
    bounds = tuple((0, 1) for _ in range(num))
    initial = np.array([1 / num] * num)
    def neg_sharpe(w):
        ret, vol = portfolio_performance(w, annual_returns, cov_matrix)
        return -(ret - risk_free_rate) / vol
    res = minimize(neg_sharpe, initial, method="SLSQP", bounds=bounds, constraints=constraints)
    return res

In [0]:
# ============================================================
# 7. Monte Carlo simulation for Efficient Frontier
# ============================================================
num_portfolios = 10000
sim_results = np.zeros((3, num_portfolios))
risk_free_rate = 0.15

for i in range(num_portfolios):
    w = np.random.dirichlet(np.ones(num_assets))
    ret, vol = portfolio_performance(w, annual_returns, cov_matrix)
    sim_results[0, i] = ret
    sim_results[1, i] = vol
    sim_results[2, i] = (ret - risk_free_rate) / vol

In [0]:
# ============================================================
# 8. Optimal portfolios
# ============================================================
min_var_result = min_variance(annual_returns, cov_matrix)
max_sharpe_result = max_sharpe(annual_returns, cov_matrix, risk_free_rate)

min_var_ret, min_var_vol = portfolio_performance(min_var_result.x, annual_returns, cov_matrix)
max_sh_ret, max_sh_vol = portfolio_performance(max_sharpe_result.x, annual_returns, cov_matrix)


In [0]:
# ============================================================
# 9. Plot Efficient Frontier
# ============================================================
fig, ax = plt.subplots(figsize=(12, 8))

scatter = ax.scatter(sim_results[1], sim_results[0], c=sim_results[2], cmap="viridis", marker="o", s=10, alpha=0.5)
plt.colorbar(scatter, label="Sharpe Ratio")

ax.scatter(min_var_vol, min_var_ret, color="red", marker="*", s=300, label="Min Variance")
ax.scatter(max_sh_vol, max_sh_ret, color="gold", marker="*", s=300, label="Max Sharpe")

ax.set_xlabel("Annualized Volatility")
ax.set_ylabel("Annualized Return")
ax.set_title("Markowitz Efficient Frontier")
ax.legend()
plt.tight_layout()
plt.show()

In [0]:
# ============================================================
# 10. Print optimal portfolio weights
# ============================================================
print("=" * 50)
print("MINIMUM VARIANCE PORTFOLIO")
print(f"  Return: {min_var_ret:.4f}  |  Volatility: {min_var_vol:.4f}")
for t, w in zip(tickers, min_var_result.x):
    print(f"  {t}: {w:.4%}")

print("=" * 50)
print("MAXIMUM SHARPE RATIO PORTFOLIO")
print(f"  Return: {max_sh_ret:.4f}  |  Volatility: {max_sh_vol:.4f}")
print(f"  Sharpe: {(max_sh_ret - risk_free_rate) / max_sh_vol:.4f}")
for t, w in zip(tickers, max_sharpe_result.x):
    print(f"  {t}: {w:.4%}")