In [35]:
import numpy as np
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
import seaborn as sns
from math import floor, trunc, sqrt

In [36]:
# Define the stock symbol and time period
stock_symbol = 'HINDUNILVR.NS'
title='Tata Consultancy Services Limited (INR)'
start_date = '2010-01-01'
end_date = '2022-08-01'
read_file = 'nifty50_stocks.csv'

In [37]:
sp500 = pd.read_csv(read_file, parse_dates=['Date'])

# Create a copy of the slice
df = sp500[sp500['Symbol'] == stock_symbol].copy().set_index('Date')

# Now you can safely modify the copy without the warning
df.loc[:, 'pct_ch'] = df['Close'].pct_change()

ADDING SIGNALS BASED ON TECHNICAL INDIATORS

In [38]:
def ADX(df, n=14):
    # Calculate the differences between consecutive highs and lows
    dH = df['High'].diff().fillna(0)
    dL = -df['Low'].diff().fillna(0)

    # Calculate DMIp and DMIn
    DMIp = (dH > dL) * dH
    DMIn = (dH < dL) * dL

    # Calculate True Range
    tr = df['High'] - df['Low']
    TRsum = tr.rolling(window=n).sum()

    # Calculate DIp and DIn
    DIp = 100 * DMIp.rolling(window=n).sum() / TRsum
    DIn = 100 * DMIn.rolling(window=n).sum() / TRsum

    # Calculate DX
    DX = 100 * abs(DIp - DIn) / (DIp + DIn)

    # Calculate ADX using an Exponential Moving Average (EMA)
    ADX = DX.ewm(span=n, adjust=False).mean()

    return ADX
df['ADX'] = ADX(df)

In [39]:
def true_range(df):
    true_high = df['High'].combine(df['Close'].shift(1), max)
    true_low = df['Low'].combine(df['Close'].shift(1), min)
    tr = true_high - true_low
    return tr

def ATR(df, n=14):
    tr = true_range(df)
    atr = tr.ewm(span=n, adjust=False).mean() # Using Exponential Moving Average (EMA)

    return atr
df['ATR'] = ATR(df)

In [40]:
def bollinger_feature(df, n=20, sd=2):
    # Calculate the moving average (SMA)
    mavg = df['Close'].rolling(window=n).mean()

    # Calculate the standard deviation
    sdev = df['Close'].rolling(window=n).std()

    # Calculate the upper and lower bands
    up = mavg + sd * sdev
    dn = mavg - sd * sdev

    # Calculate the percentage B (pctB)
    pctB = (df['Close'] - dn) / (up - dn)

    # Calculate the band width
    bandwidth = (up - dn) / mavg

    # Calculate the slope of the moving average (trend direction)
    trend_direction = mavg.diff()

    # Combine the features into a single value
    bollinger_value = pctB * bandwidth * trend_direction

    return bollinger_value
df['BollingerB'] = bollinger_feature(df)

In [41]:
def cci(df, n=20, c=0.015):
    # Calculate the typical price
    typical_price = (df['High'] + df['Low'] + df['Close']) / 3

    # Calculate the moving average (SMA) of the typical price
    mavg = typical_price.rolling(window=n).mean()

    # Calculate the mean absolute deviation
    mean_dev = typical_price.rolling(window=n).apply(lambda x: abs(x - x.mean()).mean())

    # Calculate the CCI
    cci = (typical_price - mavg) / (c * mean_dev)

    return cci
df['CCI'] = cci(df)

In [42]:
def KST(df, n=[10,10,10,15], nROC=[10,15,20,30], nSig=9, wts=[1,1,1,1]):
    def ROC(series, n):
        return (series - series.shift(n)) / series.shift(n)

    def SMA(series, n):
        return series.rolling(window=n).mean()

    ret = pd.DataFrame()
    for i in range(len(n)):
        roc = ROC(df['Close'], nROC[i])
        ma_roc = SMA(roc, n[i]) * wts[i]
        ret = pd.concat([ret, ma_roc], axis=1)

    kst = 100 * ret.sum(axis=1)
    kst_signal = SMA(kst, nSig)

    return kst
df['KST'] = KST(df)

In [43]:
def MACD(df, nFast=12, nSlow=26, nSig=9, percent=True):
    def EMA(series, n):
        return series.ewm(span=n, adjust=False).mean()

    mavg_fast = EMA(df['Close'], nFast)
    mavg_slow = EMA(df['Close'], nSlow)

    if percent:
        macd = 100 * (mavg_fast / mavg_slow - 1)
    else:
        macd = mavg_fast - mavg_slow

    signal = EMA(macd, nSig)

    return macd
df['MACD'] = MACD(df)

In [44]:
def RSI(df, n=14):
    delta = df['Close'].diff()
    up = delta.where(delta > 0, 0)
    down = -delta.where(delta < 0, 0)

    # Calculate the Exponential Moving Average (EMA) for up and down changes
    mavg_up = up.ewm(span=n, adjust=False).mean()
    mavg_down = down.ewm(span=n, adjust=False).mean()

    rsi = 100 * mavg_up / (mavg_up + mavg_down)
    return rsi
df['RSI'] = RSI(df)

In [45]:
def SMA(df, n=10):  # Simple Moving Average
    return df['Close'].rolling(window=n).mean()

def EMA(df, n=10, wilder=False):    # Exponential Moving Average
    if wilder:
        ratio = 1 / n
    else:
        ratio = 2 / (n + 1)
    return df['Close'].ewm(adjust=False, alpha=ratio).mean()

def DEMA(df, n=10, v=1, wilder=False):  # Double Exponential Moving Average
    ema1 = EMA(df, n, wilder)
    ema2 = EMA(df.assign(Close=ema1), n, wilder)
    return (1 + v) * ema1 - ema2 * v

def WMA(df, n=10, wts=None):    # Weighted Moving Average
    if wts is None:
        wts = list(range(1, n + 1))
    return df['Close'].rolling(window=n).apply(lambda x: (x * wts).sum() / sum(wts))

def EVWMA(df, n=10):  # Elastic Volume-Weighted Moving Average
    evwma = [0] * len(df)
    volume_sum = 0
    for i in range(n):
        volume_sum += df['Volume'].iloc[i]
        evwma[i] = df['Close'].iloc[i]  # Initialize first n values with the close price
    for i in range(n, len(df)):
        volume_sum = volume_sum + df['Volume'].iloc[i] - df['Volume'].iloc[i - n]
        ratio = df['Volume'].iloc[i] / volume_sum
        evwma[i] = evwma[i - 1] + ratio * (df['Close'].iloc[i] - evwma[i - 1])
    return pd.Series(evwma, index=df.index)

def ZLEMA(df, n=10, ratio=None):    # Zero-Lag Exponential Moving Average
    lag = (n - 1) // 2
    lagged_price = df['Close'].shift(lag)
    return EMA(df.assign(Close=2 * df['Close'] - lagged_price), n, ratio)

def VWAP(df, n=10):   # Volume-Weighted Average Price
    return (df['Close'] * df['Volume']).rolling(window=n).sum() / df['Volume'].rolling(window=n).sum()

def HMA(df, n=20):  # Hull Moving Average
    wma1 = WMA(df, n=trunc(n / 2))
    wma2 = WMA(df, n=n)
    madiff = 2 * wma1 - wma2
    return WMA(df.assign(Close=madiff), n=trunc(sqrt(n)))

def ALMA(df, n=9, offset=0.85, sigma=6):    # Arnaud Legoux Moving Average
    m = floor(offset * (n - 1))
    s = n / sigma
    wts = np.exp(-((np.arange(0, n) - m) ** 2) / (2 * s * s))
    wts /= wts.sum()
    return df['Close'].rolling(window=n).apply(lambda x: (x * wts).sum())

df['WMA'] = WMA(df)
df['DEMA'] = DEMA(df)
df['SMA'] = SMA(df)
df['EMA'] = EMA(df)
df['EVWMA'] = EVWMA(df)
df['ZLEMA'] = ZLEMA(df)
df['HMA'] = HMA(df)

In [46]:
def VHF(price, n=28):
    # Vertical Horizontal Filter
    if 'Close' in price.columns:
        close = price['Close']
    elif 'High' in price.columns and 'Low' in price.columns:
        close = price['Close']
    else:
        raise ValueError("Price series must be either Close, or High-Low-Close")

    hmax = price['High'].rolling(window=n).max()
    lmin = price['Low'].rolling(window=n).min()
    denom = close.diff().abs()
    VHF = (hmax - lmin) / denom.rolling(window=n).sum()

    return VHF
df['VHF'] = VHF(df)

In [47]:
def SMI(HLC, n=13, nFast=2, nSlow=25, nSig=9, maType='EMA'):
    # Stochastic Momentum Index
    if 'High' in HLC.columns and 'Low' in HLC.columns and 'Close' in HLC.columns:
        high = HLC['High']
        low = HLC['Low']
        close = HLC['Close']
    else:
        raise ValueError("Price series must be either High-Low-Close, or Close")

    hmax = high.rolling(window=n).max()
    lmin = low.rolling(window=n).min()
    HLdiff = hmax - lmin
    Cdiff = close - (hmax + lmin) / 2

    num1 = Cdiff.ewm(span=nSlow, adjust=False).mean() if maType == 'EMA' else Cdiff.rolling(window=nSlow).mean()
    den1 = HLdiff.ewm(span=nSlow, adjust=False).mean() if maType == 'EMA' else HLdiff.rolling(window=nSlow).mean()
    num2 = num1.ewm(span=nFast, adjust=False).mean() if maType == 'EMA' else num1.rolling(window=nFast).mean()
    den2 = den1.ewm(span=nFast, adjust=False).mean() if maType == 'EMA' else den1.rolling(window=nFast).mean()

    SMI_value = 100 * (num2 / (den2 / 2))
    signal = SMI_value.ewm(span=nSig, adjust=False).mean() if maType == 'EMA' else SMI_value.rolling(window=nSig).mean()

    return SMI_value
df['SMI'] = SMI(df)

In [48]:
def AR(stock_data):
    # Count the number of stocks that have closed higher than their previous close
    advancing_stocks = (stock_data > stock_data.shift(1)).sum(axis=1)
    # Count the number of stocks that have closed lower than their previous close
    declining_stocks = (stock_data < stock_data.shift(1)).sum(axis=1)

    # Avoid division by zero
    declining_stocks[declining_stocks == 0] = 1

    AR = advancing_stocks / declining_stocks
    return AR
sp500_values = sp500.pivot(index='Date', columns='Symbol', values='Close')
df['AR'] = AR(sp500_values)

In [49]:
def OBV(price, volume):
    # On Balance Volume
    prChg = price.diff()
    obv = [volume[0]]
    for i in range(1, len(volume)):
        if prChg[i] > 0:
            obv.append(obv[-1] + volume[i])
        elif prChg[i] < 0:
            obv.append(obv[-1] - volume[i])
        else:
            obv.append(obv[-1]) # OBV[t] = OBV[t-1] if price change is equal to zero
    return obv
df['OBV'] = OBV(df['Close'], df['Volume'])

In [50]:
def chaikin_ad(df):
    # Calculate the Close Location Value (CLV)
    clv = ((df['Close'] - df['Low']) - (df['High'] - df['Close'])) / (df['High'] - df['Low'])
    clv = clv.fillna(0)  # handle cases where high and low are equal

    # Multiply CLV by volume
    ad = clv * df['Volume']

    # Calculate the cumulative sum of AD
    ad_cumsum = ad.cumsum()

    return ad_cumsum
df['ChaikinAD'] = chaikin_ad(df)

In [51]:
def MFI(HLC, volume, n=14):
    # Assuming HLC is a DataFrame with columns 'High', 'Low', 'Close'
    typical_price = (HLC['High'] + HLC['Low'] + HLC['Close']) / 3
    money_flow = typical_price * volume

    # Calculate positive and negative Money Flow
    pmf = money_flow.where(typical_price > typical_price.shift(1), 0)
    nmf = money_flow.where(typical_price < typical_price.shift(1), 0)

    # Calculate Money Ratio and Money Flow Index
    num = pmf.rolling(window=n).sum()
    den = nmf.rolling(window=n).sum()
    mr = num / den
    mfi = 100 - (100 / (1 + mr))
    mfi[(den == 0) & (num == 0)] = 50
    mfi[den == 0] = 100

    return mfi
df['MFI'] = MFI(df[['High', 'Low', 'Close']], df['Volume'])

In [52]:
def ROC(x, n=1, type="continuous", na_pad=True):
    if type == "discrete":
        roc = x / x.shift(n) - 1
    elif type == "continuous":
        roc = (x / x.shift(n)).apply(np.log).diff(n)
    else:
        raise ValueError("Type must be either 'continuous' or 'discrete'")

    if na_pad:
        roc[:n] = np.nan

    return roc
df['ROC'] = ROC(df['Close'])

In [53]:
df.to_csv(f'{stock_symbol}_technical_data.csv')

In [62]:
print(df.tail(5))
print(df.columns)

                   Symbol    Adj Close        Close         High          Low  \
Date                                                                            
2022-12-26  HINDUNILVR.NS  2596.245605  2617.449951  2637.850098  2606.750000   
2022-12-27  HINDUNILVR.NS  2572.241699  2593.250000  2634.899902  2575.250000   
2022-12-28  HINDUNILVR.NS  2562.669922  2583.600098  2603.100098  2578.100098   
2022-12-29  HINDUNILVR.NS  2547.245850  2568.050049  2583.300049  2539.399902   
2022-12-30  HINDUNILVR.NS  2540.302490  2561.050049  2586.449951  2555.199951   

                   Open     Volume    pct_ch        ADX        ATR  ...  \
Date                                                                ...   
2022-12-26  2620.949951   605144.0 -0.001393  23.772708  43.031955  ...   
2022-12-27  2625.000000   870960.0 -0.009246  27.399991  45.247681  ...   
2022-12-28  2593.000000   822651.0 -0.003721  29.691067  42.547991  ...   
2022-12-29  2580.250000  1345863.0 -0.006019  35.374416  