In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import pandas as pd

# **Step 1** Load the S&P500 data into a pandas DataFrame

In [3]:
# Load the dataset from Google Drive
import pandas as pd

dataset_path = '/content/drive/MyDrive/2023SummerResearch/SPY.csv'  # Update the path according to your dataset location

SP500data = pd.read_csv(dataset_path)

In [4]:
SP500data

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2019-01-10,256.260010,259.160004,255.500000,258.880005,240.387314,96823900
1,2019-01-11,257.679993,259.010010,257.029999,258.980011,240.480194,73858100
2,2019-01-14,256.859985,258.299988,256.410004,257.399994,239.013062,70908200
3,2019-01-15,257.820007,260.700012,257.809998,260.350006,241.752304,85208300
4,2019-01-16,260.829987,261.970001,260.600006,260.980011,242.337341,77636700
...,...,...,...,...,...,...,...
775,2022-02-07,449.510010,450.989990,445.850006,447.260010,436.823853,84472900
776,2022-02-08,446.730011,451.920013,445.220001,450.940002,440.417999,81012000
777,2022-02-09,455.220001,457.880005,455.010010,457.540009,446.863983,92589900
778,2022-02-10,451.339996,457.709991,447.200012,449.320007,438.835785,140103700


# **Step 2:** Define the parameters for the Trend Intensity Indicator calculation

In [5]:
lookback = 60
what = 'Adj Close'
where = 'MA'

# **Step 3:** Define the ma function to calculate the moving average

In [6]:
def ma(Data, lookback, what, where):
    for i in range(len(Data)):
        try:
            Data[i, where] = np.mean(Data[i - lookback + 1:i + 1, what])
        except IndexError:
            pass
    return Data

**'Data'** which represents the input data set

**'lookback'** for the number of previous data points to consider

**'what'** as the column index or name for which to calculate the moving average

**'where'** as the column index or name where the moving average values will be stored.

Note: If there are not enough previous data points to form the lookback window, an **'IndexError'** may occur. The try-except block is used to catch this exception, and the pass statement is used to skip that iteration.


# **Step 4:** Define the deleter function to remove unnecessary columns from the data set

In [7]:
def deleter(Data, start, num_columns):
    return np.delete(Data, np.s_[start:start+num_columns], axis=1)

# **Step 5:** Define the trend_intensity_indicator function

In [8]:
def trend_intensity_indicator(Data, lookback, what, where):
    # Convert numpy array to DataFrame with column names
    Data = pd.DataFrame(Data, columns=['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume'])
    # Calculating the Moving Average
    Data[where] = Data[what].rolling(lookback).mean()

    for i in range(len(Data)):
        if Data.iloc[i][what] > Data.iloc[i][where]:
            Data.at[i, where + '_deviation_pos'] = Data.iloc[i][what] - Data.iloc[i][where]
        if Data.iloc[i][what] < Data.iloc[i][where]:
            Data.at[i, where + '_deviation_neg'] = Data.iloc[i][where] - Data.iloc[i][what]

    Data[where + '_up_periods'] = Data[where + '_deviation_pos'].rolling(lookback).count()
    Data[where + '_down_periods'] = Data[where + '_deviation_neg'].rolling(lookback).count()
    Data[where + '_trend_intensity'] = (Data[where + '_up_periods'] / (Data[where + '_up_periods'] + Data[where + '_down_periods'])) * 100

    Data = Data.drop(columns=[where, where + '_deviation_pos', where + '_deviation_neg'])

    return Data.values

# **Step 6:** Apply the trend_intensity_indicator function to the S&P data:

In [9]:
result_data = trend_intensity_indicator(SP500data.values, lookback, what, where)
result_dataframe = pd.DataFrame(result_data, columns=['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'MA_up_periods', 'MA_down_periods', 'MA_trend_intensity'])


  Data[where + '_up_periods'] = Data[where + '_deviation_pos'].rolling(lookback).count()
  Data[where + '_down_periods'] = Data[where + '_deviation_neg'].rolling(lookback).count()


In [10]:
result_dataframe

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,MA_up_periods,MA_down_periods,MA_trend_intensity
0,2019-01-10,256.26001,259.160004,255.5,258.880005,240.387314,96823900,0.0,0.0,
1,2019-01-11,257.679993,259.01001,257.029999,258.980011,240.480194,73858100,0.0,0.0,
2,2019-01-14,256.859985,258.299988,256.410004,257.399994,239.013062,70908200,0.0,0.0,
3,2019-01-15,257.820007,260.700012,257.809998,260.350006,241.752304,85208300,0.0,0.0,
4,2019-01-16,260.829987,261.970001,260.600006,260.980011,242.337341,77636700,0.0,0.0,
...,...,...,...,...,...,...,...,...,...,...
775,2022-02-07,449.51001,450.98999,445.850006,447.26001,436.823853,84472900,44.0,16.0,73.333333
776,2022-02-08,446.730011,451.920013,445.220001,450.940002,440.417999,81012000,43.0,17.0,71.666667
777,2022-02-09,455.220001,457.880005,455.01001,457.540009,446.863983,92589900,42.0,18.0,70.0
778,2022-02-10,451.339996,457.709991,447.200012,449.320007,438.835785,140103700,41.0,19.0,68.333333


# **Step 7:** Back Testing

In [11]:
def signal(Data, what, buy, sell):

    for i in range(len(Data)):

        if Data[i, what] > lower_barrier and Data[i - 1, what] <= lower_barrier:
            Data[i, buy] = 1

        if Data[i, what] < upper_barrier and Data[i - 1, what] >= upper_barrier:
            Data[i, sell] = -1