In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cvxpy as cp
from sklearn.covariance import LedoitWolf

### Forex & Interest rate Data Processing and Excess Returns Calculation

In [None]:
fx_raw = pd.read_csv("FX_rates.csv")

currency_row = fx_raw.iloc[0, 1:].values
fx = fx_raw.iloc[1:].copy()
fx["Time Period"] = pd.PeriodIndex(fx["Time Period"], freq="M")
fx = fx.set_index("Time Period")
fx = fx.apply(pd.to_numeric, errors="coerce")


fx_adj = fx.copy()
for col, curr in zip(fx.columns, currency_row):
    if curr != "USD":
        fx_adj[col] = 1.0 / fx_adj[col]

delta_s = np.log(fx_adj).diff().dropna(how="all").round(6)


rf_raw = pd.read_csv("Risk Free Rates.csv")
currency_to_country = {
    "USD": "USD",
    "AUD": "AUSTRALIA", "CAD": "CANADA", "EUR": "EURO AREA",
    "BRL": "BRAZIL", "CHF": "SWITZERLAND", "DKK": "DENMARK",
    "GBP": "UNITED KINGDOM", "JPY": "JAPAN", "KRW": "KOREA",
    "ZAR": "SOUTH AFRICA", "TWD": "TAIWAN", "SGD": "SINGAPORE",
    "NZD": "NEW ZEALAND", "MXN": "MEXICO", "NOK": "NORWAY",
    "INR": "INDIA", "CNY": "CHINA", "SEK": "SWEDEN","HUF":"HUNGARY",
    "CZK":"CZECHIA","TRY":"TURKEY"
}
rf = rf_raw.rename(columns=currency_to_country)

fx_countries = delta_s.columns
for country in fx_countries:
    if country not in rf.columns:
        rf[country] = np.nan

rf["Time Period"] = pd.to_datetime(rf["Time Period"], format="%Y%m%d", errors='coerce')
rf = rf.set_index("Time Period")
rf = rf.apply(pd.to_numeric, errors="coerce")

rf_monthly = rf.resample('ME').mean().round(6)
rf_monthly.index = rf_monthly.index.to_period("M")


common_index = delta_s.index.intersection(rf_monthly.index)
delta_s = delta_s.loc[common_index]
rf_monthly = rf_monthly.loc[common_index]

usd_rf = rf_monthly["USD"]

excess_returns = pd.DataFrame(index=common_index, columns=delta_s.columns, dtype=float)

for col in delta_s.columns:
    if col not in rf_monthly.columns:
        continue

    mask = delta_s[col].notna() & rf_monthly[col].notna() & usd_rf.notna()
    excess_returns.loc[mask, col] = rf_monthly.loc[mask, col] - usd_rf.loc[mask] - delta_s.loc[mask, col]

excess_returns.round(6).drop(["INDIA", "CHINA", "KOREA"], axis=1).to_csv("FX_excess_returns.csv", na_rep="NaN")

## Full-Sample Markowitz Tangency Portfolio (With efficient frontier + cumulative return plot)

In [None]:
mu = excess_returns.mean().values          
cov = excess_returns.cov().values          

# Tangency Portfolio
def markowitz_tangency(mu, cov):
    inv_cov = np.linalg.inv(cov)
    ones = np.ones(len(mu))

    w = inv_cov @ mu
    w = w / (ones @ w)
    return w

w_full = markowitz_tangency(mu, cov)

print("Full-Sample Tangency Weights (Shorting Allowed):")
print(w_full)

# Efficient Frontier
def efficient_frontier(mu, cov, points=50):
    mu = mu.reshape(-1, 1)
    ones = np.ones((len(mu), 1))
    inv_cov = np.linalg.inv(cov)

    A = ones.T @ inv_cov @ ones   
    B = ones.T @ inv_cov @ mu       
    C = mu.T @ inv_cov @ mu         
    D = A*C - B**2                 

    target_returns = np.linspace(mu.min(), mu.max(), points)

    vols = []
    rets = []

    for r in target_returns:
        r = float(r)
        w = ((C - B*r)/D) * (inv_cov @ ones) + ((A*r - B)/D) * (inv_cov @ mu)
        w = w.flatten()
        vol = np.sqrt(w.T @ cov @ w)

        vols.append(vol)
        rets.append(r)

    return vols, rets

sigma, ret_list = efficient_frontier(mu, cov)

plt.figure(figsize=(8,6))
plt.plot(sigma, ret_list)
plt.title("Efficient Frontier — Short-Sale Allowed")
plt.xlabel("Volatility")
plt.ylabel("Return")
plt.grid(True)
plt.show()

# Cumulative Return of Tangency Portfolio
full_sample_portfolio_returns = excess_returns.values @ w_full
cum = (1 + full_sample_portfolio_returns).cumprod()

plt.figure(figsize=(8,5))
plt.plot(cum)
plt.title("Cumulative Return — Tangency Port (Shorting Allowed)")
plt.grid(True)
plt.show()

## Rolling Sample Markowitz Portfolio

In [None]:
# Tangency portfolio (short selling allowed)

def markowitz_tangency(mu, cov):
    mu = np.asarray(mu).reshape(-1)
    cov = np.asarray(cov)
    inv_cov = np.linalg.inv(cov)

    ones = np.ones(len(mu))

    w = inv_cov @ mu
    w = w / (ones @ w)   # budget constraint 1'w = 1

    return w

# Rolling Out-of-Sample Evaluation

window = 60
rets = excess_returns.copy()
oos = []

for t in range(window, len(rets) - 1):

    sample = rets.iloc[t-window:t]

    # convert to numpy
    mu_t = sample.mean().values
    cov_t = sample.cov().values

    w_t = markowitz_tangency(mu_t, cov_t)
    r_next = np.dot(rets.iloc[t+1].values, w_t)
    oos.append(r_next)

rolling_markowitz = pd.Series(oos, index=rets.index[window+1:])

## Naive 1/N Portfolio

In [None]:
# number of assets
N = excess_returns.shape[1]

# equal weights, short selling allowed
# (weights sum to 1, no sign restrictions needed)
w_equal = np.ones(N) / N

# out-of-sample returns aligned after 60-month window
naive_returns = (excess_returns @ w_equal).iloc[60:]

## Markowitz based on Shrinkage Estimaator

In [None]:
window = 60
rets = excess_returns.copy()

oos_lw = []

for t in range(window, len(rets)-1):

    sample = rets.iloc[t-window:t]
    mu_t = sample.mean().values

    # Ledoit–Wolf shrinkage covariance
    lw = LedoitWolf().fit(sample.values)
    cov_t = lw.covariance_

    # Tangency Portfolio
    inv_cov = np.linalg.inv(cov_t)
    w = inv_cov @ mu_t

    # budget constraint: 1'w = 1 (no sign restrictions)
    w = w / (np.ones(len(mu_t)) @ w)

    r_next = np.dot(rets.iloc[t+1].values, w)
    oos_lw.append(r_next)

lw_returns = pd.Series(oos_lw, index=rets.index[window+1:])

## Constrained Markowitz

In [None]:

# Tangency portfolio via mean-variance optimization
# Short-selling allowed, only sum(w)=1

def cvxpy_tangency(mu, cov, gamma=1e-6):
    n = len(mu)
    w = cp.Variable(n)

    objective = cp.Maximize(mu @ w - gamma * cp.quad_form(w, cov))
    constraints = [cp.sum(w) == 1]   # only sum constraint, short-selling allowed

    prob = cp.Problem(objective, constraints)

    try:
        prob.solve(solver=cp.ECOS)
    except:
        prob.solve(solver=cp.SCS, verbose=False)

    w_val = w.value

    if w_val is None:
        w_val = np.ones(n) / n

    # normalize to ensure sum = 1
    return w_val / np.sum(w_val)


# Rolling out-of-sample

window = 60
rets = excess_returns.copy()
oos_shrink = []

for t in range(window, len(rets)-1):
    sample = rets.iloc[t-window:t]

    mu_t = sample.mean().values
    cov_t = sample.cov().values

    w_t = cvxpy_tangency(mu_t, cov_t)
    r_next = np.dot(rets.iloc[t+1].values, w_t)
    oos_shrink.append(r_next)

cvx_returns = pd.Series(oos_shrink, index=rets.index[window+1:])

## Resampling based Estimation

In [None]:
def bootstrap_weights(sample, B=500):
    """
    Compute resampled tangency portfolio weights (Michaud)
    Short-selling allowed.
    
    sample : pd.DataFrame : window of returns
    B : int : number of bootstrap samples
    """
    N = sample.shape[1]
    weights = []

    for _ in range(B):
        # bootstrap indices
        idx = np.random.choice(len(sample), size=len(sample), replace=True)
        boot = sample.iloc[idx]

        # sample mean and cov
        mu_b = boot.mean().values
        cov_b = boot.cov().values

        # ensure covariance is invertible
        try:
            w_b = np.linalg.inv(cov_b) @ mu_b
            # normalize weights to sum to 1
            w_b = w_b / np.sum(w_b)
            weights.append(w_b)
        except np.linalg.LinAlgError:
            continue

    return np.mean(weights, axis=0)



# Rolling Out-of-Sample Portfolio

window = 60
rets = excess_returns.copy()
oos = []

for t in range(window, len(rets)-1):
    sample = rets.iloc[t-window:t]

    w_bar = bootstrap_weights(sample, B=300)

    r_next = np.dot(rets.iloc[t+1].values, w_bar)
    oos.append(r_next)

boot_returns = pd.Series(oos, index=rets.index[window+1:])

In [None]:
# run this before plotting
def ensure_ts(s):
    if s is None:
        return None
    s = s.dropna()
    if s.empty:
        return None
    if isinstance(s.index, pd.PeriodIndex):
        s = s.copy(); s.index = s.index.to_timestamp()
    return s

for name in ('excess_returns','w_full','naive_returns'):
    print(name, 'exists:', name in globals())

naive_plot = ensure_ts(globals().get('naive_returns'))
markowitz_plot = ensure_ts(globals().get('rolling_markowitz'))
print('naive_plot ok?', naive_plot is not None)

# Diagnostics
print("naive_returns head:\n", naive_returns.head())
print("non-null count:", naive_returns.dropna().shape[0])
print("index type:", type(naive_returns.index))
print("is PeriodIndex:", isinstance(naive_returns.index, pd.PeriodIndex))

## Rolling Sharpe Ratio Plot

In [None]:
def rolling_sharpe(return_series, window=36):
    return (return_series.rolling(window).mean() /
            return_series.rolling(window).std())

def _plot_sharpe_series(series, label, window=36):
    s = rolling_sharpe(series, window).dropna()
    # PeriodIndex to timestamps for matplotlib
    idx = s.index.to_timestamp() if hasattr(s.index, "to_timestamp") else s.index
    plt.plot(idx, s.values, label=label)

plt.figure(figsize=(10,6))
# _plot_sharpe_series(rolling_markowitz, "Markowitz-Sample Estimates")
_plot_sharpe_series(naive_returns, "Naive 1/N")
# _plot_sharpe_series(lw_returns, "Markowitz-Shrinkage Estimator")
# _plot_sharpe_series(cvx_returns, "Constrained")
# _plot_sharpe_series(boot_returns, "Resampled")
plt.legend()
plt.title("Rolling Sharpe Ratios (36-month window)")
plt.xlabel("Time Period")
plt.ylabel("Sharpe Ratio")
plt.grid(True,linestyle='--',linewidth=0.75)
plt.show()

## Cumulative Return Plot for All Methods

In [None]:
plt.figure(figsize=(10,6))

def _plot_cum(series, label):
	s = (1 + series).cumprod()
	idx = s.index.to_timestamp() if hasattr(s.index, "to_timestamp") else s.index
	plt.plot(idx, s.values, label=label)

# _plot_cum(rolling_markowitz, "Rolling Markowitz")
_plot_cum(naive_returns, "Naive 1/N")
# _plot_cum(lw_returns, "Shirnkage")
# _plot_cum(cvx_returns, "Constrained")
# _plot_cum(boot_returns, "Resampling")

plt.legend()
plt.title("Cumulative Wealth")
plt.xlabel("Time Period")
plt.ylabel("Cummlative Retunrs")
plt.grid(visible=True,linestyle='--',linewidth='0.75')
plt.show()

--------
The above program is completely written by Hema Srikar Ankem