In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
from fredapi import Fred
import warnings
warnings.filterwarnings("ignore")


In [11]:
# Column names in file order: the raw CSV headers are "Column1" ... "Column26".
# Position 1 is the date, position 2 the S&P 500 index, the rest are sector indices.
_NAMED_COLUMNS = [
    "Date", "SPX",
    "S5SFTW", "S5PHRM", "S5CPGS", "S5ENRSX", "S5FDBT", "S5TECH",
    "S5RETL", "S5BANKX", "S5HCES", "S5DIVF", "S5UTILX", "S5MEDA",
    "S5REAL", "S5TELSX", "S5MATRX", "S5INSU", "S5FDSR", "S5HOUS",
    "S5SSEQX", "S5TRAN", "S5HOTR", "S5CODU", "S5AUCO", "S5COMS",
]
_RAW_TO_NAMED = {
    f"Column{position}": name
    for position, name in enumerate(_NAMED_COLUMNS, start=1)
}

# The file is semicolon-separated and uses a decimal comma.
df = pd.read_csv("data.csv", sep=";", decimal=",").rename(columns=_RAW_TO_NAMED)
# Dates are stored day-first (dd/mm/YYYY).
df["Date"] = pd.to_datetime(df["Date"], format="%d/%m/%Y")

In [12]:
# Sector index columns used as the investable universe (SPX is the market benchmark).
_SECTOR_COLUMNS = [
    "S5SFTW", "S5PHRM", "S5CPGS", "S5ENRSX", "S5FDBT", "S5TECH",
    "S5RETL", "S5BANKX", "S5HCES", "S5DIVF", "S5UTILX", "S5MEDA",
    "S5REAL", "S5TELSX", "S5MATRX", "S5INSU", "S5FDSR", "S5HOUS",
    "S5SSEQX", "S5TRAN", "S5HOTR", "S5CODU", "S5AUCO", "S5COMS",
]


def _LogReturnsWindow(df, date, lookback, columns):
    """Daily log returns of `columns` over the window of rows ending at `date`.

    The window is selected on index labels: every row whose label lies in
    [label_of(date) - lookback, label_of(date)]. This assumes an integer index
    (the default RangeIndex produced by `read_csv`).

    Raises:
        ValueError: if `date` is not present in df["Date"], or if the window
            would start before the first row of `df`.
    """
    date = pd.to_datetime(date)
    matches = df.index[df["Date"] == date]
    if len(matches) == 0:
        raise ValueError("Date not in dataframe")
    date_index = matches[0]

    # Breaker: refuse to silently return a shorter window than requested.
    if date_index - lookback < df.index.min():
        raise ValueError("Not enough history before date for the requested lookback")

    in_window = (df.index <= date_index) & (df.index >= date_index - lookback)
    prices = df.loc[in_window, columns]

    # The first row of the window has no previous price, so its NaN row is dropped
    # (as is any row containing a missing price).
    return np.log(prices / prices.shift(1)).dropna()


def GetReturn(df,date,lookback):
    """Daily log returns of the 24 sector indices over the window ending at `date`.

    Args:
        df: price table with a "Date" column and one column per sector index.
        date: window end date (anything `pd.to_datetime` accepts).
        lookback: number of rows before `date` to include.

    Returns:
        DataFrame of log returns, one column per sector.

    Raises:
        ValueError: if `date` is not in `df` or the window starts before the data.
    """
    return _LogReturnsWindow(df, date, lookback, _SECTOR_COLUMNS)


def GetReturnSPX(df,date,lookback):
    """Daily log returns of the "SPX" column over the window ending at `date`.

    Same windowing rules and errors as `GetReturn`; the result has a single
    "SPX" column.
    """
    return _LogReturnsWindow(df, date, lookback, ["SPX"])

#Returns=GetReturn(df,"2020-05-11",lookback=180)
#ReturnsSPX=GetReturnSPX(df,"2020-05-11",lookback=180)

In [13]:
def GetSigma(df,date,lookback):
    """Sample covariance matrix of the sector log returns for the window ending at `date`.

    The returns come from `GetReturn(df, date, lookback)`; the result is the
    (daily, not annualised) covariance between its columns.
    """
    window_returns = GetReturn(df, date, lookback=lookback)
    return window_returns.cov()

# Example: daily covariance matrix of sector log returns for lookback=180 ending 2020-05-11.
Sigma=GetSigma(df,"2020-05-11",lookback=180)

In [14]:
def GetRfDataframe(df, api_key=None):
    """Build the risk-free accrual table from the FRED 'DFF' series.

    The FRED series has one observation per calendar day, while `df` only has
    trading days. Calendar days are grouped into runs: each run starts on a
    calendar day and extends over every following day that is NOT a trading
    day, so the run ends just before the next trading day. For each run the
    table stores the start day, the rate observed on that day (as a decimal)
    and the number of calendar days in the run.

    Args:
        df: price table whose "Date" column lists the trading days.
        api_key: FRED API key. When omitted, the FRED_API_KEY environment
            variable is used.

    Returns:
        DataFrame indexed by "Date" with columns "Rf" (rate / 100) and
        "Count" (calendar days the rate applies to).

    Raises:
        ValueError: if no API key is supplied and FRED_API_KEY is not set.
    """
    import os  # local import: only needed to resolve the API key

    # SECURITY: credentials must not live in the notebook source.
    # NOTE(review): a key was previously committed here; it should be revoked.
    if api_key is None:
        api_key = os.environ.get("FRED_API_KEY")
    if not api_key:
        raise ValueError(
            "FRED API key missing: pass api_key=... or set the FRED_API_KEY environment variable."
        )

    fred = Fred(api_key=api_key)
    rates = fred.get_series('DFF')
    # riskfree = fred.get_series('DTB1MO')

    # Keep observations from 2002 onwards and convert percent to decimal.
    rates = rates[rates.index >= "2002-01-01"] / 100
    rate_values = rates.to_numpy()
    calendar_days = list(pd.to_datetime(rates.index, dayfirst=True, errors="coerce"))

    # A set gives O(1) membership tests (the loop below tests one per calendar day).
    trading_days = set(pd.to_datetime(df["Date"], dayfirst=True, errors="coerce"))

    run_starts = []
    run_rates = []
    run_lengths = []
    n_days = len(calendar_days)
    position = 0
    while position < n_days - 1:
        run_start = position
        # Extend the run while the next calendar day is not a trading day.
        while position + 1 < n_days and calendar_days[position + 1] not in trading_days:
            position += 1

        run_starts.append(calendar_days[run_start])  # first day of the run
        run_rates.append(rate_values[run_start])
        run_lengths.append(position - run_start + 1)
        position += 1

    RfDf = pd.DataFrame({"Date": run_starts, "Rf": run_rates, "Count": run_lengths})
    RfDf = RfDf.set_index("Date")
    return RfDf

def GetRiskFree(df,date,lookback,RfDf):
    """Cumulative risk-free return over the lookback window ending at `date`.

    The window starts at the row `lookback` positions before `date` in `df`
    and ends at `date`. Every row of `RfDf` dated inside the window contributes
    a growth factor (1 + Rf) ** (Count / 360); the factors are compounded.

    Args:
        df: price table with a "Date" column (default integer index assumed).
        date: window end date (anything `pd.to_datetime` accepts).
        lookback: number of rows of `df` before `date` covered by the window.
        RfDf: table indexed by date with columns "Rf" and "Count", as built
            by `GetRfDataframe`. It is not modified.

    Returns:
        Compounded risk-free return over the window (not annualised).

    Raises:
        ValueError: if `date` is not in `df`, if the window would start before
            the first row of `df`, or if `RfDf` has no row inside the window.
    """
    endDate = pd.to_datetime(date)
    matches = df.index[df["Date"] == endDate]
    if len(matches) == 0:
        raise ValueError("Date not in dataframe")

    positionOfStartDate = matches[0] - lookback
    # A negative position would silently wrap around to the END of the table.
    if positionOfStartDate < 0:
        raise ValueError("Not enough history before date for the requested lookback")
    startDate = pd.to_datetime(df["Date"].iloc[positionOfStartDate])

    window = RfDf[(RfDf.index >= startDate) & (RfDf.index <= endDate)]
    if window.empty:
        raise ValueError("No risk-free observations inside the requested window")

    # 360-day year, as in the original accrual formula.
    growth = (1 + window["Rf"]) ** (window["Count"] / 360)
    return growth.prod() - 1

# Build the risk-free accrual table once; GetRfDataframe queries the FRED API.
RfDf=GetRfDataframe(df)

In [15]:
def GetWeight(df,date):
    """Benchmark weights as a (24, 1) column vector.

    For now the benchmark is equal-weighted: every entry is 1/24.
    `df` and `date` are accepted but not read.
    """
    asset_count = 24
    return np.full((asset_count, 1), 1 / asset_count)
#Weight=GetWeight(df,"2020-05-11")


In [16]:
def GetLambda(df,date,timeofcalculation,RfDf):
    """Implied risk-aversion coefficient of the benchmark portfolio.

    lambda = (annualised benchmark return - annualised risk-free rate)
             / annualised benchmark variance,
    all estimated on the window of `timeofcalculation` rows ending at `date`.

    Args:
        df: price table (see `GetReturn`).
        date: window end date.
        timeofcalculation: window length in rows; also used to annualise the
            cumulative risk-free return with a 252-day year.
        RfDf: risk-free accrual table (see `GetRiskFree`).

    Returns:
        The risk-aversion coefficient. A one-line summary (excess return,
        variance, volatility, lambda, Sharpe ratio) is printed as well.
    """
    returns = GetReturn(df, date, timeofcalculation)
    weight_vector = GetWeight(df, date)

    # Mean daily benchmark return, compounded over 252 trading days.
    # NOTE(review): `returns` are log returns but are compounded here as simple
    # returns - confirm this approximation is intended.
    mean_return = np.mean(np.dot(returns, weight_vector))
    mean_annual = (1 + mean_return) ** 252 - 1

    # Cumulative risk-free return over the window, rescaled to one year.
    rf_temps = GetRiskFree(df, date, timeofcalculation, RfDf)
    rf_annual = (1 + rf_temps) ** (252 / timeofcalculation) - 1

    Sigma_annual = 252 * GetSigma(df, date, timeofcalculation)
    var = float((weight_vector.T @ Sigma_annual.values @ weight_vector).item())

    excess = mean_annual - rf_annual
    sigma = np.sqrt(var)
    lambda_value = excess / var
    sharpe = excess / sigma
    print("Excess:", excess, " Var:", var, " Vol:", sigma, " λ:", lambda_value, " Sharpe:", sharpe)

    return lambda_value



# Implied risk aversion estimated with timeofcalculation=3500 rows ending 2024-01-11.
Lambda=GetLambda(df,"2024-01-11",timeofcalculation=3500,RfDf=RfDf)

0.027917132194784967
Excess: 0.08824333617258762  Var: 0.027917132194784967  Vol: 0.1670842068981535  λ: 3.1609026155298223  Sharpe: 0.5281369065980994


In [29]:
def GetPMatrix(df,date, lookback,proportion=3):
    """Build momentum views: the `proportion` best sectors versus the equal-weight basket.

    Sector performance is the simple return between the row `lookback`
    positions before `date` and the row at `date`. For each of the best
    `proportion` sectors one view is created:

    * its row of P is -1/N everywhere and 1 - 1/N on the sector itself
      (sector minus the equal-weighted basket of all N sectors);
    * its entry of Q is half of the sector's out-performance versus the
      "SPX" column over the same window, as a decimal.

    Args:
        df: price table with "Date", "SPX" and the sector columns
            (default integer index assumed).
        date: window end date.
        lookback: window length in rows.
        proportion: number of views (best performers) to generate.

    Returns:
        (P, Q) with shapes (proportion, N) and (proportion, 1), N = 24.

    Raises:
        ValueError: if `date` is not in `df`, if the window would start before
            the first row, or if `proportion` is not in [0, N].
    """
    AssetColumns=["S5SFTW","S5PHRM","S5CPGS","S5ENRSX","S5FDBT","S5TECH","S5RETL","S5BANKX","S5HCES","S5DIVF","S5UTILX","S5MEDA","S5REAL","S5TELSX","S5MATRX","S5INSU","S5FDSR","S5HOUS","S5SSEQX","S5TRAN","S5HOTR","S5CODU","S5AUCO","S5COMS"]
    n_assets = len(AssetColumns)
    if not 0 <= proportion <= n_assets:
        raise ValueError("proportion must be between 0 and the number of assets")

    matches = df.index[df["Date"] == pd.to_datetime(date)]
    if len(matches) == 0:
        raise ValueError("Date not in dataframe")
    endDateIndex = matches[0]
    startDateIndex = endDateIndex - lookback
    # A negative position would silently wrap around to the END of the table.
    if startDateIndex < 0:
        raise ValueError("Not enough history before date for the requested lookback")

    # Row 0 = window start, row 1 = window end. Columns are selected by name so
    # that extra columns in `df` cannot leak into the asset universe.
    window_ends = df.iloc[[startDateIndex, endDateIndex]]
    asset_prices = window_ends[AssetColumns].to_numpy(dtype=float)
    market_prices = window_ends["SPX"].to_numpy(dtype=float)

    # Performance over the window, in percent.
    asset_returns = (asset_prices[1] / asset_prices[0] - 1) * 100
    perfMarket = (market_prices[1] / market_prices[0] - 1) * 100

    # Best first; ties are broken by the higher column position.
    ranked = sorted(
        ((ret, position) for position, ret in enumerate(asset_returns)),
        reverse=True,
    )

    P = np.full((proportion, n_assets), -1 / n_assets)
    Q = np.zeros((proportion, 1))
    for lineview, (ret, position) in enumerate(ranked[:proportion]):
        P[lineview, position] = 1 - 1 / n_assets
        Q[lineview, 0] = ((ret - perfMarket) / 2) / 100

    return P, Q
# Example views: the 3 best sectors over lookback=180 ending 2016-05-11 (TempoQ is the matching Q vector).
PMatrix,TempoQ=GetPMatrix(df,"2016-05-11",lookback=180,proportion=3)

In [37]:
def GetOmega(PMatrix, Sigma, c=0.99):
    """Uncertainty matrix of the views.

    Omega = (1/c - 1) * P @ Sigma @ P.T, so `c` acts as a confidence level:
    the closer `c` is to 1, the smaller the scale factor and therefore Omega.
    """
    confidence_scale = 1 / c - 1
    scaled_views = confidence_scale * PMatrix
    return scaled_views @ Sigma @ np.transpose(PMatrix)


# Display Omega for the sample views at c=0.2.
# NOTE(review): PMatrix was built for 2016-05-11 while Sigma was estimated for 2020-05-11 - confirm the mismatch is intended for this demo.
GetOmega(PMatrix,Sigma,0.2)

Unnamed: 0,0,1,2
0,0.000706,0.000357,0.00028
1,0.000357,0.000901,0.000427
2,0.00028,0.000427,0.000381


In [27]:
def LinkOmegaTau(Omega, P, Sigma):
    """Derive a tau value from the view uncertainty.

    Returns trace(inv(252 * Sigma)) / (36 * trace(P.T @ inv(Omega) @ P)).
    """
    constant = 36

    views_trace = np.trace(np.transpose(P) @ np.linalg.inv(Omega) @ P)
    annual_prior_trace = np.trace(np.linalg.inv(Sigma * 252))

    return annual_prior_trace / (views_trace * constant)



In [36]:
def BlackAndLittermanModel(backtestStartDate, rebalancingFrequency, lookbackPeriod, df,RfDf,confidence=0.25,proportion=4,tau=0.01,Lambda=3):
    """Black-Litterman allocation for one rebalancing date.

    Steps:
      1. Estimate Sigma on the lookback window ending at `backtestStartDate`.
      2. Build the momentum views (P, Q) and their uncertainty Omega.
      3. Reverse-optimise the benchmark weights into implied returns:
         pi = Lambda * Sigma @ w + rf.
      4. Blend prior and views with the Black-Litterman master formula.
      5. Convert the posterior returns into unconstrained mean-variance
         weights: inv(Sigma) @ (mu - rf) / Lambda.

    Args:
        backtestStartDate: date the allocation is computed for.
        rebalancingFrequency: accepted for interface compatibility; not read.
        lookbackPeriod: window length (rows) for Sigma, the views and rf.
        df: price table.
        RfDf: risk-free accrual table.
        confidence: view confidence `c` forwarded to `GetOmega`.
        proportion: number of views forwarded to `GetPMatrix`.
        tau: scaling of the prior covariance. Previously this argument was
            overwritten by a hardcoded 0.01 (its default), so it was ignored.
        Lambda: risk-aversion coefficient.

    Returns:
        DataFrame of shape (N, 1) (integer index, column 0) with the weights.

    Raises:
        ValueError: if the weights do not sum to 1 within 1e-6.
    """
    # Plain ndarrays keep every matrix product free of pandas label alignment.
    sigma = np.asarray(GetSigma(df, backtestStartDate, lookback=lookbackPeriod), dtype=float)
    PMatrix, Q = GetPMatrix(df, backtestStartDate, lookback=lookbackPeriod, proportion=proportion)
    omega = np.asarray(GetOmega(PMatrix, sigma, c=confidence), dtype=float)
    rf = GetRiskFree(df, backtestStartDate, lookbackPeriod, RfDf)
    weights = np.asarray(GetWeight(df, backtestStartDate), dtype=float).reshape(-1, 1)

    # Equilibrium (implied) returns of the benchmark.
    uimplied = Lambda * (sigma @ weights) + rf

    # Black-Litterman master formula; each inverse is computed once.
    prior_precision = np.linalg.inv(tau * sigma)
    views_precision = np.transpose(PMatrix) @ np.linalg.inv(omega)
    posterior_cov = np.linalg.inv(prior_precision + views_precision @ PMatrix)
    optimizedReturn = posterior_cov @ (prior_precision @ uimplied + views_precision @ Q)

    # Unconstrained Markowitz allocation on the posterior returns.
    WeightBL = np.linalg.inv(sigma) @ (optimizedReturn - rf) / Lambda

    total = float(WeightBL.sum())
    if not np.isclose(total, 1.0, atol=1e-6):
        print(total)
        raise ValueError("Weights do not sum to 1, please investigate.")

    return pd.DataFrame(WeightBL)


# Example allocation for 2018-05-11 with a 180-row lookback and the default confidence/proportion/tau/Lambda.
BlackAndLittermanModel("2018-05-11", rebalancingFrequency=3, lookbackPeriod=180, df=df,RfDf=RfDf)


0    1.0
dtype: float64


Unnamed: 0,0
0,-6.439281
1,-1.442308
2,-1.442308
3,7.016784
4,-1.442308
5,-1.442308
6,26.310788
7,-1.442308
8,-1.442308
9,-1.442308


In [None]:
import numpy as np

#---------
#PARAMETERS
#---------

free_asset=0 #proportion of risk free asset allocated in the benchmark
rf=0.046 #risk free rate
taux=0.01 #tau: scaling of the prior covariance
RiskAversion=3.3

#kindly import it or calculate it !
Sigma = np.array([
    [0.0254, 0.0327, 0.0292, 0.0168],
    [0.0327, 0.0718, 0.0451, 0.0292],
    [0.0292, 0.0451, 0.0758, 0.0285],
    [0.0168, 0.0292, 0.0285, 0.0409]
], dtype=float)


#---------
#VIEWS
#---------

#Q: one absolute view per asset, with the half-width of its confidence interval
confidence=2 #divisor turning a half-width into a standard deviation (presumably ~95% interval)
ViewMatrix=np.array([0.13,0.18,0.16,0.13]) #expected returns as decimals
ConfidenceMatrix=np.array([0.05,0.06,0.04,0.05])

#Omega: diagonal matrix of view variances
OmegaMatrix=np.diag((ConfidenceMatrix/confidence)**2)

#Omega^-1
OmegaMatrixInverse = np.linalg.inv(OmegaMatrix)

#P: identity, i.e. view i is about asset i only
PMatrix=np.eye(len(ViewMatrix))


#---------
#BENCHMARK
#---------

def EW(ReturnofBenchmark,free_asset=0):
  """Equal weights for the assets of `ReturnofBenchmark`.

  Each of the len(ReturnofBenchmark) assets receives (1 - free_asset) / n,
  leaving `free_asset` for the risk-free asset. Only the length of the
  argument is used (the previous version read a global array instead).
  """
  n_assets=len(ReturnofBenchmark)
  return np.full(n_assets,(1-free_asset)/n_assets)


µLT=np.array([0.084,0.065,0.014,0.055]) #long-term expected returns
weightBEW=EW(µLT)#possible to add free_asset liquity

#Perf of Benchmark
PerfOfBench=np.transpose(weightBEW)@µLT+free_asset*rf-rf
VarianceOfBench=np.transpose(weightBEW)@Sigma@weightBEW
lambdaOfBench=PerfOfBench/VarianceOfBench

µimplied=lambdaOfBench*Sigma@weightBEW+rf

#--------------
#MASTER FORMULA
#--------------


OptimisedReturn=(np.linalg.inv(np.linalg.inv(taux*Sigma)+np.transpose(PMatrix)@OmegaMatrixInverse@PMatrix))@(np.linalg.inv(taux*Sigma)@µimplied+np.transpose(PMatrix)@OmegaMatrixInverse@ViewMatrix)
print(OptimisedReturn)
print(" ")
#--------------------
#MARKOWITZ ALLOCATION
#--------------------

WeightBL=np.linalg.inv(Sigma)@(OptimisedReturn-rf)/RiskAversion
print(WeightBL)

#whatever is not invested in the risky assets goes to the risk-free asset
RfWeightBL = 1.0 - float(WeightBL.sum())





In [31]:
from rich.console import Console
from rich.panel import Panel
from tqdm import tqdm

console = Console()

#BACK TESTER
# Working copy of the price table with a monthly period column.
dfbacktest=df.copy()
dfbacktest["Date"] = pd.to_datetime(df["Date"], format="%d/%m/%Y")
dfbacktest["MonthIndex"] = dfbacktest["Date"].dt.to_period("M")

df_length = dfbacktest.shape[1] - 2  # bcs of date and spx
last_rebalance = dfbacktest.loc[0, "Date"]  # first date
month_count = 0

# Styled banner (no interactive prompts); the values below configure the backtest.
hold = 1
hist = 0
proportion = 3
Lambda=3
tau=0.01
confidence=0.00001

console.print(Panel.fit(
    "[bold cyan]📊 PORTFOLIO BACKTESTER[/bold cyan]\n"
    "[dim]Black-Litterman Model[/dim]",
    border_style="cyan"
))

console.print("\n[yellow]⚙️  Configuration :[/yellow]")
console.print(f"   • Hold period: [cyan]{hold}[/cyan] mois")
console.print(f"   • Historique: [cyan]{hist}[/cyan] mois")
console.print(f"   • Proportion: [cyan]{proportion}[/cyan]")
console.print(f"   • Lambda: [cyan]{Lambda:.4f}[/cyan]")
console.print(f"   • Confiance: [cyan]{confidence}[/cyan]")
console.print(f"   • Taux: [cyan]{tau}[/cyan]\n")

console.print("\n[yellow]⏳ Lancement du backtest...[/yellow]\n")

def Backtester(df,hold, hist, proportion,df_toBL, RfDf,confidence2,proportion2,tau2,Lambda2):
    """Simulate the Black-Litterman strategy day by day.

    The simulation starts at row 181 with 10,000,000 in cash and no holdings.
    On the first row of a new month (when the running month counter is a
    multiple of `hold` and the row position is at least `hist`) the portfolio
    is rebalanced to the weights returned by `BlackAndLittermanModel`, called
    with a 3*22-row lookback. On all other days the quantities are carried over.

    Each day the portfolio is first marked to market with the quantities held
    since the previous row; only then is it rebalanced, using the updated value.
    (The previous version sized the new quantities at today's prices and then
    credited them with the yesterday-to-today price move, which is a look-ahead
    bias on every rebalancing day.)

    Args:
        df: price table with "Date" first, the benchmark second, one column per
            asset, and a "MonthIndex" column (default integer index assumed).
        hold: rebalance every `hold` months.
        hist: minimum row position before rebalancing is allowed.
        proportion: accepted for interface compatibility; not read
            (`proportion2` is the value forwarded to the model).
        df_toBL: price table passed to `BlackAndLittermanModel`.
        RfDf: risk-free accrual table passed to the model.
        confidence2, proportion2, tau2, Lambda2: model parameters.

    Returns:
        DataFrame starting at the first simulated day (index reset) with the
        date, the benchmark level, the quantity held of each asset and a
        "Money" column with the portfolio value.
    """
    start = 181
    MoneyAtStart = 10000000
    fees = 0  # placeholder: no transaction costs are modelled

    prices_df = df.drop(columns="MonthIndex")
    asset_columns = prices_df.columns[2:]
    prices = prices_df[asset_columns].to_numpy(dtype=float)
    dates = pd.to_datetime(prices_df.iloc[:, 0])
    months = dates.dt.month.to_numpy()
    n_rows = prices_df.shape[0]

    quantities = np.zeros_like(prices)
    money = np.full(n_rows, np.nan)
    CurrentValue = MoneyAtStart
    month_count = 0

    for i in tqdm(range(start, n_rows), desc="Backtesting"):
        # 1) Mark to market with yesterday's holdings. Zero positions are
        #    skipped so that a missing price on an unheld asset cannot
        #    contaminate the portfolio value.
        held = quantities[i - 1]
        moves = prices[i] - prices[i - 1]
        GainOrLoss = float(np.where(held != 0.0, held * moves, 0.0).sum())
        CurrentValue += GainOrLoss - fees

        # 2) Rebalance on the first row of a new month, every `hold` months.
        new_month = months[i] != months[i - 1]
        if new_month:
            month_count += 1

        if i >= hist and new_month and month_count % hold == 0:
            BLWeight = BlackAndLittermanModel(str(df.iloc[i,0]),3,3*22,df_toBL,RfDf,confidence=confidence2,proportion=proportion2,tau=tau2,Lambda=Lambda2)
            target_weights = np.asarray(BLWeight, dtype=float).reshape(-1)
            quantities[i] = target_weights * CurrentValue / prices[i]  # qty = weight*total value/price
        else:
            quantities[i] = held  # same qty

        money[i] = CurrentValue

    StockQty = pd.DataFrame(quantities[start:], columns=asset_columns)
    StockQty.insert(0, prices_df.columns[1], prices_df.iloc[start:, 1].to_numpy())
    StockQty.insert(0, prices_df.columns[0], dates.iloc[start:].to_numpy())
    StockQty["Money"] = money[start:]
    return StockQty
# Rebuild the risk-free table (queries FRED) and run the backtest with the configuration defined above.
RfDf=GetRfDataframe(df)
final = Backtester(dfbacktest, hold=hold, hist=hist, proportion=proportion, df_toBL=df,RfDf=RfDf,confidence2=confidence,proportion2=proportion,tau2=tau,Lambda2=Lambda)

console.print("\n[green]✅ Backtest terminé avec succès ![/green]\n")

Backtesting: 100%|██████████| 5602/5602 [00:40<00:00, 139.07it/s]


In [32]:
import plotly.express as px
import pandas as pd

# Cumulative performance in %: the portfolio relative to the 10,000,000 starting
# capital, the benchmark relative to its level on the first backtest day.
money_norm = (final["Money"]/10000000*100) - 100
spx_norm = (final["SPX"]/final["SPX"].iloc[0]*100) - 100

# Long format (one row per date and series) as expected by plotly express.
df_plot = pd.DataFrame({
    "Date": final["Date"],
    "Portfolio": money_norm,
    "SPX": spx_norm
}).melt(id_vars="Date", var_name="Série", value_name="Évolution en %")

fix = px.line(
    df_plot,
    x="Date",
    y="Évolution en %",
    color="Série",
    color_discrete_map={"SPX": "red", "Portfolio": "green"},
    title="Comparaison des évolutions en %"
)

fix.update_layout(hovermode="x unified")
fix.show()


In [None]:
# Average year-to-date profile: for every calendar year, performance in % since
# the first trading day of that year, then averaged across years day by day.
AnnualizedDf=final[["Date","SPX","Money"]].copy()  # copy: columns are added below
AnnualizedDf['Date'] = pd.to_datetime(AnnualizedDf['Date'])
AnnualizedDf['Year'] = AnnualizedDf['Date'].dt.year

YearList=AnnualizedDf["Year"].unique()


def _YearToDateTable(frame, column):
    """Table of year-to-date performance (%) of `column`.

    Rows are the trading-day number within the year (0 = first trading day of
    the year, where the value is 0 by construction), columns are the years.
    Days that do not exist in a given year are NaN.
    """
    day_in_year = frame.groupby("Year").cumcount()
    first_of_year = frame.loc[day_in_year == 0].set_index("Year")[column]
    ytd_pct = frame[column] / frame["Year"].map(first_of_year) * 100 - 100
    long_table = pd.DataFrame({"Year": frame["Year"], "Day": day_in_year, "Value": ytd_pct})
    return long_table.pivot(index="Day", columns="Year", values="Value")


SPXAnnualized=_YearToDateTable(AnnualizedDf,"SPX")
StratAnnualized=_YearToDateTable(AnnualizedDf,"Money")

# Partial years are excluded from the average (too many NaN).
SPXAnnualized=SPXAnnualized.drop(columns=[2024,2002], errors="ignore")
StratAnnualized=StratAnnualized.drop(columns=[2024,2002], errors="ignore")

# Mean over the remaining years. The previous version divided by len(YearList),
# which still counted the dropped years and understated every average.
# skipna=False keeps the original rule: a day number missing in any year is NaN.
SPXAvg=SPXAnnualized.mean(axis=1, skipna=False).tolist()
StratAVG=StratAnnualized.mean(axis=1, skipna=False).tolist()

dff = pd.DataFrame({"Index": (range(len(SPXAvg))),"Portfolio": StratAVG,"SPX": SPXAvg})


fig = px.line(dff, x="Index", y=["SPX","Portfolio"], color_discrete_map={"SPX": "red","Portfolio": "green"})
fig.show()

