In [1]:
# RSI PowerZone 30-70 Weekly
import numpy as np
import pandas as pd
from bokeh.plotting import figure, show

In [2]:
# Initialisation.

# Instrument identification and the price file derived from it.
EXG = "LSE"
TIDM = "GHH"
filename = f"{EXG}_{TIDM}_prices.csv"

# Date range of price history to load.
hist_start = "2003-03-19"
hist_end = "2021-08-31"

# Simple Moving Average (SMA) look back period.
P_SMA = 40
# Relative Strength Index (RSI) look back period.
P_RSI = 4

# First RSI PowerZone entry level.
PZ1 = 30
# Second RSI PowerZone entry level.
PZ2 = 25
# RSI PowerZone exit level.
PZE = 75

# Position size in major currency unit.
PS = 10000
# Commission per trade (round trip).
C = 6
# Stamp Duty percentage.
SD = 0.0
# Stop Loss percentage.
SL = 20

In [3]:
def rsi(close, period):
    """Relative Strength Index (RSI) using Wilder's smoothing.

    Parameters
    ----------
    close : pandas.Series or array-like
        Closing prices in chronological order.
    period : int
        Look back period; must be at least 1.

    Returns
    -------
    pandas.Series
        Series named "rsi" sharing the index of ``close``. The first
        ``period`` values are NaN because ``period`` price changes are
        needed to seed the averages. If there are not enough prices to seed
        the averages, every value is NaN.

    Raises
    ------
    ValueError
        If ``period`` is less than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    close = pd.Series(close)
    chg = close.diff()
    gain = chg.clip(lower=0)
    loss = chg.clip(upper=0).abs()

    n = len(close)
    # Plain arrays are written element by element; chained assignment on a
    # DataFrame column (df.col.iloc[i] = ...) is not guaranteed to write back.
    avg_gain = np.full(n, np.nan)
    avg_loss = np.full(n, np.nan)

    if n > period:
        # Seed with the mean of the first `period` price changes. Position 0
        # holds no change (diff is NaN there), so the window is 1..period.
        avg_gain[period] = gain.iloc[1 : period + 1].sum() / period
        avg_loss[period] = loss.iloc[1 : period + 1].sum() / period

        gain_values = gain.to_numpy(dtype=float)
        loss_values = loss.to_numpy(dtype=float)
        for i in range(period + 1, n):
            avg_gain[i] = (avg_gain[i - 1] * (period - 1) + gain_values[i]) / period
            avg_loss[i] = (avg_loss[i - 1] * (period - 1) + loss_values[i]) / period

    # A zero average loss gives rs = inf and therefore RSI = 100; 0 / 0 stays NaN.
    with np.errstate(divide="ignore", invalid="ignore"):
        rs = avg_gain / avg_loss
        values = 100 - 100 / (1 + rs)

    return pd.Series(values, index=close.index, name="rsi")


def state_signal(entry_signal, exit_signal, period):
    """Derive the in-trade state (1) / flat state (0) from entry and exit signals.

    Rows before ``period`` are left at 0. From ``period`` onwards a row
    becomes 1 when its entry signal is 1 and the previous row was flat,
    becomes 0 when its exit signal is 1, and otherwise repeats the previous
    row's state. Returns an integer Series named "state".
    """
    signals = pd.concat([entry_signal, exit_signal], axis=1)
    signals.columns = ["entry", "exit"]

    wants_entry = (signals["entry"] == 1).to_numpy()
    wants_exit = (signals["exit"] == 1).to_numpy()
    states = np.zeros(len(signals), dtype="int64")

    for pos in range(period, len(signals)):
        previous = states[pos - 1]
        if wants_entry[pos] and previous == 0:
            states[pos] = 1
        elif wants_exit[pos]:
            states[pos] = 0
        else:
            states[pos] = previous

    return pd.Series(states, index=signals.index, name="state")


def trade_list(close, entry_flag, exit_flag):
    """Generate trade list.

    Parameters
    ----------
    close : pandas.Series
        Closing prices.
    entry_flag, exit_flag : pandas.Series
        Flags on the same index as ``close``; a value of 1 marks the row on
        which a trade is entered / exited.

    Returns
    -------
    pandas.DataFrame
        One row per completed trade, indexed by entry date, with columns
        ``entry_price``, ``exit_date`` and ``exit_price``. Entries and exits
        are paired in order of appearance. Entries with no matching exit
        (e.g. a trade still open on the last row) are dropped rather than
        raising a length-mismatch error.
    """
    ref = pd.concat([close, entry_flag, exit_flag], axis=1)
    ref.columns = ["close", "entry_flag", "exit_flag"]

    entries = ref["close"][ref["entry_flag"] == 1]
    exits = ref["close"][ref["exit_flag"] == 1]

    # Keep only as many entries/exits as can be paired one-to-one.
    paired = min(len(entries), len(exits))
    entries = entries.iloc[:paired]
    exits = exits.iloc[:paired]

    trades = entries.to_frame(name="entry_price")
    trades["exit_date"] = exits.index
    trades["exit_price"] = exits.to_numpy()
    return trades

In [4]:
# Aggregation applied to each price column when daily bars are collapsed
# into weekly bars.
functions = dict(open="first", high="max", low="min", close="last")

# Import daily prices from SharePad csv file (minor currency unit) and build
# the weekly price frame in a single pipeline.
df = (
    pd.read_csv(
        filename,
        header=0,
        names=["date", "open", "high", "low", "close"],
        index_col=0,
        usecols=[0, 1, 2, 3, 4],
        parse_dates=True,
        dayfirst=True,
    )
    .sort_index()  # ascending date order
    .resample("W-FRI")  # weekly frequency, weeks ending on Friday
    .agg(functions)
    .div(100)  # minor -> major currency unit
    .loc[hist_start:hist_end]  # date range for analysis
)

In [5]:
# Calculate indicators: SMA of the close and RSI of the close.
df["sma"] = df["close"].rolling(window=P_SMA).mean()
df["rsi"] = rsi(df["close"], P_RSI)

In [6]:
# Trade system logic.
# Every column below is a 0/1 integer flag aligned with the price index.

# Trend filter: closing price is above SMA.
df["trend"] = (df["close"] > df["sma"]).astype(int)

# RSI crosses below the first PowerZone level (was at/above, now below).
df["pz1"] = ((df["rsi"].shift(periods=1) >= PZ1) & (df["rsi"] < PZ1)).astype(int)

# RSI is below the second PowerZone level while price is above SMA.
df["pz2"] = ((df["trend"] == 1) & (df["rsi"] < PZ2)).astype(int)

# RSI crosses above the exit level (was below, now at/above).
df["pze"] = ((df["rsi"].shift(periods=1) < PZE) & (df["rsi"] >= PZE)).astype(int)

# Trade #1: entry signal, then the resulting in-trade state.
df["sig1"] = ((df["trend"] == 1) & (df["pz1"] == 1)).astype(int)
df["ss1"] = state_signal(df["sig1"], df["pze"], P_SMA)

# Trade #2: only signalled when trade #1 was already open on the previous row.
df["sig2"] = ((df["ss1"].shift(periods=1) == 1) & (df["pz2"] == 1)).astype(int)
df["ss2"] = state_signal(df["sig2"], df["pze"], P_SMA)

# Trade entry & exit flags: the rows on which each state flips 0 -> 1 / 1 -> 0.
df["en1"] = ((df["ss1"] == 1) & (df["ss1"].shift(periods=1) == 0)).astype(int)
df["ex1"] = ((df["ss1"] == 0) & (df["ss1"].shift(periods=1) == 1)).astype(int)
df["en2"] = ((df["ss2"] == 1) & (df["ss2"].shift(periods=1) == 0)).astype(int)
df["ex2"] = ((df["ss2"] == 0) & (df["ss2"].shift(periods=1) == 1)).astype(int)

In [7]:
# Trade list based on RSI exit only (no Stop Loss).

# Separate trade lists for trade #1 and trade #2.
td1 = trade_list(df["close"], df["en1"], df["ex1"])
td2 = trade_list(df["close"], df["en2"], df["ex2"])

# Full trade list, in ascending entry-date order.
td = pd.concat([td1, td2]).sort_index()

# Percentage price change of each trade from entry to exit.
td["chg_pct"] = ((td["exit_price"] - td["entry_price"]) / td["entry_price"]) * 100

In [8]:
# Trade list incorporating Stop Loss.
# Start from the RSI-exit trades; a row is overwritten below only when the
# stop is hit before the RSI exit, so untouched rows keep their RSI exit.
tdm = td[["entry_price", "exit_date", "exit_price"]].copy()

for i in range(len(td)):
    # Closes from the entry date to the RSI exit date (label slice, so both
    # end points are included). Positional access uses .iloc because the
    # Series are indexed by date, not by integer.
    closes = df["close"].loc[td.index[i] : td["exit_date"].iloc[i]]

    # Stop level is SL percent below the close on the entry date.
    stop = closes.iloc[0] * (1 - (SL / 100))
    stopped = closes < stop

    if stopped.any():
        # idxmax on a boolean Series returns the label of the first True,
        # i.e. the first close below the stop.
        exit_date = stopped.idxmax()
        tdm.loc[tdm.index[i], "exit_date"] = exit_date
        tdm.loc[tdm.index[i], "exit_price"] = closes.loc[exit_date]

In [9]:
# Cumulative profit after costs.

# Holding time in weeks, measured from the entry date (the index) to the exit date.
tdm["weeks"] = (tdm["exit_date"] - tdm.index) / np.timedelta64(1, "W")

# Percentage price change from entry to exit.
tdm["chg_pct"] = ((tdm["exit_price"] - tdm["entry_price"]) / tdm["entry_price"]) * 100

# Costs per trade: fixed commission plus stamp duty on the position size.
tdm["charges"] = C
tdm["stamp_duty"] = PS * (SD / 100)

# Profit per trade on a fixed position size, net of costs, and its running total.
tdm["profit"] = PS * (tdm["chg_pct"] / 100) - tdm["charges"] - tdm["stamp_duty"]
tdm["cum_profit"] = tdm["profit"].cumsum()

tdm.round(2)

Unnamed: 0_level_0,entry_price,exit_date,exit_price,weeks,chg_pct,charges,stamp_duty,profit,cum_profit
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2005-03-25,1.9,2005-06-10,2.1,11.0,10.26,6,0.0,1020.32,1020.32
2005-04-29,1.8,2005-06-10,2.1,6.0,16.71,6,0.0,1665.31,2685.62
2006-05-26,3.45,2006-10-27,3.7,22.0,7.25,6,0.0,718.64,3404.26
2007-05-11,4.22,2007-07-06,4.75,8.0,12.43,6,0.0,1236.6,4640.87
2007-05-18,4.22,2007-07-06,4.75,7.0,12.43,6,0.0,1236.6,5877.47
2011-03-18,4.51,2011-05-06,5.56,7.0,23.28,6,0.0,2322.16,8199.63
2011-07-29,5.07,2011-09-09,3.92,6.0,-22.61,6,0.0,-2266.75,5932.88
2013-03-01,4.32,2013-05-03,4.7,9.0,8.92,6,0.0,886.24,6819.12
2013-07-12,4.75,2013-08-09,5.52,4.0,16.21,6,0.0,1615.05,8434.17
2014-03-14,6.72,2014-08-29,6.72,24.0,-0.04,6,0.0,-9.72,8424.45


In [10]:
# Trade Statistics
ts = pd.Series(dtype="string")

# Instrument identifier (Tradable Instrument Display Mnemonic)
ts["TIDM"] = TIDM

# Analysis start date (later than history start to allow for indicator calculation).
start_date = df.sma[df.sma.notnull()].index[0]
ts["Start Date"] = start_date.strftime("%Y-%m-%d")

# Analysis end date (coincides with history end).
end_date = df.index[-1]
ts["End Date"] = end_date.strftime("%Y-%m-%d")

# Analysis period. Years are derived from days using the mean Gregorian year
# (365.2425 days, the same length numpy uses for its "Y" unit); dividing by
# np.timedelta64(1, "Y") directly relies on an ambiguous calendar unit that
# newer pandas versions refuse to convert.
analysis_days = (end_date - start_date) / np.timedelta64(1, "D")
analysis_years = analysis_days / 365.2425
ts["Analysis Years"] = "{0:.1f}".format(analysis_years)

# Position size used for analysis.
ts["Position Size"] = "{0:,.2f}".format(PS)

# Trade counts. The winner/loser masks are built once from the percentage
# price change so that counts, percentages, profits and weeks all describe
# the same set of trades.
total_trades = len(tdm.index)
winners = tdm.chg_pct > 0
losers = tdm.chg_pct < 0
winning_trades = int(winners.sum())
losing_trades = int(losers.sum())

# Calculate net profit (last value of the running total; zero if no trades).
# .iloc is required because cum_profit is indexed by date, not by position.
net_profit = tdm.cum_profit.iloc[-1] if total_trades else 0.0
ts["Net Profit"] = "{0:,.2f}".format(net_profit)

# Calculate annualised percentage return.
annual_pct = (((PS + net_profit) / PS) ** (365 / analysis_days) - 1) * 100
ts["Annual %"] = "{0:.1f}".format(annual_pct)

# Calculate expenses.
ts["Charges"] = "{0:,.2f}".format(tdm.charges.sum())
ts["Stamp Duty"] = "{0:,.2f}".format(tdm.stamp_duty.sum())

# Total trade statistics.
ts["Total Trades"] = "{0:.0f}".format(total_trades)
ts["Winning Trades"] = "{0:.0f}".format(winning_trades)
ts["Losing Trades"] = "{0:.0f}".format(losing_trades)

# Guard the ratio so an empty trade list reports "nan" instead of raising.
winning_pct = (winning_trades / total_trades) * 100 if total_trades else float("nan")
ts["Winning %"] = "{0:.1f}".format(winning_pct)
ts["Trades per Year"] = "{0:.1f}".format(total_trades / analysis_years)
ts["Average Profit %"] = "{0:.1f}".format(tdm.chg_pct.mean())
ts["Average Profit"] = "{0:,.2f}".format(tdm.profit.mean())
ts["Average Weeks"] = "{0:.1f}".format(tdm.weeks.mean())

# Winning trade statistics.
ts["Average Winning Profit %"] = "{0:.1f}".format(tdm.chg_pct[winners].mean())
ts["Average Winning Profit"] = "{0:,.2f}".format(tdm.profit[winners].mean())
ts["Average Winning Weeks"] = "{0:.1f}".format(tdm.weeks[winners].mean())

# Losing trade statistics.
ts["Average Losing Profit %"] = "{0:.1f}".format(tdm.chg_pct[losers].mean())
ts["Average Losing Profit"] = "{0:,.2f}".format(tdm.profit[losers].mean())
ts["Average Losing Weeks"] = "{0:.1f}".format(tdm.weeks[losers].mean())
ts

TIDM                               GHH
Start Date                  2003-12-19
End Date                    2021-08-27
Analysis Years                    17.7
Position Size                10,000.00
Net Profit                   14,375.75
Annual %                           5.2
Charges                         114.00
Stamp Duty                        0.00
Total Trades                        19
Winning Trades                      17
Losing Trades                        2
Winning %                         89.5
Trades per Year                    1.1
Average Profit %                   7.6
Average Profit                  756.62
Average Weeks                     10.8
Average Winning Profit %           9.9
Average Winning Profit          979.54
Average Winning Weeks             10.3
Average Losing Profit %          -11.3
Average Losing Profit        -1,138.23
Average Losing Weeks              15.0
dtype: object

In [11]:
# Plot cumulative profit after costs against each trade's entry date
# (the trade list is indexed by entry date).
x = tdm.index
y = tdm.cum_profit
p = figure(
    title=f"{TIDM} cumulative profit after costs",
    x_axis_type="datetime",
    x_axis_label="Trade entry date",
    y_axis_label="Cumulative profit",
)
p.line(x, y, legend_label="Cumulative profit", line_width=2)
show(p)