In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd
import numpy as np
from scipy.stats import linregress

In [3]:
# Unified Helper Functions (unchanged)
def compute_rsi(data, periods=14):
    delta = data.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=periods).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=periods).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

In [4]:
def trend_strength(df_segment, window=20):
    df = df_segment.tail(window)
    x = np.arange(len(df))
    slope, _, r_value, _, _ = linregress(x, df["Close"].values)
    macd_avg = df["MACD"].mean()
    if slope > 0 and macd_avg > 0:
        return "bullish", abs(slope), r_value**2
    elif slope < 0 and macd_avg < 0:
        return "bearish", abs(slope), r_value**2
    return "neutral", 0, 0

In [5]:
def find_all_extrema(data):
    local_maxima = []
    local_minima = []
    if isinstance(data, pd.Series):
        index = data.index
        values = data.values
    else:
        index = range(len(data))
        values = data
    for i in range(1, len(values) - 1):
        if values[i] > values[i-1] and values[i] > values[i+1]:
            local_maxima.append((index[i], values[i]))
        elif values[i] < values[i-1] and values[i] < values[i+1]:
            local_minima.append((index[i], values[i]))
    global_max_idx = np.argmax(values)
    global_min_idx = np.argmin(values)
    local_maxima.append((index[global_max_idx], values[global_max_idx]))
    local_minima.append((index[global_min_idx], values[global_min_idx]))
    local_maxima = sorted(set(local_maxima), key=lambda x: x[0])
    local_minima = sorted(set(local_minima), key=lambda x: x[0])
    return local_maxima, local_minima

In [6]:
def get_top_extrema(local_maxima, local_minima, top_n=5):
    top_maxima = sorted(local_maxima, key=lambda x: x[1], reverse=True)[:top_n]
    bottom_minima = sorted(local_minima, key=lambda x: x[1])[:top_n]
    return top_maxima, bottom_minima

In [7]:
# Updated Pattern Detection Functions with Relaxed Conditions
def detect_head_and_shoulders(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(top_maxima) < 3:
        print(f"Head and Shoulders - Insufficient maxima: {len(top_maxima)} < 3")
        return None, None, 0

    top_maxima.sort(key=lambda x: x[0])
    last_three = top_maxima[-3:]
    ls, head, rs = last_three[0], last_three[1], last_three[2]
    shoulder_diff = abs(ls[1] - rs[1]) / ls[1]
    if not (head[1] > ls[1] and head[1] > rs[1] and shoulder_diff < 0.30):  # Relaxed from 0.25
        print("Failed shoulder-head condition")
        return None, None, 0

    neckline_minima = [m for m in local_minima if ls[0] < m[0] < rs[0]]
    if len(neckline_minima) < 2:
        print(f"Head and Shoulders - Insufficient neckline minima: {len(neckline_minima)} < 2")
        return None, None, 0

    neckline_slope, _, r_value, _, _ = linregress(
        [m[0].to_pydatetime().timestamp() for m in neckline_minima],
        [m[1] for m in neckline_minima]
    )
    if abs(neckline_slope) > 0.0024 or r_value**2 < 0.15:  # Relaxed from 0.003 and 0.2
        print(f"Neckline too steep or weak fit: Slope {abs(neckline_slope):.6f} > 0.0024 or R² {r_value**2:.4f} < 0.15")
        return None, None, 0

    neckline = np.mean([m[1] for m in neckline_minima])
    breakdown_threshold = 0.98 if prev_label == "Head and Shoulders" and prev_bb and df_segment.index[0] > prev_bb["x_max"] else 0.97  # Relaxed from 0.99 and 0.98
    if df_segment["Close"].iloc[-1] < neckline * breakdown_threshold:
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bearish" and df_segment["RSI"].iloc[-1] < 85:  # Relaxed from 80
            confidence = min(0.9 + strength * 2, 1.0) * r_value**2 * 0.9
            return "Head and Shoulders", (ls[0], rs[0]), confidence
    return None, None, 0

In [8]:
def detect_symmetrical_triangle(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(top_maxima) < 2 or len(bottom_minima) < 2:
        print(f"Symmetrical Triangle - Insufficient extrema: Maxima {len(top_maxima)} < 2 or Minima {len(bottom_minima)} < 2")
        return None, None, 0

    top_maxima.sort(key=lambda x: x[0])
    bottom_minima.sort(key=lambda x: x[0])
    max_values = [m[1] for m in top_maxima]
    min_values = [m[1] for m in bottom_minima]
    max_slope, _, max_r, _, _ = linregress(
        [m[0].to_pydatetime().timestamp() for m in top_maxima], max_values
    )
    min_slope, _, min_r, _, _ = linregress(
        [m[0].to_pydatetime().timestamp() for m in bottom_minima], min_values
    )
    if max_r**2 < 0.2 or min_r**2 < 0.2:  # Relaxed from 0.3
        print(f"Trendlines too weak: max_r² {max_r**2:.4f} < 0.2 or min_r² {min_r**2:.4f} < 0.2")
        return None, None, 0

    breakout_val = abs(df_segment["Close"].iloc[-1] - df_segment["Close"].iloc[-3]) / df_segment["Close"].iloc[-3]
    if breakout_val > 0.004:  # Relaxed from 0.005
        trend, strength, r2 = trend_strength(df_segment)
        if trend != "neutral":
            confidence = min(0.8 + strength * 2, 1.0) * min(max_r**2, min_r**2) * 0.8
            return "Symmetrical Triangle", (top_maxima[0][0], df_segment.index[-1]), confidence
    return None, None, 0

In [9]:
def detect_cup_and_handle(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(bottom_minima) < 2 or len(top_maxima) < 1:
        print(f"Cup and Handle - Insufficient extrema: Minima {len(bottom_minima)} < 2 or Maxima {len(top_maxima)} < 1")
        return None, None, 0

    bottom_minima.sort(key=lambda x: x[0])
    cup_min1, cup_min2 = bottom_minima[-2:]
    if (cup_min2[0] - cup_min1[0]).days < 1.5:  # Unchanged from previous relaxation
        print(f"Cup time too short: {(cup_min2[0] - cup_min1[0]).days} < 1.5 days")
        return None, None, 0

    cup_diff = abs(cup_min1[1] - cup_min2[1]) / cup_min1[1]
    if cup_diff > 0.6:  # Relaxed from 0.5
        print(f"Cup difference too large: {cup_diff:.4f} > 0.6")
        return None, None, 0

    mid_maxima = [m[1] for m in local_maxima if cup_min1[0] < m[0] < cup_min2[0]]
    if not mid_maxima:
        print("No mid maxima for cup")
        return None, None, 0

    cup_neckline = max(mid_maxima)
    handle_max = max([(m[0], m[1]) for m in top_maxima if m[0] > cup_min2[0]], default=None)
    if not handle_max or (handle_max[0] - cup_min2[0]).days < 10 or handle_max[1] > cup_neckline * 1.20:  # Changed from < 1 to < 5
        print(f"Handle max invalid: Exists: {bool(handle_max)}, Days: {(handle_max[0] - cup_min2[0]).days if handle_max else 'N/A'} < 5, Value: {handle_max[1] if handle_max else 'N/A'} > {cup_neckline * 1.20:.2f}")
        return None, None, 0

    post_handle = df_segment["Close"].loc[handle_max[0]:].head(3)
    if len(post_handle) < 2:  # Unchanged from previous relaxation
        print(f"Insufficient post-handle data: {len(post_handle)} < 2")
        return None, None, 0

    post_handle_changes = post_handle.pct_change().dropna()
    strict_downtrend = post_handle_changes.mean() < 0.001  # Unchanged from previous relaxation
    if strict_downtrend:
        breakout_threshold = 1.003  # Relaxed from 1.004
        print(f"Close: {df_segment['Close'].iloc[-1]:.2f}, Neckline: {cup_neckline:.2f}, Threshold: {cup_neckline * breakout_threshold:.2f}")
        if df_segment["Close"].iloc[-1] > cup_neckline * breakout_threshold:
            trend, strength, r2 = trend_strength(df_segment)
            print(f"Trend: {trend}, RSI: {df_segment['RSI'].iloc[-1]:.2f}")
            if trend == "bullish" and df_segment["RSI"].iloc[-1] > 35:  # Unchanged from previous relaxation
                confidence = max(0.1, min(0.8 + strength * 2, 1.0) * r2 * 0.8)  # Added confidence floor
                print(f"Detected Cup and Handle with confidence {confidence:.4f}")
                return "Cup and Handle", (cup_min1[0], df_segment.index[-1]), confidence
            else:
                print(f"Trend not bullish or RSI too low: Trend {trend}, RSI {df_segment['RSI'].iloc[-1]:.2f} <= 35")
        else:
            print("Breakout not confirmed")
    else:
        print("Handle not in strict downtrend")
    return None, None, 0


In [10]:
def detect_ascending_triangle(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(top_maxima) < 2 or len(bottom_minima) < 2:
        return None, None, 0

    top_maxima.sort(key=lambda x: x[0])
    bottom_minima.sort(key=lambda x: x[0])
    max_values = [m[1] for m in top_maxima]
    max_mean = np.mean(max_values)
    max_std = np.std(max_values)
    if max_std / max_mean >= 0.12:  # Relaxed from 0.10
        return None, None, 0

    min_values = [m[1] for m in bottom_minima]
    min_slope, _, r_value, _, _ = linregress(range(len(min_values)), min_values)
    if min_slope <= 0.0008 or r_value**2 <= 0.2:  # Relaxed from 0.001 and 0.3
        return None, None, 0

    highest_max = max(max_values)
    breakout = df_segment["Close"].iloc[-1] > highest_max * 0.75 and df_segment["Close"].iloc[-1] < highest_max * 1.25  # Relaxed from 0.8 and 1.2
    if breakout:
        trend, strength, _ = trend_strength(df_segment)
        if trend == "bullish" and df_segment["RSI"].iloc[-1] > 25:  # Relaxed from 30
            confidence = min(0.9 + strength * 5, 1.0) * r_value**2 * 0.8
            return "Ascending Triangle", (top_maxima[0][0], df_segment.index[-1]), confidence
    return None, None, 0


In [11]:
def detect_descending_triangle(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(top_maxima) < 2 or len(bottom_minima) < 2:
        return None, None, 0

    top_maxima.sort(key=lambda x: x[0])
    bottom_minima.sort(key=lambda x: x[0])
    max_values = [m[1] for m in top_maxima]
    max_slope, _, r_value, _, _ = linregress(range(len(max_values)), max_values)
    if max_slope >= -0.0008 or r_value**2 <= 0.2:  # Relaxed from -0.001 and 0.3
        return None, None, 0

    min_values = [m[1] for m in bottom_minima]
    min_mean = np.mean(min_values)
    min_std = np.std(min_values)
    if min_std / min_mean >= 0.12:  # Relaxed from 0.10
        return None, None, 0

    lowest_min = min(min_values)
    breakdown = df_segment["Close"].iloc[-1] < lowest_min * 1.25 and df_segment["Close"].iloc[-1] > lowest_min * 0.75  # Relaxed from 1.2 and 0.8
    if breakdown:
        trend, strength, _ = trend_strength(df_segment)
        if trend == "bearish" and df_segment["RSI"].iloc[-1] < 75:  # Relaxed from 70
            confidence = min(0.9 + strength * 5, 1.0) * r_value**2 * 0.8
            return "Descending Triangle", (top_maxima[0][0], df_segment.index[-1]), confidence
    return None, None, 0


In [12]:
def detect_inverse_head_and_shoulders(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(bottom_minima) < 3:
        return None, None, 0

    bottom_minima.sort(key=lambda x: x[0])
    last_three = bottom_minima[-3:]
    ls, head, rs = last_three[0], last_three[1], last_three[2]
    shoulder_diff = abs(ls[1] - rs[1]) / ls[1]
    if not (head[1] < ls[1] * 0.97 and head[1] < rs[1] * 0.97 and shoulder_diff < 0.20):  # Relaxed from 0.95 and 0.15
        return None, None, 0

    neckline_maxima = [m for m in local_maxima if ls[0] < m[0] < rs[0]]
    if len(neckline_maxima) < 2:
        return None, None, 0

    neckline_slope, _, r_value, _, _ = linregress(
        [m[0].to_pydatetime().timestamp() for m in neckline_maxima], [m[1] for m in neckline_maxima]
    )
    if abs(neckline_slope) > 0.0016 or r_value**2 < 0.2:  # Relaxed from 0.002 and 0.3
        return None, None, 0

    neckline = np.mean([m[1] for m in neckline_maxima])
    breakout_threshold = 1.04 if prev_label == "Inverse Head and Shoulders" and prev_bb and df_segment.index[0] > prev_bb["x_max"] else 1.01  # Relaxed from 1.05 and 1.02
    if df_segment["Close"].iloc[-1] > neckline * breakout_threshold:
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bullish" and df_segment["RSI"].iloc[-1] > 20:  # Relaxed from 25
            confidence = min(0.9 + strength * 2, 1.0) * r_value**2 * 0.9
            return "Inverse Head and Shoulders", (ls[0], rs[0]), confidence
    return None, None, 0

In [13]:
def detect_falling_wedge(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    print(f"Falling Wedge - Maxima count: {len(top_maxima)}, Minima count: {len(bottom_minima)}")
    if len(top_maxima) < 2 or len(bottom_minima) < 2:
        print(f"Insufficient extrema: Maxima {len(top_maxima)} < 2 or Minima {len(bottom_minima)} < 2")
        return None, None, 0

    top_maxima.sort(key=lambda x: x[0])
    bottom_minima.sort(key=lambda x: x[0])
    max_values = [m[1] for m in top_maxima]
    min_values = [m[1] for m in bottom_minima]
    max_slope, _, max_r, _, _ = linregress(range(len(max_values)), max_values)
    min_slope, _, min_r, _, _ = linregress(range(len(min_values)), min_values)
    print(f"Max slope: {max_slope:.6f}, Min slope: {min_slope:.6f}, max_r²: {max_r**2:.4f}, min_r²: {min_r**2:.4f}")
    if max_slope > -0.0012 or min_slope > -0.0012 or max_r**2 <= 0.25 or min_r**2 <= 0.25:  # Changed >= to > for correct downward slope
        print(f"Slopes not steep enough or weak fit: Max slope {max_slope:.6f} > -0.0012, Min slope {min_slope:.6f} > -0.0012, max_r² {max_r**2:.4f} <= 0.25, min_r² {min_r**2:.4f} <= 0.25")
        return None, None, 0

    slope_ratio = abs(max_slope) / abs(min_slope)
    if slope_ratio >= 4.0:  # Relaxed from 3.5
        print(f"Slope ratio too large: {slope_ratio:.4f} >= 4.0")
        return None, None, 0

    highest_max = max(max_values)
    breakout = df_segment["Close"].iloc[-1] > highest_max * 0.75 and df_segment["Close"].iloc[-1] < highest_max * 1.25  # Relaxed from 0.80-1.20
    print(f"Breakout check: {highest_max * 0.75:.2f} < Close {df_segment['Close'].iloc[-1]:.2f} < {highest_max * 1.25:.2f}")
    if breakout:
        trend, strength, _ = trend_strength(df_segment)
        print(f"Trend: {trend}, RSI: {df_segment['RSI'].iloc[-1]:.2f}")
        if trend == "bullish" and df_segment["RSI"].iloc[-1] > 30:  # Unchanged from previous relaxation
            confidence = max(0.1, min(0.9 + strength * 5, 1.0) * min(max_r**2, min_r**2) * 0.8)  # Added confidence floor
            print(f"Detected Falling Wedge with confidence {confidence:.4f}")
            return "Falling Wedge", (top_maxima[0][0], df_segment.index[-1]), confidence
        else:
            print(f"Trend not bullish or RSI too low: Trend {trend}, RSI {df_segment['RSI'].iloc[-1]:.2f} <= 30")
    else:
        print("No breakout")
    return None, None, 0

In [14]:
def detect_rounding_bottom(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(bottom_minima) < 1 or (df_segment.index[-1] - bottom_minima[0][0]).days < 12:  # Relaxed from 15
        return None, None, 0

    bottom = bottom_minima[0]
    initial_price = df_segment["Close"].iloc[0]
    if initial_price <= bottom[1] * 1.10:  # Relaxed from 1.05
        return None, None, 0

    left = df_segment["Close"].loc[:bottom[0]]
    right = df_segment["Close"].loc[bottom[0]:]
    if len(left) < 8 or len(right) < 8:  # Relaxed from 10
        return None, None, 0

    left_slope, _, left_r, _, _ = linregress(range(len(left.tail(8))), left.tail(8).values)
    right_slope, _, right_r, _, _ = linregress(range(len(right.head(8))), right.head(8).values)
    if left_slope <= -0.00024 or right_slope <= 0.00024 or left_r**2 < 0.15 or right_r**2 < 0.15:  # Relaxed from -0.0003, 0.0003, and 0.20
        return None, None, 0

    breakout_threshold = 1.04 if prev_label == "Rounding Bottom" and prev_bb and df_segment.index[0] > prev_bb["x_max"] else 1.008  # Relaxed from 1.05 and 1.01
    if df_segment["Close"].iloc[-1] > initial_price * breakout_threshold:
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bullish" and df_segment["RSI"].iloc[-1] > 20:  # Relaxed from 25
            confidence = min(0.9 + strength * 2, 1.0) * min(left_r**2, right_r**2) * 0.9
            return "Rounding Bottom", (df_segment.index[0], df_segment.index[-1]), confidence
    return None, None, 0

In [15]:
def detect_triple_bottom_reversal(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(bottom_minima) < 3:
        return None, None, 0

    bottom_minima.sort(key=lambda x: x[0])
    last_three = bottom_minima[-3:]
    min1, min2, min3 = last_three[0], last_three[1], last_three[2]
    if (min3[0] - min1[0]).days < 8 or (min2[0] - min1[0]).days < 2:  # Relaxed from 10 and 3
        return None, None, 0

    avg_min = (min1[1] + min2[1] + min3[1]) / 3
    diff_min = max(abs(min1[1] - min2[1]), abs(min2[1] - min3[1])) / avg_min
    if diff_min > 0.20:  # Relaxed from 0.15
        return None, None, 0

    mid_maxima = [m for m in local_maxima if min1[0] < m[0] < min3[0]]
    if len(mid_maxima) < 1:  # Relaxed from 2
        return None, None, 0

    max_mean = np.mean([m[1] for m in mid_maxima])
    max_std = np.std([m[1] for m in mid_maxima])
    if max_std / max_mean > 0.30:  # Relaxed from 0.25
        return None, None, 0

    neckline = max([m[1] for m in mid_maxima])
    breakout_threshold = 1.04 if prev_label == "Triple Bottom Reversal" and prev_bb and df_segment.index[0] > prev_bb["x_max"] else 1.008  # Relaxed from 1.05 and 1.01
    if df_segment["Close"].iloc[-1] > neckline * breakout_threshold:
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bullish" and df_segment["RSI"].iloc[-1] > 20:  # Relaxed from 25
            confidence = min(0.9 + strength * 2, 1.0) * r2 * 0.9
            return "Triple Bottom Reversal", (min1[0], min3[0]), confidence
    return None, None, 0

In [16]:
def detect_double_top(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(top_maxima) < 2:
        return None, None, 0

    top_maxima.sort(key=lambda x: x[0])
    last_two = top_maxima[-2:]
    max1, max2 = last_two[0], last_two[1]
    if (max2[0] - max1[0]).days < 4:  # Relaxed from 5
        return None, None, 0

    initial_price = df_segment["Close"].iloc[0]
    if initial_price >= max1[1] * 0.95 or initial_price >= max2[1] * 0.95:  # Relaxed from 0.97
        return None, None, 0

    diff = abs(max1[1] - max2[1]) / ((max1[1] + max2[1]) / 2)
    if diff > 0.20:  # Relaxed from 0.15
        return None, None, 0

    mid_minima = [m[1] for m in local_minima if max1[0] < m[0] < max2[0]]
    if not mid_minima:
        return None, None, 0

    neckline = min(mid_minima)
    if not (initial_price * 0.75 < neckline < initial_price * 1.25):  # Relaxed from 0.8 and 1.2
        return None, None, 0

    breakdown_threshold = 0.96 if prev_label == "Double Top" and prev_bb and df_segment.index[0] > prev_bb["x_max"] else 0.97  # Relaxed from 0.95 and 0.98
    breakdown = df_segment["Close"].loc[max2[0]:].min() < neckline * breakdown_threshold
    if breakdown:
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bearish" and df_segment["RSI"].iloc[-1] < 80:  # Relaxed from 75
            confidence = min(0.9 + strength * 2, 1.0) * r2 * 0.9
            return "Double Top", (max1[0], max2[0]), confidence
    return None, None, 0

In [17]:
def detect_double_bottom(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    top_maxima, bottom_minima = get_top_extrema(local_maxima, local_minima)
    if len(bottom_minima) < 2:
        return None, None, 0

    bottom_minima.sort(key=lambda x: x[0])
    last_two = bottom_minima[-2:]
    min1, min2 = last_two[0], last_two[1]
    if (min2[0] - min1[0]).days < 4:  # Relaxed from 5
        return None, None, 0

    initial_price = df_segment["Close"].iloc[0]
    if initial_price <= min1[1] * 1.05 or initial_price <= min2[1] * 1.05:  # Relaxed from 1.03
        return None, None, 0

    diff = abs(min1[1] - min2[1]) / ((min1[1] + min2[1]) / 2)
    if diff > 0.20:  # Relaxed from 0.15
        return None, None, 0

    mid_maxima = [m[1] for m in local_maxima if min1[0] < m[0] < min2[0]]
    if not mid_maxima:
        return None, None, 0

    neckline = max(mid_maxima)
    if not (initial_price * 0.75 < neckline < initial_price * 1.25):  # Relaxed from 0.8 and 1.2
        return None, None, 0

    breakout_threshold = 1.04 if prev_label == "Double Bottom" and prev_bb and df_segment.index[0] > prev_bb["x_max"] else 1.01  # Relaxed from 1.05 and 1.02
    breakout = df_segment["Close"].loc[min2[0]:].max() > neckline * breakout_threshold
    if breakout:
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bullish" and df_segment["RSI"].iloc[-1] > 20:  # Relaxed from 25
            confidence = min(0.9 + strength * 2, 1.0) * r2 * 0.9
            return "Double Bottom", (min1[0], min2[0]), confidence
    return None, None, 0

In [18]:
def detect_flag_and_pole(df_segment, prev_label=None, prev_bb=None):
    local_maxima, local_minima = find_all_extrema(df_segment["Close"])
    if len(local_maxima) < 1 or len(local_minima) < 1:
        return None, None, 0

    pole_max = sorted(local_maxima, key=lambda x: x[0])[0]
    initial_price = df_segment["Close"].iloc[0]
    if initial_price >= pole_max[1] * 0.95:  # Relaxed from 0.97
        return None, None, 0

    pole_change = (pole_max[1] - initial_price) / initial_price
    if pole_change < 0.025 or (pole_max[0] - df_segment.index[0]).days < 4:  # Relaxed from 0.03 and 5
        return None, None, 0

    flag_min = next((m for m in local_minima if m[0] > pole_max[0]), None)
    if not flag_min or (flag_min[0] - pole_max[0]).days < 2:  # Relaxed from 3
        return None, None, 0

    flag_range = df_segment["Close"].loc[pole_max[0]:flag_min[0]]
    flag_volatility = flag_range.std() / flag_range.mean()
    if flag_volatility > 0.04:  # Relaxed from 0.03
        return None, None, 0

    if df_segment["Close"].iloc[-1] > pole_max[1] * 1.008:  # Relaxed from 1.01
        trend, strength, r2 = trend_strength(df_segment)
        if trend == "bullish":
            confidence = min(0.9 + strength * 2, 1.0) * r2 * 0.9
            return "Flag and Pole", (df_segment.index[0], df_segment.index[-1]), confidence
    return None, None, 0

In [19]:
# Main Processing Function (updated)
def process_patterns(csv_path):
    df = pd.read_csv(csv_path)
    required_cols = ["Date", "Open", "High", "Low", "Close"]
    if not all(col in df.columns for col in required_cols):
        raise ValueError("CSV must contain 'Date', 'Open', 'High', 'Low', 'Close' columns.")

    df["Date"] = pd.to_datetime(df["Date"])
    df.set_index("Date", inplace=True)
    df.sort_index(inplace=True)

    df["MACD"] = df["Close"].ewm(span=12, adjust=False).mean() - df["Close"].ewm(span=26, adjust=False).mean()
    df["Signal"] = df["MACD"].ewm(span=9, adjust=False).mean()
    df["RSI"] = compute_rsi(df["Close"])

    num_days = 90
    step = 90
    results = []
    prev_label = None
    prev_bb = None
    MIN_CONFIDENCE = 0.02 # Relaxed from 0.005

    pattern_functions = [
        detect_head_and_shoulders, detect_symmetrical_triangle, detect_cup_and_handle,
        detect_ascending_triangle, detect_descending_triangle, detect_inverse_head_and_shoulders,
        detect_falling_wedge, detect_rounding_bottom, detect_triple_bottom_reversal,
        detect_double_top, detect_double_bottom, detect_flag_and_pole
    ]

    for start_idx in range(0, len(df) - num_days + 1, step):
        end_idx = start_idx + num_days
        df_segment = df.iloc[start_idx:end_idx].dropna(subset=["Close", "RSI", "MACD"])
        label = f"{start_idx + 1}-{end_idx}"
        print(f"\nProcessing segment: {label}")

        best_pattern = None
        best_date_range = None
        best_confidence = 0

        for pattern_func in pattern_functions:
            pattern_name, date_range, confidence = pattern_func(df_segment, prev_label, prev_bb)
            if pattern_name and confidence > best_confidence:
                print(f"Detected {pattern_name} with confidence {confidence:.4f}")
                best_pattern = pattern_name
                best_date_range = date_range
                best_confidence = confidence

        if best_confidence < MIN_CONFIDENCE:
            results.append({"label": label, "pattern": "No Pattern Detected", "confidence": 0.0, "bounding_box": None})
            prev_label = None
            prev_bb = None
        else:
            start_date, end_date = best_date_range
            segment = df_segment.loc[start_date:end_date]
            x_min, x_max = segment.index[0], segment.index[-1]
            y_min, y_max = segment["Low"].min(), segment["High"].max()
            bounding_box = {"x_min": x_min, "y_min": y_min, "x_max": x_max, "y_max": y_max}
            results.append({
                "label": label,
                "pattern": best_pattern,
                "confidence": best_confidence,
                "bounding_box": bounding_box
            })
            prev_label = best_pattern
            prev_bb = bounding_box

    detected = sum(1 for r in results if r["pattern"] != "No Pattern Detected")
    print(f"Detected patterns in {detected}/{len(results)} segments ({detected/len(results)*100:.1f}%)")

    for result in results:
        print(f"Period: {result['label']}")
        print(f"Pattern: {result['pattern']} (Confidence: {result['confidence']:.4f})")
        if result['bounding_box']:
            print(f"Bounding Box: x_min={result['bounding_box']['x_min']}, y_min={result['bounding_box']['y_min']}, "
                  f"x_max={result['bounding_box']['x_max']}, y_max={result['bounding_box']['y_max']}")
        print()

    return results

# Usage
if __name__ == "__main__":
    csv_path = "/content/drive/MyDrive/nifty 50/AXISBANK.csv"  # Update as needed
    results = process_patterns(csv_path)


Processing segment: 1-90
Failed shoulder-head condition
Detected Symmetrical Triangle with confidence 0.4257
Handle max invalid: Exists: True, Days: 4 < 5, Value: 39.7 > 38.82
Detected Ascending Triangle with confidence 0.6952
Falling Wedge - Maxima count: 5, Minima count: 5
Max slope: -0.965000, Min slope: 1.490000, max_r²: 0.7482, min_r²: 0.8690
Slopes not steep enough or weak fit: Max slope -0.965000 > -0.0012, Min slope 1.490000 > -0.0012, max_r² 0.7482 <= 0.25, min_r² 0.8690 <= 0.25

Processing segment: 91-180
Failed shoulder-head condition
Close: 36.95, Neckline: 34.45, Threshold: 34.55
Trend: neutral, RSI: 41.21
Trend not bullish or RSI too low: Trend neutral, RSI 41.21 <= 35
Falling Wedge - Maxima count: 5, Minima count: 5
Max slope: -0.400000, Min slope: 0.165000, max_r²: 0.3421, min_r²: 0.8377
Slopes not steep enough or weak fit: Max slope -0.400000 > -0.0012, Min slope 0.165000 > -0.0012, max_r² 0.3421 <= 0.25, min_r² 0.8377 <= 0.25

Processing segment: 181-270
Failed shoul

In [20]:
pattern_functions = [
    detect_head_and_shoulders, detect_symmetrical_triangle, detect_cup_and_handle,
    detect_ascending_triangle, detect_descending_triangle, detect_inverse_head_and_shoulders,
    detect_falling_wedge, detect_rounding_bottom, detect_triple_bottom_reversal,
    detect_double_top, detect_double_bottom, detect_flag_and_pole
]

In [22]:
import os
def process_nifty_fifty_directory(directory="/content/drive/MyDrive/nifty 50",
                                  output_csv="/content/drive/MyDrive/nifty50_labels.csv"):
    total_segments = 0
    detected_segments = 0
    all_results = []

    # Ensure the directory exists
    if not os.path.exists(directory):
        print(f"Directory '{directory}' does not exist.")
        return None

    # Iterate through all CSV files in the directory
    for filename in os.listdir(directory):
        if filename.endswith(".csv"):
            company = filename.replace('.csv', '')
            filepath = os.path.join(directory, filename)
            print(f"\n=== Processing file: {filename} ===")

            try:
                # Process the CSV file
                file_results = process_patterns(filepath)

                # Convert results to DataFrame format
                for result in file_results:
                    period = result["label"]
                    company_period = f"{company}_{period}"
                    if result["pattern"] != "No Pattern Detected" and result["bounding_box"]:
                        all_results.append({
                            "Company_Period": company_period,
                            "Label": result["pattern"],
                            "Confidence": result["confidence"],
                            "x_min": result["bounding_box"]["x_min"],
                            "y_min": result["bounding_box"]["y_min"],
                            "x_max": result["bounding_box"]["x_max"],
                            "y_max": result["bounding_box"]["y_max"]
                        })
                    else:
                        all_results.append({
                            "Company_Period": company_period,
                            "Label": result["pattern"],
                            "Confidence": 0.0,
                            "x_min": None,
                            "y_min": None,
                            "x_max": None,
                            "y_max": None
                        })

                # Update segment counts
                file_detected = sum(1 for r in file_results if r["pattern"] != "No Pattern Detected")
                file_total = len(file_results)
                total_segments += file_total
                detected_segments += file_detected

            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")

    # Create DataFrame
    results_df = pd.DataFrame(all_results)

    # Overall summary
    detection_rate = (detected_segments / total_segments) * 100 if total_segments > 0 else 0
    print(f"\n=== Overall Summary ===")
    print(f"Total files processed: {len([f for f in os.listdir(directory) if f.endswith('.csv')])}")
    print(f"Detected patterns in {detected_segments}/{total_segments} segments across all files ({detection_rate:.1f}%)")

    # Save DataFrame to CSV
    results_df.to_csv(output_csv, index=False)
    print(f"Saved labels to {output_csv}")

    return results_df

# Updated main block
if __name__ == "__main__":
    results_df = process_nifty_fifty_directory("/content/drive/MyDrive/nifty 50")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m

Period: 1891-1980
Pattern: No Pattern Detected (Confidence: 0.0000)

Period: 1981-2070
Pattern: Double Top (Confidence: 0.3049)
Bounding Box: x_min=2008-01-07 00:00:00, y_min=530.5, x_max=2008-02-04 00:00:00, y_max=807.8

Period: 2071-2160
Pattern: No Pattern Detected (Confidence: 0.0000)

Period: 2161-2250
Pattern: No Pattern Detected (Confidence: 0.0000)

Period: 2251-2340
Pattern: Triple Bottom Reversal (Confidence: 0.5864)
Bounding Box: x_min=2009-02-05 00:00:00, y_min=128.0, x_max=2009-02-24 00:00:00, y_max=144.1

Period: 2341-2430
Pattern: Triple Bottom Reversal (Confidence: 0.8093)
Bounding Box: x_min=2009-07-02 00:00:00, y_min=254.1, x_max=2009-07-13 00:00:00, y_max=307.7

Period: 2431-2520
Pattern: No Pattern Detected (Confidence: 0.0000)

Period: 2521-2610
Pattern: No Pattern Detected (Confidence: 0.0000)

Period: 2611-2700
Pattern: Symmetrical Triangle (Confidence: 0.5213)
Bounding Box: x_min=2010-10-04 00:00: