In [46]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import yfinance as yf
import mplfinance as mpf
import matplotlib.pyplot as plt

In [45]:
%pip install yfinance
%pip install mplfinance

Note: you may need to restart the kernel to use updated packages.
Collecting mplfinance
  Using cached mplfinance-0.12.10b0-py3-none-any.whl.metadata (19 kB)
Using cached mplfinance-0.12.10b0-py3-none-any.whl (75 kB)
Installing collected packages: mplfinance
Successfully installed mplfinance-0.12.10b0
Note: you may need to restart the kernel to use updated packages.


In [None]:
def find_local_extrema(df, window=3):
    """
    Very simple local high/low markers using a rolling window.
    A point is a local high if its High is the max over [t-window, t+window].
    Similar for local low.
    """
    highs = df['High']
    lows = df['Low']

    local_high = (highs == highs.rolling(window*2+1, center=True).max())
    local_low = (lows == lows.rolling(window*2+1, center=True).min())

    return local_high.fillna(False), local_low.fillna(False)


In [144]:
def detect_double_tops(
    df,
    peak_window=3,
    peak_tolerance=0.01,     # peaks must be within 1%
    min_peak_gap=15,
    max_peak_gap=30,
    min_trough_drop=0.03,    # trough must be at least 3% below peaks
    require_lower_second_vol=True
):
    """
    Detect double top patterns and return a DataFrame of pattern events
    *before confirmation* (confirmation checked separately).
    """
    local_high, local_low = find_local_extrema(df, window=peak_window)
    
    peaks = df[local_high].copy()
    troughs = df[local_low].copy()

    peaks_idx = peaks.index
    events = []

    for i in range(len(peaks_idx)):
        t1 = peaks_idx[i]
        p1 = peaks.loc[t1]

        # look for second peak within [min_peak_gap, max_peak_gap] days
        min_date = t1 + min_peak_gap
        max_date = t1 + max_peak_gap

        candidate_peaks2 = peaks[(peaks.index >= min_date) & (peaks.index <= max_date)]
        if candidate_peaks2.empty:
            continue

        for t2, p2 in candidate_peaks2.iterrows():
            price1 = p1['High']
            price2 = p2['High']

            # peaks similar in height
            if abs(price2 - price1) / price1 > peak_tolerance:
                continue

            # trough between them
            mid_troughs = troughs[(troughs.index > t1) & (troughs.index < t2)]
            if mid_troughs.empty:
                continue

            trough_date = mid_troughs['Low'].idxmin()
            trough_price = mid_troughs.loc[trough_date, 'Low']

            # trough must be meaningfully below peaks
            avg_peak = (price1 + price2) / 2
            if (avg_peak - trough_price) / avg_peak < min_trough_drop:
                continue

            # volume condition (second peak volume < first)
            vol1 = p1['Volume']
            vol2 = p2['Volume']
            if require_lower_second_vol and not (vol2 < vol1):
                continue

            events.append({
                'peak1_date': t1,
                'peak2_date': t2,
                'trough_date': trough_date,
                'peak1_price': price1,
                'peak2_price': price2,
                'trough_price': trough_price,
                'peak_gap_days': (t2 - t1),
                'vol1': vol1,
                'vol2': vol2,
                'vol2_vol1_ratio': vol2 / vol1 if vol1 > 0 else np.nan
            })

    events_df = pd.DataFrame(events)
    return events_df


In [82]:
def confirm_double_tops(df, events_df, max_confirm_days=20):
    """
    For each candidate double top, look for first close below trough (neckline)
    after the second peak. That date is confirmation_date.
    """
    confirmed = []

    for _, row in events_df.iterrows():
        t2 = row['peak2_date']
        neck_price = row['trough_price']

        start = t2 + 1
        end = t2 + max_confirm_days

        post = df.loc[(df.index >= start) & (df.index <= end)]

        below = post[post['Close'] < neck_price]
        if below.empty:
            continue

        confirm_date = below.index[0]
        confirm_price = below.loc[confirm_date, 'Close']

        r = row.to_dict()
        r.update({
            'confirm_date': confirm_date,
            'confirm_price': confirm_price
        })
        confirmed.append(r)

    confirmed_df = pd.DataFrame(confirmed)
    return confirmed_df


In [148]:
def compute_forward_returns(df, events_df, horizons=(5, 20, 60)):
    """
    For each event (by confirm_date), compute forward returns.
    Returns are simple percentage returns: Close[t+H] / Close[confirm] - 1.
    """
    events_df = events_df.copy()
    print('in cfr')
    print(events_df.head())
    for h in horizons:
        col = f'ret_{h}d'
        vals = []
        for _, row in events_df.iterrows():
            t0 = row['confirm_date']
            t_fwd = df.index[df.index.get_loc(t0) + h] if (df.index.get_loc(t0) + h) < len(df.index) else None
            if t_fwd is None:
                vals.append(np.nan)
                continue

            price0 = df.loc[t0, 'Close']
            price1 = df.loc[t_fwd, 'Close']
            vals.append(price1 / price0 - 1.0)
        events_df[col] = vals
    return events_df


In [11]:
def sample_random_events(df, n_events, horizons=(5, 20, 60), seed=42, buffer=60):
    """
    Sample random confirmation dates from df index, avoiding first/last `buffer` days
    so all horizons fit. Returns a DataFrame similar to events_df with forward returns.
    """
    rng = np.random.default_rng(seed)
    idx = df.index

    valid_idx = idx[buffer:-buffer]  # avoid edges
    chosen_idx = rng.choice(valid_idx, size=min(n_events, len(valid_idx)), replace=False)

    events = []
    for t0 in chosen_idx:
        row = {'confirm_date': t0, 'confirm_price': df.loc[t0, 'Close']}
        events.append(row)
    rand_df = pd.DataFrame(events)

    # reuse forward-return logic
    rand_df = compute_forward_returns(df, rand_df, horizons=horizons)
    rand_df['type'] = 'random'
    return rand_df


In [12]:
def ma_crossover_signals(df, short_window=20, long_window=50, horizons=(5, 20, 60)):
    data = df.copy()
    data['ma_short'] = data['Close'].rolling(short_window).mean()
    data['ma_long'] = data['Close'].rolling(long_window).mean()

    # short MA crossing from above to below long MA
    prev = data.shift(1)
    cond_prev = prev['ma_short'] > prev['ma_long']
    cond_now = data['ma_short'] <= data['ma_long']
    signals = data[cond_prev & cond_now].dropna()

    events = []
    for t0, row in signals.iterrows():
        events.append({'confirm_date': t0, 'confirm_price': row['Close']})
    ma_df = pd.DataFrame(events)

    ma_df = compute_forward_returns(df, ma_df, horizons=horizons)
    ma_df['type'] = 'ma_crossover'
    return ma_df


In [13]:
def summarize_returns(ret_series):
    """
    ret_series: pandas Series of returns for a given horizon.
    """
    ret = ret_series.dropna()
    if len(ret) < 5:
        return {
            'n': len(ret),
            'mean': np.nan,
            'std': np.nan,
            't_stat': np.nan,
            'p_value': np.nan,
            'hit_ratio_neg': np.nan,
            'sharpe': np.nan,
            'cohen_d': np.nan
        }

    mean = ret.mean()
    std = ret.std(ddof=1)
    t_stat, p_value = stats.ttest_1samp(ret, popmean=0.0)
    hit_ratio_neg = (ret < 0).mean()
    sharpe = mean / std if std > 0 else np.nan
    cohen_d = mean / std if std > 0 else np.nan  # same formula

    return {
        'n': len(ret),
        'mean': mean,
        'std': std,
        't_stat': t_stat,
        'p_value': p_value,
        'hit_ratio_neg': hit_ratio_neg,
        'sharpe': sharpe,
        'cohen_d': cohen_d
    }


In [14]:
def evaluate_all(events_dt, events_rand, events_ma, horizons=(5, 20, 60)):
    """
    Build a summary DataFrame over horizons and event types.
    """
    records = []
    for h in horizons:
        col = f'ret_{h}d'
        for label, df_src in [('double_top', events_dt),
                              ('random', events_rand),
                              ('ma_crossover', events_ma)]:
            stats_dict = summarize_returns(df_src[col])
            stats_dict.update({'horizon': h, 'type': label})
            records.append(stats_dict)

    summary_df = pd.DataFrame(records)
    return summary_df


In [15]:
def tag_weekly_top(df, events_df, lookback_weeks=12):
    """
    For each confirmation date, check if it's within a weekly bar that
    is near the max weekly close over a lookback window – crude 'weekly top'.
    """
    # Resample to weekly close
    weekly = df['Close'].resample('W-FRI').last()
    weekly_max = weekly.rolling(lookback_weeks).max()

    tags = []
    for _, row in events_df.iterrows():
        t_conf = row['confirm_date']
        # find the week label corresponding to this day
        week_label = weekly.index[weekly.index.get_loc(t_conf, method='bfill')]
        wk_close = weekly.loc[week_label]
        wk_max = weekly_max.loc[week_label]
        if pd.isna(wk_max):
            tags.append(False)
        else:
            # consider it a weekly top if within 1% of the rolling max
            tags.append((wk_max - wk_close) / wk_max <= 0.01)

    events_df = events_df.copy()
    events_df['is_weekly_top'] = tags
    return events_df


In [17]:
def plot_return_distributions(dt_events, rand_events, horizon=20):
    col = f'ret_{horizon}d'
    plt.figure(figsize=(8, 5))
    plt.hist(rand_events[col].dropna(), bins=30, alpha=0.5, label='Random')
    plt.hist(dt_events[col].dropna(), bins=30, alpha=0.5, label='Double top')
    plt.axvline(0, linestyle='--')
    plt.title(f'{horizon}-day return distribution')
    plt.xlabel('Return')
    plt.ylabel('Count')
    plt.legend()
    plt.tight_layout()
    plt.show()


In [21]:
def plot_parameter_heatmap(dt_events, horizon=20, gap_bins=[15,20,25,30], vol_bins=[0.0,0.5,0.8,1.0]):
    col = f'ret_{horizon}d'
    data = dt_events.dropna(subset=[col]).copy()
    data['gap_bin'] = pd.cut(data['peak_gap_days'], bins=gap_bins, include_lowest=True)
    data['vol_bin'] = pd.cut(data['vol2_vol1_ratio'], bins=vol_bins, include_lowest=True)

    pivot = data.pivot_table(index='gap_bin', columns='vol_bin', values=col, aggfunc='mean')

    plt.figure(figsize=(6, 5))
    im = plt.imshow(pivot.values, aspect='auto', origin='lower')
    plt.colorbar(im, label=f'Mean {horizon}-day return')
    plt.xticks(ticks=range(len(pivot.columns)), labels=[str(c) for c in pivot.columns], rotation=45)
    plt.yticks(ticks=range(len(pivot.index)), labels=[str(i) for i in pivot.index])
    plt.xlabel('vol2/vol1 bin')
    plt.ylabel('peak_gap_days bin')
    plt.title('Double top parameter “heatmap”')
    plt.tight_layout()
    plt.show()


In [18]:
def summarize_pattern_performance(summary_df, horizon=20):
    row = summary_df[(summary_df['type'] == 'double_top') &
                     (summary_df['horizon'] == horizon)].iloc[0]
    row_rand = summary_df[(summary_df['type'] == 'random') &
                          (summary_df['horizon'] == horizon)].iloc[0]

    mean = row['mean']
    p = row['p_value']
    hit = row['hit_ratio_neg']
    sharpe = row['sharpe']
    d = row['cohen_d']

    mean_rand = row_rand['mean']
    hit_rand = row_rand['hit_ratio_neg']

    text = []

    text.append(f"For {horizon}-day returns after confirmed double tops (n={int(row['n'])}), "
                f"the average return is {mean:.3%} with p-value {p:.3f}.")
    text.append(f"The hit ratio (probability of a negative return) is {hit:.2%} "
                f"vs {hit_rand:.2%} for random dates.")
    text.append(f"The Sharpe ratio is {sharpe:.2f} and the effect size (Cohen's d) is {d:.2f}.")
    text.append("By the project’s working rule, the pattern 'works' at this horizon if:\n"
                "- average return < 0 and p < 0.05;\n"
                "- hit ratio > random;\n"
                "- Sharpe > 0.5 or |d| > 0.3.")

    works = (mean < 0) and (p < 0.05) and (hit > hit_rand) and ((sharpe > 0.5) or (abs(d) > 0.3))
    text.append(f"Based on the current sample, the double top pattern "
                f"{'meets' if works else 'does NOT meet'} these criteria at {horizon} days.")

    return "\n".join(text)


In [145]:
def run_double_top_pipeline(
    ticker: str,
    start: str = "2015-01-01",
    end: str = "2024-12-31",
    horizons=(5, 20, 60),
    save_prefix: str | None = None,
    make_plots: bool = False,
):
    """
    End-to-end pipeline to test Double Top performance for one ticker.

    Steps:
      1. Download daily OHLCV via yfinance.
      2. Detect Double Tops (strict two-peak pattern with trough).
      3. Confirm when price closes below the trough (neckline).
      4. Compute forward returns for double-top events (5/20/60 days by default).
      5. Build baselines: random timestamps + bearish MA crossover.
      6. Compute summary stats & significance tests.
      7. Optionally save CSVs & generate plots.

    Parameters
    ----------
    ticker : str
        Ticker symbol, e.g. "SPY", "AAPL".
    start : str
        Start date for data (YYYY-MM-DD).
    end : str
        End date for data (YYYY-MM-DD).
    horizons : tuple of int
        Forward-return horizons in trading days.
    save_prefix : str or None
        If not None, will save:
            {save_prefix}_events.csv
            {save_prefix}_summary.csv
    make_plots : bool
        If True, will show a distribution plot and a simple parameter heatmap.

    Returns
    -------
    dt_events : pd.DataFrame
        Double-top events with features + forward returns.
    rand_events : pd.DataFrame
        Random baseline events with forward returns.
    ma_events : pd.DataFrame
        Moving-average crossover baseline events.
    summary_df : pd.DataFrame
        Summary statistics across horizons & event types.
    text_summary_20d : str
        Plain-language summary at the 20-day horizon (if available).
    """

    # 1) pull data; do in full run, not local data
    # raw = yf.download(ticker, start=start, end=end)
    # if raw.empty:
    #     raise ValueError(f"No data returned for {ticker} between {start} and {end}.")

    # df = raw[["Open", "High", "Low", "Close", "Volume"]].copy().sort_index()

    # 1) pull data from local files
    df = pd.read_csv(f"sp500/sp500/{ticker}.csv", 
                     header=0, 
                     skiprows=[1],
                     dtype={
                        "Open": float,
                        "High": float,
                        "Low": float,
                        "Close": float,
                        "Volume": float
                        }
    )

    # 2) detect + confirm double tops
    dt_candidates = detect_double_tops(df, peak_window=3, peak_tolerance=0.01,min_peak_gap=15, max_peak_gap=30, min_trough_drop=0.03, require_lower_second_vol=True)

    dt_confirmed = confirm_double_tops(df, dt_candidates, max_confirm_days=40)

    if dt_confirmed.empty:
        print(f"[{ticker}] No confirmed double tops found. Consider loosening parameters.")
        # still build empty containers to keep interface consistent
        dt_events = pd.DataFrame()
        rand_events = pd.DataFrame()
        ma_events = pd.DataFrame()
        summary_df = pd.DataFrame()
        return dt_events, rand_events, ma_events, summary_df, ""

    # optional weekly-level top tagging; not implemented correctly yet
    # dt_confirmed = tag_weekly_top(df, dt_confirmed)

    # 3) forward returns for double-top events
    dt_events = compute_forward_returns(df, dt_confirmed, horizons=horizons)
    dt_events["symbol"] = ticker
    dt_events["type"] = "double_top"

    print(dt_events.head())


    # 4) baselines
    rand_events = sample_random_events(df, n_events=len(dt_events), horizons=horizons)
    rand_events["symbol"] = ticker

    ma_events = ma_crossover_signals(df, horizons=horizons)
    ma_events["symbol"] = ticker

    # 5) summary statistics
    summary_df = evaluate_all(dt_events, rand_events, ma_events, horizons=horizons)
    summary_df["symbol"] = ticker

    # 6) CSVs (if requested)
    if save_prefix is not None:
        dt_events.to_csv(f"{save_prefix}_events.csv", index=False)
        summary_df.to_csv(f"{save_prefix}_summary.csv", index=False)
        print(f"Saved events to {save_prefix}_events.csv and summary to {save_prefix}_summary.csv")

    # 7) plots (optional)
    if make_plots:
        # choose the middle horizon for prettier plots if available
        mid_h = horizons[len(horizons) // 2]
        plot_return_distributions(dt_events, rand_events, horizon=mid_h)
        plot_parameter_heatmap(dt_events, horizon=mid_h)

    # 8) plain-language summary at 20d if available
    h_for_text = 20 if 20 in horizons else horizons[len(horizons) // 2]
    try:
        text_summary = summarize_pattern_performance(summary_df, horizon=h_for_text)
        print("\nText summary:\n")
        print(text_summary)
    except Exception as e:
        text_summary = f"Could not generate text summary (likely too few events). Error: {e}"
        print(text_summary)

    return dt_events, rand_events, ma_events, summary_df, text_summary


In [149]:
import os

root_path = './sp500/sp500/'

dt_events, rand_events, ma_events, summary_df, text_summary = None, None, None, None, None

for current_dir, subdirs, files in os.walk(root_path):
    for fname in files:
        dt_events, rand_events, ma_events, summary_df, text_summary = run_double_top_pipeline(fname.split('.')[0])
        break

in cfr
   peak1_date  peak2_date  trough_date  peak1_price  peak2_price  \
0       120.0       138.0        123.0    37.250332    37.251425   
1       230.0       249.0        239.0    39.106458    39.263570   
2       396.0       413.0        410.0    44.900998    44.817119   
3       396.0       417.0        410.0    44.900998    44.630730   
4      1047.0      1070.0       1051.0    78.013037    78.449795   

   trough_price  peak_gap_days       vol1       vol2  vol2_vol1_ratio  \
0     35.463486           18.0  5265800.0  2949400.0         0.560105   
1     36.878925           19.0  7988900.0  1757000.0         0.219930   
2     42.953227           17.0  1881400.0  1645100.0         0.874402   
3     42.953227           21.0  1881400.0  1333000.0         0.708515   
4     73.852852           23.0  3204100.0  2180100.0         0.680409   

   confirm_date  confirm_price  
0           158      35.389771  
1           255      36.145638  
2           425      41.825581  
3           4

In [139]:
def plot_candles(
    df,
    dt_events,
    title="Double Top Candlestick Chart",
    volume=True,
    mav=None,
    date_range=None,
    save_path=None,
):
    """
    Plot a candlestick chart and overlay Double Top peaks, troughs, and confirmations.

    Parameters
    ----------
    df : pd.DataFrame
        OHLCV DataFrame with DatetimeIndex and columns:
        ['Open', 'High', 'Low', 'Close', 'Volume'].

    dt_events : pd.DataFrame
        Events DataFrame with at least:
        - 'peak1_date'
        - 'peak2_date'
        - 'trough_date'
        - 'confirm_date'

    title : str
        Chart title.

    volume : bool
        Whether to include the volume subplot.

    mav : tuple or None
        Moving averages to draw, e.g. (20, 50).

    date_range : tuple(str or Timestamp, str or Timestamp) or None
        Optional (start, end) to zoom, e.g. ('2020-01-01', '2021-01-01').

    save_path : str or None
        If provided, saves the figure to this path.
    """
    print(df.head())
    print(dt_events.head())
    # Ensure datetime types in dt_events
    for col in ["peak1_date", "peak2_date", "trough_date", "confirm_date"]:
        if col in dt_events.columns:
            dt_events[col] = pd.to_datetime(dt_events[col])

    # Optionally zoom to a date range
    if date_range is not None:
        start, end = pd.to_datetime(date_range[0]), pd.to_datetime(date_range[1])
        df_plot = df.loc[(df.index >= start) & (df.index <= end)].copy()
        events_plot = dt_events[
            (dt_events["confirm_date"] >= start) & (dt_events["confirm_date"] <= end)
        ].copy()
    else:
        df_plot = df.copy()
        events_plot = dt_events.copy()

    if df_plot.empty:
        print("No data in selected date range.")
        return

    # Initialize marker series (index aligned with df_plot)
    idx = df_plot.index
    peak1_series = pd.Series(np.nan, index=idx)
    peak2_series = pd.Series(np.nan, index=idx)
    trough_series = pd.Series(np.nan, index=idx)
    confirm_series = pd.Series(np.nan, index=idx)

    # Fill marker positions from dt_events
    for _, row in events_plot.iterrows():
        # Peaks plotted slightly above the high for visibility
        for col, series in [("peak1_date", peak1_series),
                            ("peak2_date", peak2_series)]:
            d = row[col]
            if d in df_plot.index:
                price = df_plot.loc[d, "High"]
                series.loc[d] = price * 1.01  # 1% above high

        # Trough plotted slightly below the low
        d_trough = row["trough_date"]
        if d_trough in df_plot.index:
            price = df_plot.loc[d_trough, "Low"]
            trough_series.loc[d_trough] = price * 0.99  # 1% below low

        # Confirmation plotted slightly above the close
        d_conf = row["confirm_date"]
        if d_conf in df_plot.index:
            price = df_plot.loc[d_conf, "Close"]
            confirm_series.loc[d_conf] = price * 1.01  # 1% above close

    # Build addplots (big, bold markers so they stand out)
    apds = []

    # Peak 1 markers (bright green ▲ with black outline)
    if not peak1_series.isna().all():
        apds.append(
            mpf.make_addplot(
                peak1_series,
                type="scatter",
                marker="^",
                markersize=200,
                color="lime",
                edgecolor="black",
                linewidths=1.5,
            )
        )

    # Peak 2 markers (dark green ▲)
    if not peak2_series.isna().all():
        apds.append(
            mpf.make_addplot(
                peak2_series,
                type="scatter",
                marker="^",
                markersize=200,
                color="green",
                edgecolor="black",
                linewidths=1.5,
            )
        )

    # Trough markers (large orange ●)
    if not trough_series.isna().all():
        apds.append(
            mpf.make_addplot(
                trough_series,
                type="scatter",
                marker="o",
                markersize=180,
                color="orange",
                edgecolor="black",
                linewidths=1.5,
            )
        )

    # Confirmation markers (huge red ▼ with black outline)
    if not confirm_series.isna().all():
        apds.append(
            mpf.make_addplot(
                confirm_series,
                type="scatter",
                marker="v",
                markersize=240,
                color="red",
                edgecolor="black",
                linewidths=1.5,
            )
        )

    # Final plot call
    mpf.plot(
        df_plot,
        type="candle",
        style="yahoo",
        title=title,
        volume=volume,
        mav=mav,
        addplot=apds,
        figsize=(12, 6),
        tight_layout=True,
        savefig=save_path,
        warn_too_much_data=1000000000
    )


In [140]:
df = pd.read_csv("./sp500/sp500/AAPL.csv", 
                 parse_dates=["Date"], 
                 index_col="Date",
                 dtype={
                    "Open": float,
                    "High": float,
                    "Low": float,
                    "Close": float,
                    "Volume": float
                },
                header = 0,
                skiprows=[1]
    )
plot_candles(df, dt_events, title="AAPL Candles", mav=(20,50), save_path="aapl_chart.png")

                Close       High        Low       Open       Volume
Date                                                               
2015-01-02  24.261051  24.729274  23.821675  24.718178  212818400.0
2015-01-05  23.577574  24.110150  23.391173  24.030263  257142000.0
2015-01-06  23.579798  23.839428  23.218089  23.641931  263188400.0
2015-01-07  23.910431  24.010288  23.677428  23.788382  160423600.0
2015-01-08  24.829128  24.886824  24.121246  24.238858  237458000.0
                     peak1_date                    peak2_date  \
0 1970-01-01 00:00:00.000000120 1970-01-01 00:00:00.000000138   
1 1970-01-01 00:00:00.000000230 1970-01-01 00:00:00.000000249   
2 1970-01-01 00:00:00.000000396 1970-01-01 00:00:00.000000413   
3 1970-01-01 00:00:00.000000396 1970-01-01 00:00:00.000000417   
4 1970-01-01 00:00:00.000001047 1970-01-01 00:00:00.000001070   

                    trough_date  peak1_price  peak2_price  trough_price  \
0 1970-01-01 00:00:00.000000123    37.250332    37.251425 