In [622]:
# %%
import numpy as np
import pandas as pd
import math
from sklearn.linear_model import LinearRegression
import datetime
import yfinance as yf
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import mysql.connector

In [623]:
connection = mysql.connector.connect(
    host="localhost",
    port=3306,
    user="root",
    password="root",
    database="stocks"
)

In [624]:


global_slope_up = 0.01
global_slope_down = -0.01
threshold_value=0.000000000001
PEAK, VALLEY = 1, -1
extend_by = 5


In [625]:
def plot_candlestick_with_trends(df, trends):
    fig = go.Figure(data=[go.Candlestick(x=df['Date'],

                open=df['Open'],
                high=df['High'],
                low=df['Low'],
                close=df['Close'])])

    if not trends.empty:
        for _, row in trends.iterrows():
            trend_color = 'green' if row['Trend'] == 'Up' else 'red'
            fig.add_trace(go.Scatter(
                x=[row['Start Date'], row['End Date']],
                y=[row['Start Price'], row['End Price']],
                mode='lines',
                line=dict(color=trend_color, width=2),
                name=f"{row['Trend']} Trend"
            ))

    fig.update_layout(
        title=f"Candlestick Chart",
        xaxis_title="Date",
        yaxis_title="Price",
        xaxis_rangeslider_visible=False,
        
    )


    fig.show()

In [626]:
def plot_candlestick_with_checked_trends(df, trends):
    fig = go.Figure(data=[go.Candlestick(
        x=df['Date'],
        open=df['Open'],
        high=df['High'],
        low=df['Low'],
        close=df['Close']
    )])

    if not trends.empty:
        for _, row in trends.iterrows():
            trend_color = 'green' if row['Trend'] == 'Up' else 'red'
            fig.add_trace(go.Scatter(
                x=[row['Start Date'], row['End Date']],
                y=[row['Start Price'], row['End Price']],
                mode='lines',
                line=dict(color=trend_color, width=2),
                name=f"{row['Trend']} Trend"
            ))

    cross_styles = {
        'Above': dict(symbol='triangle-up', color='blue', size=8),
        'Below': dict(symbol='triangle-down', color='orange', size=8),
        'Intercept': dict(symbol='circle', color='purple', size=8),
        'None': None
    }

    for cross_type, style in cross_styles.items():
        if style is not None: 
            cross_points = df[df['Cross'] == cross_type]
            fig.add_trace(go.Scatter(
                x=cross_points['Date'],
                 y=cross_points['Close'] - 10,
                mode='markers',
                marker=style,
                name=cross_type
            ))

    fig.update_layout(
        title="Candlestick Chart with Trends and Cross Points",
        xaxis_title="Date",
        yaxis_title="Price",
        xaxis_rangeslider_visible=False
    )

    fig.show()

In [627]:
def plot_candlestick_with_unfiltered_trends(df, trends):
    fig = go.Figure(data=[go.Candlestick(x=df['Date'],

                open=df['Open'],
                high=df['High'],
                low=df['Low'],
                close=df['Close'])])


    # Add trendlines based on trends DataFrame
    if not trends.empty:
        for _, row in trends.iterrows():
            trend_color = 'green'
            fig.add_trace(go.Scatter(
                x=[row['Start Timeframe'], row['End Timeframe']],
                y=[row['Start Price'], row['End Price']],
                mode='lines',
                line=dict(color=trend_color, width=2),
            ))

    # Update layout for better visuals
    fig.update_layout(
        title=f"Candlestick Chart with Trendlines for {tickerSymbol}",
        xaxis_title="Date",
        yaxis_title="Price",
        xaxis_rangeslider_visible=False,
        
    )


    # Show the plot
    fig.show()

In [628]:

# %%

def _identify_initial_pivot(X, up_thresh, down_thresh):
    x_0 = X[0]
    max_x = x_0
    max_t = 0
    min_x = x_0
    min_t = 0
    up_thresh += 1
    down_thresh += 1

    for t in range(1, len(X)):
        x_t = X[t]

        if x_t / min_x >= up_thresh:
            return VALLEY if min_t == 0 else PEAK

        if x_t / max_x <= down_thresh:
            return PEAK if max_t == 0 else VALLEY

        if x_t > max_x:
            max_x = x_t
            max_t = t

        if x_t < min_x:
            min_x = x_t
            min_t = t

    t_n = len(X)-1
    return VALLEY if x_0 < X[t_n] else PEAK

def calculate_pivot_points(close, high, low, up_thresh, down_thresh):


    initial_pivot = _identify_initial_pivot(close, up_thresh, down_thresh)

    t_n = len(close)
    pivots = np.zeros(t_n, dtype='i1')
    pivots[0] = initial_pivot

    up_thresh += 1
    down_thresh += 1

    trend = -initial_pivot
    last_pivot_t = 0
    last_pivot_x = close[0]
    for t in range(1, len(close)):

        if trend == -1:
            x = low[t]
            r = x / last_pivot_x
            if r >= up_thresh:
                pivots[last_pivot_t] = trend
                trend = 1
                last_pivot_x = high[t]
                last_pivot_t = t
            elif x < last_pivot_x:
                last_pivot_x = x
                last_pivot_t = t
        else:
            x = high[t]
            r = x / last_pivot_x
            if r <= down_thresh:
                pivots[last_pivot_t] = trend
                trend = -1
                last_pivot_x = low[t]
                last_pivot_t = t
            elif x > last_pivot_x:
                last_pivot_x = x
                last_pivot_t = t


    if last_pivot_t == t_n-1:
        pivots[last_pivot_t] = trend
    elif pivots[t_n-1] == 0:
        pivots[t_n-1] = trend

    return pivots



In [629]:


# %%
def find_high_low_type(df):
    prev_high = float('-inf')
    prev_low = float('inf')
    high_patterns = []
    low_patterns=[]

    for index, row in df.iterrows():
        high = row['High']
        low = row['Low']
        pivot = row['Pivots']

        if pivot == 1:
            if high > prev_high:
                high_pattern = "HH"
                low_pattern = None
                prev_high = high
            else:
                high_pattern = "LH"
                low_pattern = None
                prev_high = high
        elif pivot == -1:
            if low > prev_low:
                high_pattern = None
                low_pattern = "HL"
                prev_low = low
            else:
                high_pattern = None
                low_pattern = "LL"
                prev_low = low
        
        else:
            high_pattern = None
            low_pattern = None
            
        high_patterns.append(high_pattern)
        low_patterns.append(low_pattern)

    df['High_pattern'] = high_patterns
    df['Low_pattern'] = low_patterns


In [630]:

# %%
def find_downtrend(df):
    df=df.reset_index()
    df['Trend_Down'] = None
    for i in range(0, len(df)-2):
        if df['High_pattern'].iloc[i] == 'HH' and df['Peaks'].iloc[i] > df['Peaks'].iloc[i+1] and df['Peaks'].iloc[i+1] > df['Peaks'].iloc[i+2]:
            df.at[i, 'Trend_Down'] = 'down'
    for i in range(1, len(df)-2):
        if df['Peaks'].iloc[i] > df['Peaks'].iloc[i-1] and df['Peaks'].iloc[i] > df['Peaks'].iloc[i+1] and df['Peaks'].iloc[i+1] > df['Peaks'].iloc[i+2]:
            df.at[i, 'Trend_Down'] = 'down'
        if df['High_pattern'].iloc[i] == 'LH' and df['High_pattern'].iloc[i-1] == 'HH' and df['High_pattern'].iloc[i+1] == 'LH':
            df.at[i, 'Trend_Down'] = 'down'
    for i in range(1, len(df)):
        if df['Trend_Down'].iloc[i-1] == 'down' and df['High_pattern'].iloc[i] == 'LH':
            df.at[i, 'Trend_Down'] = 'down'    
    return df


In [631]:

# %%
def find_uptrend(df):
    df=df.reset_index()
    df['Trend_Up'] = None
    for i in range(0, len(df)-2):
        if df['Low_pattern'].iloc[i] == 'LL' and df['Troughs'].iloc[i] < df['Troughs'].iloc[i+1] and df['Troughs'].iloc[i+1] < df['Troughs'].iloc[i+2]:
            df.at[i, 'Trend_Up'] = 'up'
    for i in range(1, len(df)-2):
        if df['Troughs'].iloc[i] < df['Troughs'].iloc[i-1] and df['Troughs'].iloc[i] < df['Troughs'].iloc[i+1] and df['Troughs'].iloc[i+1] < df['Troughs'].iloc[i+2]:
            df.at[i, 'Trend_Up'] = 'up'
        if df['Low_pattern'].iloc[i] == 'HL' and df['Low_pattern'].iloc[i-1] == 'LL' and df['Low_pattern'].iloc[i+1] == 'HL':
            df.at[i, 'Trend_Up'] = 'up'
    for i in range(1, len(df)):
        if df['Trend_Up'].iloc[i-1] == 'up' and df['Low_pattern'].iloc[i] == 'HL':
            df.at[i, 'Trend_Up'] = 'up'    
    return df


In [632]:

# %%
def find_consecutive_up_labels(df_low):
    line_up_data = []
    for i in range(0, len(df_low)-1):
        if df_low['Trend_Up'].iloc[i] == 'up' and df_low['Trend_Up'].iloc[i+1] == 'up':
            start_timeframe = df_low['Date'].iloc[i]
            start_price = df_low['Troughs'].iloc[i]
            end_timeframe = df_low['Date'].iloc[i+1]
            end_price = df_low['Troughs'].iloc[i+1]
            line_up_data.append([start_timeframe, start_price, end_timeframe, end_price])

    line_up = pd.DataFrame(line_up_data, columns=['Start Timeframe', 'Start Price', 'End Timeframe', 'End Price'])

    return line_up



In [633]:

# %%
def find_consecutive_down_labels(df_high):
    line_down_data = []  # Collect data in lists

    for i in range(0, len(df_high)-1):
        if df_high['Trend_Down'].iloc[i] == 'down' and df_high['Trend_Down'].iloc[i+1] == 'down':
            start_timeframe = df_high['Date'].iloc[i]
            start_price = df_high['Peaks'].iloc[i]
            end_timeframe = df_high['Date'].iloc[i+1]
            end_price = df_high['Peaks'].iloc[i+1]
            line_down_data.append([start_timeframe, start_price, end_timeframe, end_price])

    line_down = pd.DataFrame(line_down_data, columns=['Start Timeframe', 'Start Price', 'End Timeframe', 'End Price'])
    
    return line_down


In [634]:

# %%
def final_uptrend(df, x_up, y_up):

    dates_up = pd.to_datetime(x_up)
    
    if len(dates_up) > 0 and len(x_up) == len(y_up):

        price_up = np.array(y_up)
        t = ((dates_up - dates_up[0]).total_seconds() / 60).values
        # t = (dates_up - dates_up[0]).days.values
        time_frame_up = t.reshape(-1, 1)
        model = LinearRegression()
        model.fit(time_frame_up, price_up)

        slope = model.coef_[0]
        c=[] 
        
        for i in range(0, len(dates_up)-1):

            c.append(price_up[i] - (slope * time_frame_up[i]))

        min_c= 100000000000

        for i in range(0 , len(c)):
            if c[i] < min_c:
                min_c = c[i]

        price_final_up=[]

        for i in range(0, len(dates_up)):
            price_final_up.append((slope * time_frame_up[i]) + min_c)

        price_final_flat_up = np.concatenate(price_final_up, axis=0)
        
        d_line_up = pd.DataFrame({'Date': dates_up, 'Predicted Price': price_final_flat_up})
        up_trend_values = pd.DataFrame({
            'Start Date': d_line_up['Date'].iloc[0],
            'Start Price': d_line_up['Predicted Price'].iloc[0],
            'End Date': d_line_up['Date'].iloc[-1],
            'End Price': d_line_up['Predicted Price'].iloc[-1],
            'Trend': ['Up'],
            'Slope': slope,
            'Intercept': min_c,
            })
        return up_trend_values
    else:
        print('Could not detect any up trend lines')
    



In [635]:

# %%
def final_downtrend(df, x_down, y_down):

    dates_down = pd.to_datetime(x_down)

    if len(dates_down) > 0 and len(x_down) == len(y_down):

        price_down = np.array(y_down)
        t = ((dates_down - dates_down[0]).total_seconds() / 60).values
        # t = (dates_down - dates_down[0]).days.values
        
        time_frame_down = t.reshape(-1, 1)
        model = LinearRegression()
        model.fit(time_frame_down, price_down)

        slope = model.coef_[0]


        c=[]

        for i in range(0, len(dates_down)-1):

            c.append(price_down[i] - (slope * time_frame_down[i]))


        max_c= -100000000000

        for i in range(0 , len(c)):
            if c[i] > max_c:
                max_c = c[i]
            


        price_final_down=[]

        for i in range(0, len(dates_down)):
            price_final_down.append((slope * time_frame_down[i]) + max_c)

        price_final_flat_down = np.concatenate(price_final_down, axis=0)

        d_line_down = pd.DataFrame({'Date': dates_down, 'Predicted Price': price_final_flat_down})

        down_trend_values = pd.DataFrame({
            'Start Date': d_line_down['Date'].iloc[0],
            'Start Price': d_line_down['Predicted Price'].iloc[0],
            'End Date': d_line_down['Date'].iloc[-1],
            'End Price': d_line_down['Predicted Price'].iloc[-1],
            'Trend': ['Down'],
            'Slope' : slope,
            'Intercept': max_c
            })
        return down_trend_values
    else:
        print('Could not detect any down trend lines')
    




In [636]:

# %%
def separate_group(df, trend_df, trend_type):
    consecutive_groups = []
    current_group = {'x': [], 'y': []}
    final_trend=pd.DataFrame()
    for i in range(0, len(trend_df) - 1):
        if trend_df['Start Timeframe'].iloc[i] not in current_group['x']:
            current_group['x'].append(trend_df['Start Timeframe'].iloc[i])
        if trend_df['End Timeframe'].iloc[i] not in current_group['x']:
            current_group['x'].append(trend_df['End Timeframe'].iloc[i])
        if trend_df['Start Price'].iloc[i] not in current_group['y']:
            current_group['y'].append(trend_df['Start Price'].iloc[i])
        if trend_df['End Price'].iloc[i] not in current_group['y']:
            current_group['y'].append(trend_df['End Price'].iloc[i])
        if trend_df.iloc[i]["End Timeframe"] != trend_df.iloc[i + 1]["Start Timeframe"]:
            consecutive_groups.append(current_group)
            current_group = {'x': [], 'y': []}
    current_group['x'].extend([trend_df['Start Timeframe'].iloc[-1], trend_df['End Timeframe'].iloc[-1]])
    current_group['y'].extend([trend_df['Start Price'].iloc[-1], trend_df['End Price'].iloc[-1]])
    consecutive_groups.append(current_group)
    for group in consecutive_groups:
        if(trend_type == 'up'):
            trend_value = final_uptrend(df, group['x'], group['y'])
        else:
            trend_value = final_downtrend(df, group['x'], group['y'])
        final_trend=pd.concat([final_trend, trend_value])
        
    return final_trend


In [637]:

# %%
def initialize_df(df):

    pivots = calculate_pivot_points(df.Close, df.High, df.Low, threshold_value, -1*threshold_value)
    
    df['Pivots'] = pivots
    df['Pivot Price'] = np.nan 
    df['Peaks'] = np.nan
    df['Troughs'] = np.nan
    trend_value = pd.DataFrame()
    
    
    df.loc[df['Pivots'] == 1, 'Pivot Price'] = df.High
    df.loc[df['Pivots'] == -1, 'Pivot Price'] = df.Low
    df.loc[df['Pivots'] == 1, 'Peaks'] = df.High
    df.loc[df['Pivots'] == -1, 'Troughs'] = df.Low
    find_high_low_type(df)
    
    df_high=df.dropna(subset=['High_pattern'])
    df_low=df.dropna(subset=['Low_pattern'])
    
    df_high = find_downtrend(df_high)
    df_low = find_uptrend(df_low)

    uptrend = find_consecutive_up_labels(df_low)
    downtrend = find_consecutive_down_labels(df_high)

    unfiltered_trend = pd.concat([uptrend, downtrend])

    if len(uptrend) > 0:
        trend_value=pd.concat([trend_value, separate_group(df, uptrend, 'up')])
    if len(downtrend) > 0:
        trend_value=pd.concat([trend_value, separate_group(df, downtrend, 'down')])
    return trend_value, unfiltered_trend


In [638]:
def calculate_extended_trends(df, trends):
    for j in range(len(df)):
        if trends['End Date'] == df['Date'].iloc[j]:
            date_start = pd.to_datetime(trends['Start Date'])
            date_end = pd.to_datetime(df['Date'].iloc[j + extend_by])
            x = ((date_end - date_start).total_seconds()/60)
            y = trends['Slope'] * x + trends['Start Price']
            trend_new = pd.DataFrame({
                'Start Date': [trends['Start Date']],
                'Start Price': [trends['Start Price']],
                'End Date': [df['Date'].iloc[j + extend_by]],
                'End Price': [y],
                'Trend': [trends['Trend']],
                'Slope': [trends['Slope']],
                'Intercept': [trends['Start Price']],
            })
            return trend_new

In [639]:
def check_candle_status(df, extended_trend):
    df['Cross'] = None
    for i in range(len(extended_trend)):
        for j in range(len(df)):
            if df['Date'].iloc[j] > extended_trend['Start Date'].iloc[i] and df['Date'].iloc[j] <= extended_trend['End Date'].iloc[i]:
                date_diff = ((df['Date'].iloc[j] - extended_trend['Start Date'].iloc[i]).total_seconds()/60)
                price = extended_trend['Slope'].iloc[i] * date_diff + extended_trend['Intercept'].iloc[i]
                if extended_trend['Trend'].iloc[i] == "Down":
                    if price < df['High'].iloc[j]:
                        if price < df['Low'].iloc[j]:
                            # print(f"Candle is above the downtrend on {df['Date'].iloc[j]} with price {price}") 
                            df['Cross'].iloc[j] = 'Above'
                        else:
                            # print(f"Candle is intercepted with the downtrend on {df['Date'].iloc[j]} with price {price}")   
                            df['Cross'].iloc[j] = 'Intercept'
                else:
                    if price > df['Low'].iloc[j]:
                        if price > df['High'].iloc[j]:
                            # print(f"Candle is below the uptrend on {df['Date'].iloc[j]} with price {price}") 
                            df['Cross'].iloc[j] = 'Below'
                        else:
                            # print(f"Candle is intercepted with the uptrend on {df['Date'].iloc[j]} with price {price}")
                            df['Cross'].iloc[j] = 'Intercept'  
    return df
            

In [640]:

# Input your stock dataframe or multiple ones if you like. You can use yfinance or any other libraries to import dataframe and use it accordingly.
# Just be sure that the function column name matches the name of dataframe's colmuns in the function peak_valley_pivots_candlestick(df, up_thresh, down_thresh)

In [641]:
df = pd.read_sql_query("select date as Date ,open as Open,high  as High,low as Low ,close as Close from futures_price where date >= '2025-01-05 18:00:00' and symbol='NQ'", con=connection)


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



In [642]:
trends, unfiltered_trend = initialize_df(df)

Could not detect any down trend lines


In [643]:
df['Date'] = pd.to_datetime(df['Date'])
plot_candlestick_with_trends(df, trends)

In [644]:
extended_trend = pd.DataFrame()
for i in range(len(trends)):
    output_trend = calculate_extended_trends(df, trends.iloc[i])
    extended_trend = pd.concat([extended_trend, output_trend], ignore_index=True)          

In [645]:
df['Date'] = pd.to_datetime(df['Date'])
plot_candlestick_with_trends(df, extended_trend)

In [646]:
final_df = check_candle_status(df, extended_trend)
plot_candlestick_with_checked_trends(df, extended_trend)


ChainedAssignmentError: behaviour will change in pandas 3.0!
You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy




A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


