# Group Assignment QF627

*Group Members:*
* Anna Germaine Lim
* Peng Cheng
* Zenith Tay
* Gregory Tan

# Packages Used in This Workbook

In [1]:
## Data Download

import yfinance as yf
import numpy as np
import pandas as pd
from datetime import datetime

## For Visualisation

# Useful Functions Used in this Sheet

In [None]:
def download_data (ticker,
                   start_date: str | datetime,
                   end_date: str | datetime) -> pd.DataFrame:
    data =\
    (
        yf.download(tickers = ticker,
                    start = start_date,
                    end = end_date
                    )
    )

    return data

In [3]:
# preallocate empty array and assign slice by chrisaycock

def np_shift(arr, num, fill_value=np.nan):
    result = np.empty_like(arr)
    if num > 0:
        result[:num] = fill_value
        result[num:] = arr[:-num]
    elif num < 0:
        result[num:] = fill_value
        result[:num] = arr[-num:]
    else:
        result[:] = arr
    return result

### Mean Reversion Strategies

In [4]:
def bollinger_band(price_data: pd.Series,
                            window: int = 14,
                                                ) -> pd.Series:

    price = price_data[price_col]

    std_dev_series =\
    (
        price
        .rolling(window = window)
        .std()
    )

    price_high =\
    (
        price + 2*std_dev_series
    )

    price_low =\
    (
        price - 2*std_dev_series
    )

    return price_high, price_low

### Momentum

In [None]:
## Moving Average

def generate_moving_avg(price_data: pd.Series,
                        window: int
                                              ) -> pd.Series:
    
    ma_series =\
    (
        pd.Series
        (   
            price_data
            .rolling(window = window)
            .mean(),

            name = 'MA' + str(window)
        )
    )

    return ma_series

In [None]:
## Exponential Moving Average

def generate_EMA(price_data: pd.Series, 
                    window: int
                            ) -> pd.Series:
    EMA = pd.Series(price_data
                    .ewm(span = window,
                         min_periods = window)
                    .mean(),
                    name = "EMA_" + str(window)
                    )
    return EMA

In [None]:
def generate_moving_avg_cross_signal(long_ma: pd.Series,
                                    short_ma: pd.Series) -> pd.Series:
    
    ## Sanity Check
    if len(long_ma) != len(short_ma):
        print('MA series lengths not equal, please check')
        return

    ## Return Signals
    else:

        moving_avg_cross_positions = np.where(short_ma > long_ma, 1.0, 0.0)
        moving_avg_cross_positions = np.where(short_ma < long_ma, -1.0 , moving_avg_cross_positions)

        moving_avg_cross_signals = np.where(moving_avg_cross_positions - np_shift(moving_avg_cross_positions,1) > 0, 1, 0)
        moving_avg_cross_signals = np.where(moving_avg_cross_positions - np_shift(moving_avg_cross_positions, 1) < 0, -1, moving_avg_cross_signals)

        buy_or_sell = pd.DataFrame({'MA_Cross_Signal':moving_avg_cross_signals, 'MA_Cross_Position': moving_avg_cross_positions},
                                   index = long_ma.index
                                   )

        return buy_or_sell

In [None]:
## Rate of Change

def generate_rate_of_change(price_data: pd.Series,
                            n: int
                            ) -> pd.Series:
    
    ROC = pd.Series(
                        (price_data - price_data.diff(n)) / price_data.diff(n),
                        name = 'ROC'+str(n),
                        # index = price_data.index
                    )
    
    return ROC

In [9]:
def generate_rate_of_change_signal(roc_data: pd.Series) -> pd.Series:

    roc_position = pd.Series(np.where(roc_data > 0, 1.0, 0.0), index=roc_data.index, name = 'ROC_Position')
    roc_signal = roc_position.diff()
    roc_signal.name = 'ROC_Signal'
    
    # roc_signal = roc_position - np_shift(roc_position, 1)


    # buy_or_sell = pd.DataFrame({'ROC_Position': roc_position, 'ROC_Signal': roc_signal},
    #                         #    index = roc_data.index
    #                            )

    return pd.concat([roc_position, roc_signal], axis=1)

In [None]:
## RSI

def generate_RSI(series, period):
    
    delta = series.diff().dropna()
    
    u = delta * 0
    d = u.copy()
    
    u[delta > 0] = delta[delta > 0]
    d[delta < 0] = -delta[delta < 0]
    
    u[u.index[period - 1]] = np.mean( u[:period] ) # 
    
    u = u.drop(u.index[:(period - 1)
                      ]
              )
    
    d[d.index[period - 1]] = np.mean( d[:period] )
    
    d = d.drop(d.index[:(period - 1)
                      ]
              )
    
    rs = u.ewm(com = period - 1, adjust = False).mean() / \
         d.ewm(com = period - 1, adjust = False).mean()
    
    return 100 - 100 / (1 + rs)

In [133]:
def generate_rsi_signal(rsi_data: pd.Series) -> pd.Series:

    rsi_position = pd.Series(np.where(rsi_data > 50, 1.0, -1.0), 
                             index= rsi_data.index, 
                             name = 'rsi_position')
    
    rsi_signal = rsi_position.diff()
    rsi_signal.name = 'rsi_signal'
    
    # rsi_signal = rsi_position - np_shift(rsi_position, 1)


    # buy_or_sell = pd.DataFrame({'rsi_position': rsi_position, 'rsi_signal': rsi_signal},
    #                         #    index = rsi_data.index
    #                            )

    return pd.concat([rsi_position, rsi_signal], axis=1)

In [166]:
def generate_force_index(data_price: pd.Series, data_vol: pd.Series, period: int) -> pd.Series:

    indicator = data_price.diff() * data_vol
    lag_indicator = indicator.ewm(span = period, adjust = False).mean()

    return lag_indicator

In [174]:
def generate_force_index_signal(data: pd.Series) -> pd.Series:

    position = pd.Series(np.where(data >= 0, 1.0, -1.0),
                        index = data.index,
                        name = 'FI_position'
                        )
    signal = position.diff()/2
    signal.name = 'FI_signal'

    return pd.concat([position, signal], axis = 1)

## Performance Metrics

In [12]:
def annual_sharpe(returns):
    days = (returns.index[-1] - returns.index[0]).days
    
    return\
    (
        (
            (1+returns).prod()
            **(365/days) 
            - 1
        )
        /
        returns.std()
        /
        np.sqrt(252)
    )

In [13]:
## CAGR

def cagr(returns: pd.Series) -> float:
    days = (returns.index[-1] - returns.index[0]).days
    return ( (1 + returns).prod() )**(365/days) - 1   

In [14]:
### Max Drawdown

def max_drawdown(cumulative_returns):
    max_performance = cumulative_returns.cummax()
    dd = ((max_performance - cumulative_returns) / max_performance).max()
    return dd


### Longest Drawdown

def calculate_longest_drawdown(cumulative_returns):
    drawdown = cumulative_returns.cummax() - cumulative_returns
    period =\
    (
        np
        .diff(np
              .append(drawdown[drawdown == 0].index, 
                      drawdown.index[-1: ]
                    )
            )
    )
    return period.max() / np.timedelta64(1, "D")

In [100]:
def evaluate_returns(returns_series: pd.Series, to_print: bool = False):
    
    cum_returns_series = (1 + returns_series).cumprod()

    tot_returns = (1 + returns_series).prod()
    CAGR = cagr(returns_series)
    Annualised_Sharpe = annual_sharpe(returns_series)
    Max_DD = max_drawdown(cum_returns_series)
    Longest_DD = calculate_longest_drawdown(cum_returns_series)

    if to_print == True:
      print('-- Summary of Returns -- \n',
            f'Total Returns: {tot_returns: .2%} \n',
            f'CAGR: {CAGR: .2%} \n',
            f'Annualised_Sharpe: {Annualised_Sharpe: .2%} \n',
            f'Max Drawdown: {Max_DD: .2%} \n',
            f'Longest Drawdown (Days): {Longest_DD}'            
            )

    return pd.Series([tot_returns, CAGR, Annualised_Sharpe, Max_DD, Longest_DD])

## Packages

In [16]:
## Data Download

import yfinance as yf
import numpy as np
import pandas as pd
from datetime import datetime

## Download Dataset

In [192]:
# spy_data =\
(
    download_data('SPY',
                  start_date = '2006-11-01',
                  end_date = '2025-11-12')
    .droplevel(level = 1,
               axis = 1)
    [['Close', 'Volume']]
).resample('W-FRI').agg({'Close': 'last', 'Volume': 'sum'})

  yf.download(tickers = ticker,
[*********************100%***********************]  1 of 1 completed


Price,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2006-11-03,95.749245,215045100
2006-11-10,96.941330,359047800
2006-11-17,98.470078,365694400
2006-11-24,98.420952,197045400
2006-12-01,98.329857,491307200
...,...,...
2025-10-17,664.390015,457106900
2025-10-24,677.250000,337267400
2025-10-31,682.059998,374234900
2025-11-07,670.969971,395772100


In [194]:
train_proportion = 0.75

spy_data =\
(
    download_data('SPY',
                  start_date = '2006-11-01',
                  end_date = '2025-11-12')
    .droplevel(level = 1,
               axis = 1)
    [['Close', 'Volume']]
).resample('W-FRI').agg({'Close': 'last', 'Volume': 'sum'})

spy_train_data = spy_data[:int(train_proportion*len(spy_data))]
spy_test_data = spy_data[int(train_proportion*len(spy_data)):]


spy_data_close = spy_train_data['Close'].to_frame()
spy_data_returns = spy_train_data['Close'].pct_change().to_frame().rename(columns= {'Close': 'Returns'})

  yf.download(tickers = ticker,
[*********************100%***********************]  1 of 1 completed


In [195]:
spy_data_returns

Unnamed: 0_level_0,Returns
Date,Unnamed: 1_level_1
2006-11-03,
2006-11-10,0.012450
2006-11-17,0.015770
2006-11-24,-0.000499
2006-12-01,-0.000926
...,...
2021-01-08,0.019739
2021-01-15,-0.014583
2021-01-22,0.019111
2021-01-29,-0.033457


In [196]:
evaluate_returns(spy_data_returns['Returns'], to_print=True)

-- Summary of Returns -- 
 Total Returns:  379.10% 
 CAGR:  9.79% 
 Annualised_Sharpe:  23.57% 
 Max Drawdown:  54.61% 
 Longest Drawdown (Days): 1771.0


0       3.791022
1       0.097898
2       0.235730
3       0.546131
4    1771.000000
dtype: float64

## Momentum Strategies

In [197]:
time_periods = np.arange(10, 201, 10).tolist()

#### ROC

In [198]:
def generate_ROC_Metrics(time_periods):

    ROC_Metrics = pd.DataFrame()

    for i in time_periods:
        roc_data = spy_data_returns.copy()
        
        roc_data[f'ROC{i}'] = generate_rate_of_change(spy_data_close['Close'], i)
        roc_data = pd.concat([roc_data, generate_rate_of_change_signal(roc_data[f'ROC{i}'])], axis = 1)
        roc_data['Strat_returns'] = roc_data['Returns'] * roc_data['ROC_Position'].shift(1)
        
        # print(f'===Data for ROC{i}===')
        roc_series = evaluate_returns(roc_data['Strat_returns'])
        roc_series.name = f'ROC{i}'
        
        ROC_Metrics = pd.concat([ROC_Metrics, roc_series], axis = 1)

    ROC_Metrics.index = ['Total Returns', 'CAGR', 'Annualised Sharpe', 'Max Drawdown', 'Longest Drawdown (Days)']
    return ROC_Metrics.T

In [210]:
generate_ROC_Metrics(time_periods).sort_values(by = 'Annualised Sharpe', ascending=False)

Unnamed: 0,Total Returns,CAGR,Annualised Sharpe,Max Drawdown,Longest Drawdown (Days)
ROC100,3.546123,0.092771,0.321478,0.31829,357.0
ROC70,3.541795,0.092678,0.321278,0.202251,714.0
ROC170,3.467794,0.091062,0.320582,0.31829,315.0
ROC150,3.704476,0.096122,0.319853,0.31829,315.0
ROC190,3.363655,0.088733,0.319186,0.31829,315.0
ROC200,3.302024,0.087323,0.315491,0.31829,427.0
ROC90,3.555622,0.092976,0.315145,0.31829,357.0
ROC80,3.403233,0.089626,0.310517,0.212966,938.0
ROC180,3.290073,0.087046,0.309823,0.31829,322.0
ROC130,3.400761,0.08957,0.303794,0.332358,315.0


In [200]:
def generate_RSI_Metrics(time_periods):

    Metrics = pd.DataFrame()

    for i in time_periods:
        data = spy_data_returns.copy()
        
        data[f'RSI{i}'] = generate_RSI(spy_data_close['Close'], i)
        
        data = pd.concat([data, generate_rsi_signal(data[f'RSI{i}'])], axis = 1)
        
        data['Strat_returns'] = data['Returns'] * data['rsi_position'].shift(1)
        
        # print(f'===Data for ROC{i}===')
        strat_series = evaluate_returns(data['Strat_returns'])
        strat_series.name = f'RSI{i}'
        
        Metrics = pd.concat([Metrics, strat_series], axis = 1)

    Metrics.index = ['Total Returns', 'CAGR', 'Annualised Sharpe', 'Max Drawdown', 'Longest Drawdown (Days)']
    return Metrics.T

In [201]:
generate_RSI_Metrics(range(10, 252, 10)).sort_values(by = 'Annualised Sharpe', ascending = False)

Unnamed: 0,Total Returns,CAGR,Annualised Sharpe,Max Drawdown,Longest Drawdown (Days)
RSI180,2.674247,0.071372,0.171632,0.559959,3073.0
RSI250,2.481152,0.065759,0.158098,0.578604,3199.0
RSI190,2.471124,0.065457,0.157368,0.580307,3206.0
RSI200,2.396999,0.063185,0.151893,0.592896,3227.0
RSI210,2.347381,0.061627,0.14814,0.601323,3241.0
RSI220,2.246125,0.058352,0.140248,0.61852,3465.0
RSI240,2.243045,0.05825,0.140003,0.619043,3465.0
RSI80,2.079754,0.052659,0.126539,0.574706,3010.0
RSI230,2.063852,0.052093,0.125176,0.649477,3899.0
RSI170,1.934571,0.047334,0.113722,0.604903,3227.0


In [202]:
def generate_ma_cross_metrics(short_ma: list[int], long_ma: list[int]) -> pd.Series:
    
    ma_cross_data = spy_data_close.copy()

    ma_cross_summary_table = pd.DataFrame()

    for j in long_ma:

        for i in short_ma:

            if i < j:
            
                short_ma_cross_data = generate_moving_avg(ma_cross_data['Close'], i)
                long_ma_cross_data = generate_moving_avg(ma_cross_data['Close'], j)

                ma_cross_returns =\
                (
                    pd
                    .concat(
                            (spy_data_returns, generate_moving_avg_cross_signal(long_ma_cross_data, short_ma_cross_data)),
                            axis = 1
                            )
                )

                ma_cross_returns['Strat_Returns'] = ma_cross_returns['Returns']*ma_cross_returns['MA_Cross_Position'].shift(1)

                ma_cross_summary_stat_series = evaluate_returns(ma_cross_returns['Strat_Returns'])
                ma_cross_summary_stat_series.name = f'MA{i} + MA{j}'


                ma_cross_summary_table = pd.concat([ma_cross_summary_table, ma_cross_summary_stat_series], axis = 1)

    ma_cross_summary_table.index = ['Total Returns', 'CAGR', 'Annualised Sharpe', 'Max Drawdown', 'Longest Drawdown (Days)']
    return ma_cross_summary_table.T

In [203]:
MA_list =\
(
    list(range(10, 201, 10))
) 

In [211]:
ma_cross_summary = generate_ma_cross_metrics(MA_list, MA_list)
ma_cross_summary.sort_values(by = 'Total Returns', ascending = False).head(10)

Unnamed: 0,Total Returns,CAGR,Annualised Sharpe,Max Drawdown,Longest Drawdown (Days)
MA40 + MA90,4.466238,0.110583,0.27823,0.437184,1589.0
MA40 + MA160,4.349126,0.108516,0.341281,0.31829,315.0
MA40 + MA150,4.094129,0.103832,0.322501,0.31829,385.0
MA30 + MA150,4.06353,0.103252,0.320682,0.31829,399.0
MA50 + MA70,4.056251,0.103113,0.256137,0.46421,1715.0
MA50 + MA150,4.055235,0.103094,0.320187,0.31829,399.0
MA30 + MA100,3.999715,0.102029,0.257375,0.443429,1687.0
MA30 + MA160,3.986587,0.101775,0.319881,0.31829,315.0
MA140 + MA180,3.979286,0.101633,0.322653,0.31829,581.0
MA50 + MA160,3.925162,0.100576,0.316081,0.31829,315.0


In [205]:
def generate_ema_cross_metrics(short_ma: list[int], long_ma: list[int]) -> pd.Series:
    
    ma_cross_data = spy_data_close.copy()
    ma_cross_summary_table = pd.DataFrame()

    for j in long_ma:

        for i in short_ma:

            if i < j:
            
                short_ma_cross_data = generate_EMA(ma_cross_data['Close'], i)             
                long_ma_cross_data = generate_EMA(ma_cross_data['Close'], j)

                ma_cross_returns =\
                (
                    pd
                    .concat(
                            (spy_data_returns, generate_moving_avg_cross_signal(long_ma_cross_data, short_ma_cross_data)),
                            axis = 1
                            )
                )

                ma_cross_returns['Strat_Returns'] = ma_cross_returns['Returns']*ma_cross_returns['MA_Cross_Position'].shift(1)

                ma_cross_summary_stat_series = evaluate_returns(ma_cross_returns['Strat_Returns'])
                ma_cross_summary_stat_series.name = f'EMA{i} + EMA{j}'


                ma_cross_summary_table = pd.concat([ma_cross_summary_table, ma_cross_summary_stat_series], axis = 1)

    ma_cross_summary_table.index = ['Total Returns', 'CAGR', 'Annualised Sharpe', 'Max Drawdown', 'Longest Drawdown (Days)']
    return ma_cross_summary_table.T

In [206]:
generate_ema_cross_metrics(MA_list, MA_list).sort_values(by = 'Annualised Sharpe', ascending=False)

Unnamed: 0,Total Returns,CAGR,Annualised Sharpe,Max Drawdown,Longest Drawdown (Days)
EMA20 + EMA190,4.279346,0.107260,0.346182,0.318290,315.0
EMA20 + EMA200,4.105233,0.104042,0.341948,0.318290,315.0
EMA40 + EMA180,4.178071,0.105403,0.334739,0.318290,315.0
EMA10 + EMA200,3.988189,0.101806,0.334526,0.318290,315.0
EMA30 + EMA200,3.988189,0.101806,0.334526,0.318290,315.0
...,...,...,...,...,...
EMA10 + EMA40,1.294045,0.018230,0.044266,0.554401,1715.0
EMA20 + EMA50,1.147756,0.009705,0.023617,0.490311,3136.0
EMA10 + EMA30,1.095521,0.006414,0.015498,0.554401,2177.0
EMA10 + EMA60,1.020466,0.001421,0.003480,0.603809,1946.0


In [207]:
def generate_fi_Metrics(time_periods):

    Metrics = pd.DataFrame()

    for i in time_periods:

        data = spy_data_returns.copy()
        
        data[f'fi{i}'] = generate_force_index(spy_data['Close'], spy_data['Volume'], i)
        
        data = pd.concat([data, generate_force_index_signal(data[f'fi{i}'])], axis = 1)
        
        data['Strat_returns'] = data['Returns'] * data['FI_position'].shift(1)
        
        # print(f'===Data for ROC{i}===')
        strat_series = evaluate_returns(data['Strat_returns'])
        strat_series.name = f'FI{i}'
        
        Metrics = pd.concat([Metrics, strat_series], axis = 1)

    Metrics.index = ['Total Returns', 'CAGR', 'Annualised Sharpe', 'Max Drawdown', 'Longest Drawdown (Days)']
    return Metrics.T

In [208]:
generate_force_index_signal(generate_force_index(spy_data['Close'], spy_data['Volume'], 14))

Unnamed: 0_level_0,FI_position,FI_signal
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2006-11-03,-1.0,
2006-11-10,1.0,1.0
2006-11-17,1.0,0.0
2006-11-24,1.0,0.0
2006-12-01,1.0,0.0
...,...,...
2025-10-17,1.0,0.0
2025-10-24,1.0,0.0
2025-10-31,1.0,0.0
2025-11-07,1.0,0.0


In [209]:
generate_fi_Metrics(list(range(10,251,10))).sort_values(by = 'Annualised Sharpe', ascending=False)

Unnamed: 0,Total Returns,CAGR,Annualised Sharpe,Max Drawdown,Longest Drawdown (Days)
FI10,0.6252,-0.032382,-0.077717,0.484253,4354.0
FI40,0.448521,-0.054644,-0.131192,0.725769,4354.0
FI20,0.447686,-0.054767,-0.131488,0.722403,4354.0
FI50,0.427739,-0.057782,-0.138736,0.751453,4354.0
FI30,0.413664,-0.059989,-0.144042,0.715841,4354.0
FI60,0.398527,-0.062442,-0.149941,0.768427,4354.0
FI70,0.398339,-0.062473,-0.150015,0.749207,4354.0
FI80,0.330146,-0.074729,-0.179515,0.796984,4354.0
FI110,0.327698,-0.075212,-0.180677,0.802883,4354.0
FI100,0.326009,-0.075547,-0.181483,0.803711,4354.0
