In [1]:
%pip install plotly --quiet
%pip install pandas --quiet
%pip install ta --quiet
import ta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import requests
import json
import datetime
import pandas as pd
import numpy as np
import time

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
# Kraken data
pair = "XBTUSDT"
time_interval_in_minutes = 5

url = f"https://api.kraken.com/0/public/OHLC?pair={pair}&interval={time_interval_in_minutes}"
response = requests.get(url)
data = json.loads(response.text)

ohlcv_data = data['result'][pair]
times = [datetime.datetime.fromtimestamp(int(entry[0])) for entry in ohlcv_data]
opens = [entry[1] for entry in ohlcv_data]
highs = [entry[2] for entry in ohlcv_data]
lows = [entry[3] for entry in ohlcv_data]
closes = [entry[4] for entry in ohlcv_data]

df = pd.DataFrame(list(zip(times, opens, highs, lows, closes)), columns=['time', 'open', 'high', 'low', 'close'])
df['time'] = pd.to_datetime(df['time'])
df[['open', 'high', 'low', 'close']] = df[['open', 'high', 'low', 'close']].apply(pd.to_numeric)

df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=14)
df['rsi'] = ta.momentum.RSIIndicator(df['close'], window=14).rsi()
df['sma14'] = ta.trend.sma_indicator(df['close'], window=14)
df[['atr', 'rsi', 'sma14']] = df[['atr', 'rsi', 'sma14']].round(2)
df.drop(df.index[:14], inplace=True)
df.reset_index(drop=True, inplace=True)
pd.set_option('display.max_rows', 100)

In [3]:
def find_horizontal_resistance_lines(df):
    resistance_lines = []
    i = 0
    waiting_period = 11
    
    while i < len(df) - waiting_period-1:  # leaves space for checking the next 10 candles
        origin_candle = df.iloc[i]
        if origin_candle['close'] > origin_candle['open']:  # Check if the origin candle is green
            line_price = 0.6 * origin_candle['close'] + 0.4 * origin_candle['high']  # Weighted mean of close and high
            second_touch = None 
            waiting_candles = df.iloc[i+1:i+waiting_period]
            waiting_candles_upper_range = line_price + (0.5 * waiting_candles['atr'])
            waiting_candles_line_price = 0.6 * waiting_candles[['close', 'open']].max(axis=1) + 0.4 * waiting_candles['high']
            
            # If any of the waiting candle's close prices exceed the upper limit, we skip to the next current candle
            if any(waiting_candles_line_price > waiting_candles_upper_range):
                i += 1
                continue
                
            for j in range(i+waiting_period, len(df)):  # Skip the waiting candles and then start looking for touch points
                next_candle = df.iloc[j]
                upper_range = line_price + (0.5 * next_candle['atr'])
                lower_range = line_price - (0.5 * next_candle['atr'])
                bottom_limit = line_price - (5 * next_candle['atr'])
                next_candle_line_price = 0.6 * next_candle[['close', 'open']].max() + 0.4 * next_candle['high']
                next_candle_line_price_down = 0.6 * next_candle[['close', 'open']].min() + 0.4 * next_candle['low']

                # What counts as a touch?:
                if lower_range <= next_candle_line_price <= upper_range and second_touch is None:
                    second_touch = next_candle['time']  # If touch, save the timestamp of the second touch
                
                # Check if next_candle close is outside the allowed range, even if there's no second touch yet
                if next_candle_line_price > upper_range or next_candle_line_price_down < bottom_limit:
                    if second_touch is not None:  # If we had a second touch, we append the line
                        resistance_lines.append({'start_time': origin_candle['time'], 'end_time': next_candle['time'], 'second_point': second_touch, 'line_price': line_price})
                    second_touch = None  
                    i = j + 1  # Start analyzing from the next candle
                    break
            else:
                i += 1  # If we finished the loop without hitting a break, increment i to check the next candle
        else:
            i += 1  # If the current candle is not green, check the next one
    return pd.DataFrame(resistance_lines)

resistance_df = find_horizontal_resistance_lines(df)

In [4]:
resistance_df['start_time'] = pd.to_datetime(resistance_df['start_time'])
resistance_df['end_time'] = pd.to_datetime(resistance_df['end_time'])
resistance_df['line_length'] = (resistance_df['end_time'] - resistance_df['start_time']).dt.total_seconds() / (time_interval_in_minutes * 60)
resistance_df['line_length'] = resistance_df['line_length'].astype(int)

In [5]:
def find_touch_points(df, resistance_df):
    waiting_period = 11
    touch_point_cols = []
    max_touch_points = 0
    
    for idx, row in resistance_df.iterrows():

        second_touch_point = row['second_point']
        price = row['line_price']
        atr = df.loc[df['time'] == second_touch_point, 'atr'].values[0]  # ATR value at the second touch point
        
        upper_range = price + (0.5 * atr)
        lower_range = price - (0.5 * atr)
        bottom_limit = price - (5 * atr)
        
        # Initialize the start index for slicing
        start_idx = df.loc[df['time'] == second_touch_point].index[0] + waiting_period
        end_idx = df.loc[df['time'] == row['end_time']].index[0]

        touch_points = []
        while start_idx < end_idx:
            slice_df = df.iloc[start_idx:end_idx+1]  
            found = False
            for j, candle in slice_df.iterrows():
                candle_line_price = 0.6 * candle[['close', 'open']].max() + 0.4 * candle['high']
                candle_line_price_down = 0.6 * candle[['close', 'open']].min() + 0.4 * candle['low']
                if lower_range <= candle_line_price <= upper_range:
                    touch_points.append(candle['time'])
                    start_idx = j + waiting_period
                    found = True
                    break
                if candle_line_price > upper_range or candle_line_price_down < bottom_limit:
                    break

            if not found:
                break

        max_touch_points = max(max_touch_points, len(touch_points))
        for i, tp in enumerate(touch_points):
            col_name = f'touch_point_{i+3}'
            if col_name not in touch_point_cols:
                touch_point_cols.append(col_name)
                resistance_df[col_name] = None
            resistance_df.loc[idx, col_name] = tp
        
        resistance_df.loc[idx, 'touch_points_count'] = len(touch_points)

    print(f"Maximum touch points for a line: {max_touch_points}")
    return resistance_df
resistance_df = find_touch_points(df, resistance_df)

Maximum touch points for a line: 1


In [6]:
resistance_df['has_third_touch'] = resistance_df['touch_point_3'].notna()
resistance_df['touch_points_count'] = resistance_df['touch_points_count'] + 2 # Every line starts with 2 touchpoints already
resistance_df['touch_points_count'] = resistance_df['touch_points_count'].astype(int)
resistance_df

Unnamed: 0,start_time,end_time,second_point,line_price,line_length,touch_points_count,touch_point_3,has_third_touch
0,2023-05-30 12:35:00,2023-05-30 13:45:00,2023-05-30 13:40:00,27721.58,14,2.0,,False
1,2023-05-30 22:25:00,2023-05-31 00:40:00,2023-05-30 23:35:00,27687.2,27,2.0,,False
2,2023-05-31 01:25:00,2023-05-31 03:35:00,2023-05-31 02:30:00,27177.6,26,3.0,2023-05-31 03:25:00,True
3,2023-05-31 07:35:00,2023-05-31 09:10:00,2023-05-31 09:05:00,27115.24,19,2.0,,False
4,2023-05-31 12:10:00,2023-05-31 13:25:00,2023-05-31 13:05:00,26958.68,15,2.0,,False
5,2023-05-31 13:50:00,2023-05-31 15:45:00,2023-05-31 14:45:00,27058.12,23,3.0,2023-05-31 15:40:00,True
6,2023-05-31 16:30:00,2023-05-31 18:15:00,2023-05-31 18:05:00,27110.8,21,2.0,,False
7,2023-06-01 03:25:00,2023-06-01 04:45:00,2023-06-01 04:30:00,26854.2,16,2.0,,False
8,2023-06-01 05:10:00,2023-06-01 07:05:00,2023-06-01 06:25:00,26942.18,23,2.0,,False
9,2023-06-01 08:50:00,2023-06-01 12:35:00,2023-06-01 10:05:00,26953.9,45,3.0,2023-06-01 11:15:00,True


In [7]:
def calculate_returns_after_third_touch(df, resistance_df):
    returns_dict = {}

    for idx, row in resistance_df.iterrows():
        if not row['has_third_touch']:
            continue

        line_length = row['line_length']
        third_point = row['touch_point_3']
        
        # Get the price at the third touch point, this is the entry. May be a bit below or above the line's price.
        entry = df.loc[df['time'] == third_point, 'close'].values[0]

        # Slice the original data frame from the third touch point to the end of the period (3 * line_length)
        start_idx = df.loc[df['time'] == third_point].index[0]
        end_idx = min(start_idx + 3 * line_length, len(df) - 1)
        slice_df = df.iloc[start_idx:end_idx+1]
        # returns are also based on the 'closes' of the candles.
        returns = slice_df['close'].apply(lambda x: x - entry)
        returns_dict[idx] = returns.tolist()

    return returns_dict

returns_dict = calculate_returns_after_third_touch(df, resistance_df)

In [9]:
# Candlestick plot
fig = make_subplots(specs=[[{"secondary_y": True}]])

candlestick_trace = go.Candlestick(x=df['time'], open=df['open'], high=df['high'], low=df['low'], close=df['close'])
atr_trace = go.Scatter(x=df['time'], y=df['atr'], mode='lines', name='ATR', line=dict(color='purple'), opacity=0.2)
first_touch_trace = go.Scatter(x=resistance_df['start_time'], y=resistance_df['line_price'], mode='markers', name='First Touch Point', marker=dict(color='black', size=6))
second_touch_trace = go.Scatter(x=resistance_df['second_point'], y=resistance_df['line_price'], mode='markers', name='Second Touch Point', marker=dict(color='black', size=6))
third_touch_trace = go.Scatter(x=resistance_df['touch_point_3'], y=resistance_df['line_price'], mode='markers', name='Third Touch Point', marker=dict(color='black', size=6))
#forth_touch_trace = go.Scatter(x=resistance_df['touch_point_4'], y=resistance_df['line_price'], mode='markers', name='Forth Touch Point', marker=dict(color='black', size=6))
sma_trace = go.Scatter(x=df['time'], y=df['sma14'], mode='lines', name='SMA14', line=dict(color='blue'), opacity=0.4)
rsi_trace = go.Scatter(x=df['time'], y=df['rsi'], mode='lines', name='RSI', line=dict(color='maroon'), opacity=0.2)

fig.add_trace(candlestick_trace, secondary_y=False)
fig.add_trace(first_touch_trace, secondary_y=False)
fig.add_trace(second_touch_trace, secondary_y=False)
fig.add_trace(third_touch_trace, secondary_y=False)
#fig.add_trace(forth_touch_trace, secondary_y=False)
fig.add_trace(sma_trace, secondary_y=False)
fig.add_trace(atr_trace, secondary_y=True)
fig.add_trace(rsi_trace, secondary_y=True)

fig.update_layout(title='BTC/USDT', xaxis_title='Time')
fig.update_yaxes(title_text='Price', secondary_y=False, tickformat="$,.2f", autorange=True, fixedrange=False)
fig.update_yaxes(title_text='ATR', secondary_y=True)

for _, row in resistance_df.iterrows():
    start_time = row['start_time']
    end_time = row['end_time']
    price = row['line_price']
    
    fig.add_shape(type="line", x0=start_time, y0=price, x1=end_time, y1=price, line=dict(color='blue', width=3), xref='x', yref='y')

fig.show()

# Returns plot
fig = go.Figure()

for idx, (_, returns) in enumerate(returns_dict.items()):
    scatter_trace = go.Scatter(y=returns, mode='lines', name=f'res_line_{idx + 1}', line=dict(color='purple'))
    fig.add_trace(scatter_trace)

fig.add_hline(y=0, line=dict(color='blue', width=2, dash='dash'))
fig.update_layout(title="Returns from 3rd Touch Point, Long Entry is Close of 3rd Touch Point", xaxis_title="# Candles after 3rd Touch", yaxis_title="Returns")
fig.show()