In [1]:
import pandas as pd
import requests
import numpy as np
from lightweight_charts import Chart
from stock_indicators import indicators, Quote
from datetime import datetime, timedelta
from legitindicators import damiani_volatmeter
import asyncio
import nest_asyncio

# Patch asyncio so an event loop can be re-entered while another loop is already running.
# NOTE(review): presumably needed for the blocking chart.show(...) calls made from notebook cells — confirm.
nest_asyncio.apply()

In [2]:
import yfinance as yf

# Download daily SPY bars since 2010, turn the date index into a regular column,
# and persist the result to SPY.csv.
yf.download('SPY', start='2010-01-01', multi_level_index=False).reset_index().to_csv('SPY.csv', index=False)

# Reload from disk. `rawdf` keeps the frame exactly as read from the CSV (before the
# Date column is converted); `df` is the working frame with Date parsed to datetimes.
rawdf = pd.read_csv('SPY.csv')
df = rawdf.assign(Date=pd.to_datetime(rawdf['Date']))
df.head()

  df = yf.download('SPY', start='2010-01-01', multi_level_index=False)
[*********************100%***********************]  1 of 1 completed


Unnamed: 0,Date,Close,High,Low,Open,Volume
0,2010-01-04,85.279243,85.324391,83.909719,84.556858,118944600
1,2010-01-05,85.504967,85.542593,84.918029,85.226551,111579900
2,2010-01-06,85.565155,85.77585,85.35446,85.422181,116074400
3,2010-01-07,85.926323,86.03167,85.166311,85.407106,131091100
4,2010-01-08,86.21228,86.249907,85.527521,85.70059,126402800


In [3]:
# Build one stock_indicators Quote per row, passing the columns positionally as
# (Date, Open, High, Low, Close, Volume).
quotes = list(map(
    Quote,
    df['Date'],
    df['Open'],
    df['High'],
    df['Low'],
    df['Close'],
    df['Volume'],
))


In [4]:
# RSI(14) plus 6- and 14-row simple moving averages of the RSI itself.
df['rsi'] = [point.rsi for point in indicators.get_rsi(quotes, 14)]
for window in (6, 14):
    df[f'rsima{window}'] = df['rsi'].rolling(window).mean()
# 'long' while RSI is above its 14-row average, otherwise 'short'
# (rows where either value is NaN compare False and therefore read 'short').
df['rsi_signal'] = np.where(df['rsi'].gt(df['rsima14']), 'long', 'short')

# McGinley Dynamic with a 20-period look-back.
df['dynamic20'] = [point.dynamic for point in indicators.get_dynamic(quotes, 20)]
# 'long' while the close is above the McGinley Dynamic, otherwise 'short'.
df['md_signal'] = np.where(df['Close'].gt(df['dynamic20']), 'long', 'short')






In [5]:
# Damiani Volatmeter parameters (all look-backs are rolling-window lengths in rows).
# NOTE(review): "vis"/"sed" presumably abbreviate "viscosity"/"sedimentation" from the
# original Damiani indicator rather than "visual"/"sediment" — confirm.
vis_atr = 13  # Fast ("vis") ATR look-back; numerator of the ATR ratio
vis_std = 20  # Fast ("vis") rolling std look-back; numerator of the std ratio
sed_atr = 40  # Slow ("sed") ATR look-back; denominator of the ATR ratio
sed_std = 100  # Slow ("sed") rolling std look-back; denominator of the std ratio
threshold_level = 1.4  # Base level; the threshold series is this value minus the std ratio
lag_supressor = True  # When True, the lag-suppressor term is added to the ATR ratio
lag_s_K = 0.5  # Weight applied to the lag-suppressor term (vol[i-1] - vol[i-3])

def calculate_atr(high, low, close, period):
    """Return a simple-moving-average ATR over ``period`` rows.

    True range per row is the largest of: high - low, |high - previous close| and
    |low - previous close|. Components that are NaN (e.g. on the first row, which has
    no previous close) are ignored when taking the maximum.

    Args:
        high: Series of high prices.
        low: Series of low prices.
        close: Series of close prices.
        period: Rolling window length in rows.

    Returns:
        Series named 'tr' holding the rolling mean of true range. ``min_periods=1`` is
        used, so early rows average over however many rows are available.
    """
    prev_close = close.shift(1)

    ranges = (high - low).to_frame('hl')
    ranges['hc'] = (high - prev_close).abs()
    ranges['lc'] = (low - prev_close).abs()

    true_range = ranges.max(axis=1).rename('tr')
    return true_range.rolling(window=period, min_periods=1).mean()

# Snapshot of df used as the input for the Volatmeter calculation; df itself only
# receives the three result columns at the end of this cell.
calc_df = df.copy()

# Fast and slow ATR (rolling means of true range, see calculate_atr).
vis_atr_values = calculate_atr(calc_df['High'], calc_df['Low'], calc_df['Close'], vis_atr)
sed_atr_values = calculate_atr(calc_df['High'], calc_df['Low'], calc_df['Close'], sed_atr)

# Fast and slow rolling standard deviation of the close.
# min_periods=1 lets the windows start immediately, but the sample std of a single
# observation is still NaN, so the first row is NaN here and is zeroed by fillna below.
vis_std_values = calc_df['Close'].rolling(window=vis_std, min_periods=1).std()
sed_std_values = calc_df['Close'].rolling(window=sed_std, min_periods=1).std()

# Std ratio. A zero slow std is turned into NaN so the division cannot produce inf;
# every undefined ratio then becomes 0.
sed_std_values = sed_std_values.replace(0, np.nan)
anti_thres = (vis_std_values / sed_std_values).fillna(0)

# Volatmeter recursion: vol[i] = ATR ratio (+ lag_s_K * (vol[i-1] - vol[i-3]) when the
# lag suppressor is on). It runs on plain NumPy arrays because writing one element at a
# time through Series.iloc is much slower and produces the same numbers.
fast_atr = vis_atr_values.to_numpy()
slow_atr = sed_atr_values.to_numpy()
vol_values = np.zeros(len(calc_df))

# The loop starts at row 3 because the recursion looks three rows back; rows 0-2 stay 0.0.
for i in range(3, len(calc_df)):
    # A zero slow ATR yields a ratio of 0 instead of dividing by zero.
    atr_ratio = fast_atr[i] / slow_atr[i] if slow_atr[i] != 0 else 0

    if lag_supressor:
        vol_values[i] = atr_ratio + (lag_s_K * (vol_values[i - 1] - vol_values[i - 3]))
    else:
        vol_values[i] = atr_ratio

vol = pd.Series(vol_values, index=calc_df.index)

# Threshold series and signal: 'short' where the volatmeter is above the threshold,
# 'long' everywhere else.
t = threshold_level - anti_thres
vol_m = pd.Series(np.where(vol > t, 'short', 'long'), index=calc_df.index)

# Attach the results to the working frame.
df['volatmeter'] = vol
df['threshold'] = t
df['vol_signal'] = vol_m



In [6]:
# Keep only rows in which every column has a value, then renumber the index from 0.
df = df.loc[df.notna().all(axis=1)].reset_index(drop=True)
df.head()


Unnamed: 0,Date,Close,High,Low,Open,Volume,rsi,rsima6,rsima14,rsi_signal,dynamic20,md_signal,volatmeter,threshold,vol_signal
0,2010-02-11,81.366287,81.456588,79.951616,80.418159,223591600,43.338212,37.144365,37.393813,long,82.204625,short,1.119595,0.493951,short
1,2010-02-12,81.298561,81.343708,80.147259,80.508449,304622100,43.04391,38.658263,37.885631,long,82.125697,short,1.136721,0.576454,short
2,2010-02-16,82.577789,82.660563,81.133018,81.915604,159317500,49.956581,41.162061,38.967011,long,82.162553,long,1.178627,0.630266,short
3,2010-02-17,82.969086,83.08196,82.57779,82.976606,168845100,51.880425,44.292911,39.984646,long,82.227189,long,1.146698,0.759698,short
4,2010-02-18,83.45816,83.631229,82.803496,82.833596,193708600,54.247914,46.82408,41.443749,long,82.32385,long,1.065257,0.855748,short


In [7]:
# RSI crossover detection
def rsi_crossover_markers(frame, signal_col='rsi_signal'):
    """Build chart markers for every flip of the RSI signal.

    A 'Buy' marker is produced on each row whose signal is 'long' while the previous
    row's signal was 'short'; a 'Sell' marker is produced for the opposite flip. The
    first row never yields a marker because it has no previous row.

    Args:
        frame: DataFrame with a 'Date' column and a ``signal_col`` column holding
            'long' / 'short' labels.
        signal_col: Name of the signal column to scan.

    Returns:
        List of marker dicts (time, position, shape, color, text) in row order.
    """
    buy_style = {'position': 'below', 'shape': 'arrow_up', 'color': '#33de3d', 'text': 'Buy'}
    sell_style = {'position': 'above', 'shape': 'arrow_down', 'color': '#f485fb', 'text': 'Sell'}

    signal = frame[signal_col]
    previous = signal.shift(1)  # NaN on the first row, so no flip is detected there
    turned_long = (signal == 'long') & (previous == 'short')
    turned_short = (signal == 'short') & (previous == 'long')

    markers = []
    for when, is_buy, is_sell in zip(frame['Date'], turned_long, turned_short):
        if is_buy:
            markers.append({'time': when, **buy_style})
        elif is_sell:
            markers.append({'time': when, **sell_style})
    return markers


if __name__ == '__main__':

    chart = Chart(title="RSI crossover", maximize=True, inner_height=0.8)
    chart.legend(visible=True, color_based_on_candle=True)

    # Main candlestick pane. df carries the Date/Open/High/Low/Close/Volume columns
    # alongside the indicator columns.
    chart.set(df)

    # RSI pane underneath the price pane, showing RSI and its 14-row moving average.
    rsi_chart = chart.create_subchart(position='left', width=1.0, height=0.2, sync=True)
    rsi_line = rsi_chart.create_line('rsi', color='#ff5733', width=1, price_line=False, price_label=False)
    rsi_line.set(df[['Date', 'rsi']])

    rsi_ma14_line = rsi_chart.create_line('rsima14', color='#33ff57', width=1, price_line=False, price_label=False)
    rsi_ma14_line.set(df[['Date', 'rsima14']])

    # Buy/Sell arrows wherever RSI crosses its 14-row average (rsi_signal flips).
    # All markers are added in one call rather than one at a time.
    markers = rsi_crossover_markers(df)
    if markers:
        chart.marker_list(markers)

    # Kept inside the guard so it can only ever show the chart built above.
    chart.show(block=True)

In [8]:
# McGinley dynamic 20
def md_crossover_markers(frame, signal_col='md_signal'):
    """Build chart markers for every flip of the McGinley Dynamic signal.

    A 'Buy' marker is produced on each row whose signal is 'long' while the previous
    row's signal was 'short'; a 'Sell' marker is produced for the opposite flip. The
    first row never yields a marker because it has no previous row.

    Args:
        frame: DataFrame with a 'Date' column and a ``signal_col`` column holding
            'long' / 'short' labels.
        signal_col: Name of the signal column to scan.

    Returns:
        List of marker dicts (time, position, shape, color, text) in row order.
    """
    buy_style = {'position': 'below', 'shape': 'arrow_up', 'color': '#33de3d', 'text': 'Buy'}
    sell_style = {'position': 'above', 'shape': 'arrow_down', 'color': '#f485fb', 'text': 'Sell'}

    signal = frame[signal_col]
    previous = signal.shift(1)  # NaN on the first row, so no flip is detected there
    turned_long = (signal == 'long') & (previous == 'short')
    turned_short = (signal == 'short') & (previous == 'long')

    markers = []
    for when, is_buy, is_sell in zip(frame['Date'], turned_long, turned_short):
        if is_buy:
            markers.append({'time': when, **buy_style})
        elif is_sell:
            markers.append({'time': when, **sell_style})
    return markers


if __name__ == '__main__':

    chart = Chart(title="McGinley Dynamic 20", maximize=True)
    chart.legend(visible=True, color_based_on_candle=True)

    # Main candlestick pane. df carries the Date/Open/High/Low/Close/Volume columns
    # alongside the indicator columns.
    chart.set(df)

    # McGinley Dynamic overlaid on the price pane.
    md_line = chart.create_line('dynamic20', color="#1f77b4", width=1, price_line=False, price_label=False)
    md_line.set(df[['Date', 'dynamic20']])

    # Buy/Sell arrows wherever the close crosses the McGinley Dynamic (md_signal flips).
    # All markers are added in one call rather than one at a time.
    markers = md_crossover_markers(df)
    if markers:
        chart.marker_list(markers)

    # Kept inside the guard so it can only ever show the chart built above.
    chart.show(block=True)

In [9]:
# Create a visualization of the Damiani Volatmeter
def dv_crossover_markers(frame, signal_col='vol_signal'):
    """Build chart markers for every flip of the Damiani Volatmeter signal.

    A 'Buy' marker is produced on each row whose signal is 'long' while the previous
    row's signal was 'short'; a 'Sell' marker is produced for the opposite flip. The
    first row never yields a marker because it has no previous row.

    Args:
        frame: DataFrame with a 'Date' column and a ``signal_col`` column holding
            'long' / 'short' labels.
        signal_col: Name of the signal column to scan.

    Returns:
        List of marker dicts (time, position, shape, color, text) in row order.
    """
    buy_style = {'position': 'below', 'shape': 'arrow_up', 'color': '#33de3d', 'text': 'Buy'}
    sell_style = {'position': 'above', 'shape': 'arrow_down', 'color': '#f485fb', 'text': 'Sell'}

    signal = frame[signal_col]
    previous = signal.shift(1)  # NaN on the first row, so no flip is detected there
    turned_long = (signal == 'long') & (previous == 'short')
    turned_short = (signal == 'short') & (previous == 'long')

    markers = []
    for when, is_buy, is_sell in zip(frame['Date'], turned_long, turned_short):
        if is_buy:
            markers.append({'time': when, **buy_style})
        elif is_sell:
            markers.append({'time': when, **sell_style})
    return markers


if __name__ == '__main__':

    chart = Chart(title="Damiani Volatmeter with Signals", maximize=True, inner_height=0.8)
    chart.set(df)

    # Volatmeter pane underneath the price pane: volatmeter and its threshold.
    DV_chart = chart.create_subchart(position='left', width=1.0, height=0.2, sync=True)
    volatmeter_line = DV_chart.create_line('Volatmeter', color="#2ee30f", width=2, price_line=False, price_label=False)
    volatmeter_line.set(df[['Date', 'volatmeter']])
    threshold_line = DV_chart.create_line('Threshold', color="red", width=2, price_line=False, price_label=False)
    threshold_line.set(df[['Date', 'threshold']])

    # Buy/Sell arrows wherever the volatmeter crosses its threshold (vol_signal flips).
    # NOTE(review): this cell previously scanned 'md_signal', which looked like a
    # copy-paste from the McGinley cell; pass signal_col='md_signal' to restore that.
    markers = dv_crossover_markers(df)
    if markers:
        chart.marker_list(markers)

    # Kept inside the guard so it can only ever show the chart built above.
    chart.show(block=True)

In [10]:
# Create a visualization of RSI, McGinley Dynamic, and Damiani Volatmeter
def combined_signal_markers(frame):
    """Build alternating Buy/Sell markers from the three combined signals.

    A 'Buy' marker is produced on a row where 'md_signal', 'rsi_signal' and
    'vol_signal' are all 'long', unless the most recent marker was already a 'Buy'.
    A 'Sell' marker is produced on a row where all three are 'short', unless the most
    recent marker was already a 'Sell'. Markers therefore alternate.

    The first row of ``frame`` is never evaluated; this matches the scan this helper
    replaces, which started at the second row.

    Args:
        frame: DataFrame with 'Date', 'md_signal', 'rsi_signal' and 'vol_signal' columns.

    Returns:
        List of marker dicts (time, position, shape, color, text) in row order.
    """
    buy_style = {'position': 'below', 'shape': 'arrow_up', 'color': '#33de3d', 'text': 'Buy'}
    sell_style = {'position': 'above', 'shape': 'arrow_down', 'color': '#f485fb', 'text': 'Sell'}

    markers = []
    last_marker = None  # 'Buy', 'Sell', or None before the first marker

    rows = frame.iloc[1:]
    for when, md, rsi, dv in zip(rows['Date'], rows['md_signal'], rows['rsi_signal'], rows['vol_signal']):
        if md == "long" and rsi == "long" and dv == "long" and last_marker != 'Buy':
            markers.append({'time': when, **buy_style})
            last_marker = 'Buy'
        elif md == "short" and rsi == "short" and dv == "short" and last_marker != 'Sell':
            markers.append({'time': when, **sell_style})
            last_marker = 'Sell'
    return markers


if __name__ == '__main__':

    chart = Chart(title="RSI + MD + DV", maximize=True, inner_height=1.0)
    chart.set(df)

    # McGinley Dynamic overlaid on the price pane. The RSI and Volatmeter panes are
    # not drawn in this chart; only their signals feed the markers.
    md_line = chart.create_line('dynamic20', color="#1f77b4", width=1, price_line=False, price_label=False)
    md_line.set(df[['Date', 'dynamic20']])

    # Buy/Sell arrows where all three signals agree. All markers are added in one call.
    markers = combined_signal_markers(df)
    if markers:
        chart.marker_list(markers)

    # Kept inside the guard so it can only ever show the chart built above.
    chart.show(block=True)