In [1]:
import numpy as np
import ta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from quantfreedom.enums import CandleBodyType
from quantfreedom.helper_funcs import dl_ex_candles
from quantfreedom.indicators.tv_indicators import macd_tv, ema_tv

np.set_printoptions(formatter={"float_kind": "{:0.2f}".format})

%load_ext autoreload
%autoreload 2




In [2]:
candles = dl_ex_candles(
    exchange='mufex',
    symbol="BTCUSDT",
    timeframe="5m",
    candles_to_dl=3000
)

In [3]:
datetimes = candles[:,CandleBodyType.Timestamp].astype('datetime64[ms]')
open_prices = candles[:,CandleBodyType.Open]
close_prices = candles[:,CandleBodyType.Close]
low_prices = candles[:,CandleBodyType.Low]
high_prices = candles[:,CandleBodyType.High]
volume = candles[:,CandleBodyType.Volume]

In [4]:
# def donchian_channels(high, low, period=96, offset=0):
#     """Calculates Donchian Channels with an offset."""

#     upper = np.full_like(high, np.nan)  # Initialize with NaN
#     lower = np.full_like(low, np.nan)
#     basis = np.full_like(high, np.nan)

#     for i in range(period, len(high)):
#         # Enhanced Debugging
#         print("Iteration:", i)
#         print("Index Range:", i - period + 1, "to", i)
#         print("Current high slice:", high[i - period + 1:i + 1])

#         upper[i] = max(high[i - period + 1:i + 1])
#         lower[i] = min(low[i - period + 1:i + 1])
#         basis[i] = (upper[i] + lower[i]) / 2

#     print("Length of input data:", len(high), len(low))
#     print("Length of upper array:", len(upper))
#     print("upper[1940:1945]:", upper[1940:1945]) 

#     return basis[offset:], upper[offset:], lower[offset:]


def donchian_channels(high, low, period=96, offset=0):
    """Calculates Donchian Channels with an offset."""

    upper = np.full_like(high, np.nan)  # Initialize with NaN
    lower = np.full_like(low, np.nan)
    basis = np.full_like(high, np.nan)

    for i in range(period, len(high)):
        # Enhanced Debugging
        print("Iteration:", i)
        print("Index Range:", i - period + 1, "to", i)
        print("Current high slice:", high[i - period + 1:i + 1])

        upper[i] = max(high[i - period: i + 1])  # Adjust from i - period + 1 
        lower[i] = min(low[i - period + 1:i + 1])
        basis[i] = (upper[i] + lower[i]) / 2

    print("Length of input data:", len(high), len(low))
    print("Length of upper array:", len(upper))

    return basis[offset:], upper[offset:].copy(), lower[offset:] 



In [5]:
def ema(data, period):
    """Calculates the Exponential Moving Average (EMA).

    Args:
        data (array-like): Array of values  
        period (int): The smoothing period.

    Returns:
        array-like: The calculated EMA values.
    """
    alpha = 2 / (period + 1)  # Smoothing factor
    ema_values = np.zeros_like(data)
    ema_values[period - 1] = np.mean(data[:period])  # Initial simple average

    for i in range(period, len(data)):
        ema_values[i] = alpha * data[i] + (1 - alpha) * ema_values[i - 1]

    return ema_values

def sma(data, period):
    """Calculates the Simple Moving Average (SMA).

    Args:
        data (array-like): Array of values.
        period (int): The smoothing period.

    Returns:
        array-like: The calculated SMA values.
    """
    sma_values = np.zeros_like(data)
    for i in range(period - 1, len(data)):
        sma_values[i] = np.mean(data[i - period + 1: i + 1])

    return sma_values

def calculate_ma(data, period, ma_type):
    if ma_type == "SMA":
        return ta.sma(data, period)  
    elif ma_type == "EMA":
        return ema(data, period)  # Use our custom 'ema' function
    # ... Add cases for WMA and RMA if those functions are available 
    else:  
        return ta.sma(data, period)  # Default to SMA

def smooth_indicator(data, smooth_type, period):
    # Implement smoothing functions for EMA, WMA, RMA if needed
    if smooth_type == "SMA":
        return ta.sma(data, period) 
    # ... 
    else: 
        return data 
    

def atr(high, low, close, period):
    """Calculates the Average True Range (ATR).

    Args:
        high (array-like): Array of high prices.
        low (array-like): Array of low prices.
        close (array-like): Array of closing prices.
        period (int): The lookback period.

    Returns:
        array-like: The calculated ATR values.
    """
    tr = np.zeros_like(high)
    atr_values = np.zeros_like(high)

    for i in range(1, len(high)):
        tr[i] = max(
            high[i] - low[i], 
            abs(high[i] - close[i - 1]), 
            abs(low[i] - close[i - 1])
        )

    atr_values[period - 1] = np.mean(tr[:period])  # Initial simple average

    for i in range(period, len(high)):
        atr_values[i] = (atr_values[i - 1] * (period - 1) + tr[i]) / period

    return atr_values


In [6]:
def lwti(close, period=8, smooth=False, smooth_type="SMA", smooth_period=5):
    """Calculates the Larry Williams Large Trade Index (LWTI)."""

    def calculate_range(high, low):  # True Range helper 
        tr = np.maximum(high - low, np.abs(np.subtract(high, np.roll(close, 1))))
        return tr

    diff = close - np.roll(close, period)
    tr = calculate_range(high_prices, low_prices) 

    ma_diff = calculate_ma(diff, period, smooth_type) 
    ma_tr = calculate_ma(tr, period, smooth_type) 

    lwti = (ma_diff / ma_tr) * 50 + 50

    if smooth:
        lwti = smooth_indicator(lwti, smooth_type, smooth_period)

    return lwti

In [7]:
# Calculate LWTI
lwti_values = lwti(close_prices, period=25, smooth=True, smooth_type="EMA", smooth_period=20)  
lwti_values

  lwti = (ma_diff / ma_tr) * 50 + 50


array([nan, nan, nan, ..., 160.70, 155.70, 158.17])

In [8]:
# Calculate Donchian Channels (with an offset to handle the initial period)
period = 96  # Lookback period
offset = period - 1  # Offset to skip incomplete calculations
basis, upper, lower = donchian_channels(high_prices, low_prices, period, offset)
# upper = np.concatenate((np.full(96, np.nan), upper))  



Iteration: 96
Index Range: 1 to 96
Current high slice: [61963.20 61923.50 61855.60 61891.80 61797.00 61894.70 61970.80 61976.30
 61966.60 61978.10 62027.80 62034.00 62023.00 61992.30 62037.80 61939.50
 61978.80 61958.10 61918.60 61838.50 61921.70 61942.60 61923.80 61918.00
 61862.70 61848.90 61846.50 61803.20 61778.10 61768.00 61886.70 61950.30
 61964.20 61970.00 61986.60 61934.70 61934.10 61959.80 61993.60 62010.50
 62017.90 62007.00 61961.20 61879.90 61887.70 61916.90 61944.00 62005.60
 62028.10 62000.20 61975.80 62028.60 62028.60 62049.90 62080.10 62101.40
 62131.40 62093.00 62119.50 62129.60 62122.60 62142.90 62129.90 62114.20
 62116.10 62128.80 62160.30 62189.80 62272.80 62194.60 62062.10 61932.20
 61929.30 61929.50 61934.20 61905.80 61960.20 61960.00 61947.50 61929.80
 61975.20 61973.10 61990.20 62083.90 62067.20 62003.10 61987.10 61983.60
 61943.80 61940.00 61930.30 61967.40 61998.30 61967.20 61998.60 62000.20]
Iteration: 97
Index Range: 2 to 97
Current high slice: [61923.50 618

In [9]:
def get_lwti_color(value):
    if value > 50:
        return 'green'
    elif value < 50:
        return 'red'
    else:
        return 'gray'  # Or a color of your choice for the midline     
    
# plotly.__version__

In [10]:
# Create Plotly figure
fig = go.Figure(data=[
    go.Candlestick(
        x=datetimes,
        open=open_prices, high=high_prices,
        low=low_prices, close=close_prices
    ),
    go.Scatter(x=datetimes[offset:], y=basis, name='Basis', line=dict(color='#FF6D00')),
    go.Scatter(x=datetimes[offset:], y=upper, name='Upper', line=dict(color='#2962FF')),
    go.Scatter(x=datetimes[offset:], y=lower, name='Lower', line=dict(color='#2962FF'))
])
fig.update_layout(xaxis_rangeslider_visible=False)

fig.show()

In [11]:
def calculate_volume_color(open_price, close_price):
    if close_price >= open_price:
        return 'green'
    else:
        return 'red'

In [354]:
# def get_buy_signal(close, upper, lwti, volume, volume_ma):        
#     # if(close >= upper-10):
#         # print(close,upper,lwti,volume,volume_ma)
        
#     if i >= 19 and close >= upper-20 and lwti > 50 and volume > volume_ma:         
#         return True
#     else:
#         return False

def get_buy_signal(close, upper, lwti, volume, volume_ma):           
    # print('upper vs paramUpper',i,len(upper),len(paramUpper))  
    # print('upper lengh',i,len(upper))  
    if i >= offset and close[i] >= upper[i]-50 and lwti[i] > 50  and volume[i] > volume_ma[i] and close[i] > close[i-1] and close[i-1] > close[i-2] :                           
        return True
    else:
        return False


ma_volume = sma(volume, 30)

print('close_prices',len(close_prices))
print('upper',len(upper))
print('lwti_values',len(lwti_values))
print('volume',len(volume))
print('ma_volume',len(ma_volume))


buy_signals = np.full_like(close_prices, np.nan)  # Initialize with NaN

# print('before loop',upper[1942])
recent_buy_index = -1  # Initialize 
for i, (c, u, l, v, vm) in enumerate(zip(close_prices[offset:], upper, lwti_values[offset:], volume[offset:], ma_volume[offset:])):    
    # if(i == 1942):
    #     print('in loop',u)
    if get_buy_signal(close_prices[offset:], upper, lwti_values[offset:], volume[offset:], ma_volume[offset:]) and i - recent_buy_index >= 5: 
        buy_signals[i+offset] = c 
        recent_buy_index = i + offset  # Update after a buy 
        print("Buy signal detected:", c, u, "Close price difference:", abs(c - u), "Index:", i) 
        # print("Buy signal detected:", c, u, "Index:", i)             


close_prices 3000
upper 2905
lwti_values 3000
volume 3000
ma_volume 3000
Buy signal detected: 62127.7 62175.3 Close price difference: 47.60000000000582 Index: 274
Buy signal detected: 62839.1 62865.9 Close price difference: 26.80000000000291 Index: 423
Buy signal detected: 66830.8 66879.3 Close price difference: 48.5 Index: 704
Buy signal detected: 67746.1 67787.6 Close price difference: 41.5 Index: 955
Buy signal detected: 64649.3 64657.1 Close price difference: 7.799999999995634 Index: 1145
Buy signal detected: 66737.3 66737.3 Close price difference: 0.0 Index: 1470
Buy signal detected: 67951.2 67988.5 Close price difference: 37.30000000000291 Index: 1580
Buy signal detected: 67671.5 67715.3 Close price difference: 43.80000000000291 Index: 1804
Buy signal detected: 68417.7 68460.8 Close price difference: 43.10000000000582 Index: 2028
Buy signal detected: 68488.4 68492.9 Close price difference: 4.5 Index: 2228
Buy signal detected: 69837.5 69852.1 Close price difference: 14.60000000000

In [17]:
def shift_and_pad(arr, shift_amount, num_nan):
    shifted_arr = np.roll(arr, shift_amount)  # Shift the array
    padding = np.full(num_nan, np.nan)        # Create NaN padding
    return np.concatenate((padding, shifted_arr))  # Combine padding and shifted array

adj_upper = shift_and_pad(upper, shift_amount=2, num_nan=offset)
adj_basis = shift_and_pad(basis, shift_amount=2, num_nan=offset)
# close_prices > upper-50
len(adj_upper)

3000

In [69]:
ma_volume = sma(volume, 30)


prev_close = np.roll(close_prices,1)
prev_close[0] = np.nan

prev_prev_close = np.roll(prev_close,1)
prev_prev_close[0] = np.nan

price_touch_upper = close_prices >= adj_upper # Close

prev_adj_upper = np.roll(adj_upper,1);
prev_adj_upper[0] = np.nan

prev_prev_adj_upper = np.roll(prev_adj_upper,1)
prev_prev_adj_upper[0] = np.nan

prev_price_touch_upper = prev_close >= prev_adj_upper
prev_prev_price_touch_upper = prev_prev_close >= prev_prev_adj_upper
double_price_touch_upper = prev_price_touch_upper & prev_prev_price_touch_upper

lwti_uptrend = lwti_values > 50


# last_5_lwti_trend = np.roll(lwti_values,5)
# last_5_lwti_trend[0] = np.nan
# last_5_lwti_below = last_5_lwti_trend < 50

look_back_lwti_below_50 = 24

has_lwti_below_50 = np.zeros_like(close_prices, dtype=bool)  # Initialize as False

for i in range(look_back_lwti_below_50, len(lwti_values)):
    window = lwti_values[i - look_back_lwti_below_50: i]
    has_lwti_below_50[i] = np.any(window < 50)
    
print(len(has_lwti_below_50))

diffBasis = (adj_upper - adj_basis)/1000

is_price_up = (diffBasis >= 0.9) & (diffBasis < 1.96) 



volume_above_ma = volume > ma_volume
rising = close_prices > prev_close
rising2 = prev_close > prev_prev_close

entries = np.where(price_touch_upper & lwti_uptrend & volume_above_ma & (has_lwti_below_50),True,False)

signal_history = np.zeros_like(entries)  # Array to track recent signals
filter_distance = 24  # Number of candles to look back
for i, signal in enumerate(entries):
   if signal:
      # Check for previous signals within the filter distance
      if np.any(signal_history[max(0, i - filter_distance):i]):
          entries[i] = False  # Filter out the current signal
      else:
          signal_history[i] = True  # Record this signal
          
# filtered_indices = np.where(entries == True)
# print(len(filtered_indices[0]))
          
entry_signals = np.where(entries, close_prices, np.nan)
exit_prices = np.full_like(close_prices, np.nan)
exit_condition = close_prices <= adj_basis  
exit_prices[exit_condition] = close_prices[exit_condition] 

3000


In [41]:
# Create Plotly figure
test = go.Figure(data=[
    go.Candlestick(
        x=datetimes,
        open=open_prices, high=high_prices,
        low=low_prices, close=close_prices
    ),
    go.Scatter(x=datetimes[offset:], y=basis, name='Basis', line=dict(color='#FF6D00')),
    go.Scatter(x=datetimes[offset:], y=upper, name='Upper', line=dict(color='#2962FF')),
    go.Scatter(x=datetimes[offset:], y=lower, name='Lower', line=dict(color='#2962FF'))
])
test.update_layout(xaxis_rangeslider_visible=False)

test.show()

In [70]:
# Create figure with two rows
fig = make_subplots(rows=3, cols=1, shared_xaxes=True)  

# Candlestick chart with Donchian Channels (Row 1)
fig.add_trace(go.Candlestick(
    x=datetimes,
    name="Candles",
    open=open_prices, high=high_prices,
    low=low_prices, close=close_prices
), row=1, col=1)

fig.add_trace(go.Scatter(x=datetimes[offset:], y=basis, name='Basis', line=dict(color='#FF6D00')), row=1, col=1)
fig.add_trace(go.Scatter(x=datetimes[offset:], y=upper, name='Upper', line=dict(color='#2962FF')), row=1, col=1)
fig.add_trace(go.Scatter(x=datetimes[offset:], y=lower, name='Lower', line=dict(color='#2962FF')), row=1, col=1)

# ... (Donchian Channels background fill code - Add this to Row 1)

# plot the signal
fig.add_trace(go.Scatter(
    x=datetimes,
    y=entry_signals,
    mode='markers',
    marker=dict(color='#00F6FF', symbol='triangle-up', size=10),
    name='Buy Signals'
))

# Plotting (if desired)
# if not np.isnan(first_buy_signal):
#     print("buyyy")
#     fig.add_trace(go.Scatter(
#         x=[np.searchsorted(close_prices, first_buy_signal)],  # Find corresponding datetime
#         y=[first_buy_signal],
#         mode='markers',
#         marker=dict(color='green', symbol='triangle-up', size=12),
#         name='First Buy Signal'
#     ))

# LWTI Line (Row 2)
# fig.add_trace(go.Scatter(x=datetimes, y=lwti_values, mode='lines', name='LWTI', line=dict(color='#2DD204')), row=2, col=1)

# LWTI Line (Row 2)
fig.add_trace(go.Scatter(
    x=datetimes, y=lwti_values, 
    mode='lines+markers', name='LWTI', 
    marker=dict(color=[get_lwti_color(value) for value in lwti_values]),  # Color for markers
    line=dict(width=1)  # Styling for the line
), row=2, col=1)
# Midline for LWTI
fig.add_hline(y=50, line_dash="dot", line_color='gray', row=2, col=1)

# Volume Indicator (Row 3)
fig.add_trace(go.Bar(
    x=datetimes, 
    y=volume,    
    name="Volume",
    marker_color=list(map(calculate_volume_color, open_prices, close_prices))
), row=3, col=1)

# Volume Moving Average (default period 30)
ma_volume = sma(volume, 30)  # Assuming you have the 'ta' library for moving averages
fig.add_trace(go.Scatter(
    x=datetimes,
    y=ma_volume,
    mode='lines',
    name='Volume MA 30',
    line=dict(color='white')
), row=3, col=1) 

# fig.show(config={'displayModeBar': False})  # Optional: Hide the Plotly mode bar


# Customize layout
fig.update_layout(title="Candlestick Chart with Donchian Channels and LWTI",
                  xaxis_title="Date", 
                  yaxis_title="Price",                   
                  yaxis2_title="LWTI Value",
                  yaxis3_title="Volume MA",
                  width=1200,  # Set the desired width in pixels
                  height=800,   # Set the desired height in pixels
                  xaxis_rangeslider_visible=False,)

fig.show()
# 62359.5