### Note: This file is adapted from `'Candlestick_CombinewithOtherIndicators_Historical15min'`, modified to work with real-time Binance API.

- To avoid retroactive signal changes, all candlestick pattern and signal logic are now restricted to the latest completed candle only (df.iloc[-2]). This ensures the chart, log, and prediction remain fully synchronized in real-time.

###  To-Do List

1. **Refine Strategy Combinations**
   - Test different combinations of candlestick patterns and indicators.
   - Remove or adjust underperforming signals based on analysis.
   -  Goal: Push live signal accuracy above **55%** (Prof's suggestion).

2. **Enhance Signal Granularity**
   - Current logic outputs: `Bullish` / `Neutral` / `Bearish`.
   - Expand to multi-tier strength:
     - `Strong Bullish` / `Bullish` / `Weak Bullish`
     - `Strong Bearish` / `Bearish` / `Weak Bearish`

3. **Add Live Evaluation Metrics**
   -  `prediction_logger.py` is implemented to track signal performance.
   - It logs:
     - Prediction direction (UP/DOWN)
     - Next candle return
     - Whether the prediction was correct
   - Accuracy rate is displayed live in console.
   - Logs are exportable as `prediction_log.csv`.
   -  Further metrics (e.g., trade-based PnL, Sharpe ratio) can be added later.

4. **We also can Modularize Codebase + Document Each Module**

Split the  Notebook into clean .py modules like below:


| Module               | Responsibility                                   | File                     |
|----------------------|--------------------------------------------------|--------------------------|
| Binance API          | Fetch live K-line data                           | `data_loader.py`         |
| Indicators           | MA, RSI, ATR, MACD, etc.                         | `indicator_calculator.py`|
| Patterns             | TA-Lib / custom pattern detection                | `pattern_detector.py`    |
| Signal Logic         | Combine pattern + indicator into signal          | `signal_generator.py`    |
| Chart Plotting       | Plot with real-time signal arrows                | `plotter.py`             |
| **Signal Logging**   | Persistently store trading signals to CSV        | `signal_logger.py`       |
| **Prediction Logging**   | Track accuracy + export                          | `prediction_logger.py`   |
| Main Execution       | Run live loop and combine modules                | `main_live.py`           |

Currently,  `signal_logger.py` and `prediction_logger.py` are implemented but still need adjustments.  Ex. `signal_logger.py` module can persistently stores bullish and bearish trading signals to a CSV file, ensuring they persist across program restarts. It’s used in `generate_combined_signals` to record signals and in `plot_realtime_signals` to retrieve historical signals for plotting.


In [1]:
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import talib
import os
import numpy as np
from scipy.signal import dfreqresp


In [2]:
from binance.client import Client
# sys.path.append(os.path.abspath(".."))  # root /PycharmProjects/MMAT
from config.load_env import load_keys

keys = load_keys()
#print("Loaded keys:", keys)
client = Client(keys['api_key'], keys['secret_key'])

###  Connect to Live Binance API:

In [3]:
# sys.path.append(os.path.abspath(".."))  # root /PycharmProjects/MMAT
from config.load_env import load_keys

keys = load_keys()
#print("Loaded keys:", keys)
client = Client(keys['api_key'], keys['secret_key'])

In [4]:
import talib
import os
import time
from binance.client import Client
from dotenv import load_dotenv
try:
    from config.load_env import load_keys
except ImportError:
    # Fallback if config.load_env is unavailable
    def load_keys():
        load_dotenv()
        api_key = os.getenv('BINANCE_API_KEY')
        api_secret = os.getenv('BINANCE_SECRET_KEY')
        if api_key and api_secret:
            return {'api_key': api_key, 'secret_key': api_secret}
        raise ValueError("No API keys found. Set BINANCE_API_KEY and BINANCE_SECRET_KEY in environment or .env file.")

def load_data(csv_path):
    try:
        df = pd.read_csv(csv_path, index_col='timestamp', parse_dates=True)
        df = df[['open', 'high', 'low', 'close', 'volume']].copy()
        print(f"Total K-lines loaded: {len(df)}")
        print("First 5 rows:")
        print(df.head())
        return df
    except FileNotFoundError:
        print(f"CSV file '{csv_path}' not found.")
        return None

def fetch_binance_data(api_key, api_secret, symbol='BTCUSDT', interval=Client.KLINE_INTERVAL_5MINUTE, limit=1000):

    try:
        client = Client(api_key, api_secret)
        klines = client.get_klines(symbol=symbol, interval=interval, limit=limit)
        df = pd.DataFrame(klines, columns=[
            'timestamp', 'open', 'high', 'low', 'close', 'volume',
            'close_time', 'quote_asset_volume', 'num_trades',
            'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
        ])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        df.index = df.index.tz_localize('UTC').tz_convert('Asia/Singapore')  # Set to Singapore Time
        df = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
        print(f"Fetched {len(df)} K-lines from Binance API:")
        print(df.head())
        return df
    except Exception as e:
        print(f"Error fetching Binance data: {e}")
        return None

### [For Historical File Used] Resample  to 15 min (Can try other different Time horizon):

In [5]:
#def resample_to_15min(df):
#    df_15min = df.resample('15min').agg({
#        'open': 'first',
#        'high': 'max',
#        'low': 'min',
#        'close': 'last',
#        'volume': 'sum'
#    }).dropna()
#    print(f"Resampled to 15min, total K-lines: {len(df_15min)}")
#    return df_15min

### Select Candlestick Patterns for futher combinations with other Indicators 

#### (Below are the 15 ones that I selected based on the accuracy rate >33% from testing all Candlestick Patterns based on historical 15min, and grouped into Bullish & Bearish Patterns) :

In [6]:

def calculate_patterns(df):
    """
    Detect selected high-accuracy candlestick patterns ONLY on the latest completed candle.
    """
    patterns = {
        'Hammer': talib.CDLHAMMER,
        'InvertedHammer': talib.CDLINVERTEDHAMMER,
        'BullishEngulfing': lambda o, h, l, c: np.where(talib.CDLENGULFING(o, h, l, c) == 100, 100, 0),
        'PiercingLine': talib.CDLPIERCING,
        'MorningStar': talib.CDLMORNINGSTAR,
        'DragonflyDoji': talib.CDLDRAGONFLYDOJI,
        'LongLine': talib.CDLLONGLINE,
        'HangingMan': talib.CDLHANGINGMAN,
        'ShootingStar': talib.CDLSHOOTINGSTAR,
        'BearishEngulfing': lambda o, h, l, c: np.where(talib.CDLENGULFING(o, h, l, c) == -100, -100, 0),
        'DarkCloudCover': talib.CDLDARKCLOUDCOVER,
        'EveningDojiStar': talib.CDLEVENINGDOJISTAR,
        'EveningStar': talib.CDLEVENINGSTAR,
        'GravestoneDoji': talib.CDLGRAVESTONEDOJI,
        'ThreeLineStrike': talib.CDL3LINESTRIKE,
    }

    # Initialize all pattern columns to 0
    for name in patterns.keys():
        df[name] = 0

    # Only calculate the latest completed candle (iloc[-2])
    i = len(df) - 2
    for name, func in patterns.items():
        result = func(df['open'], df['high'], df['low'], df['close'])
        if isinstance(result, np.ndarray):
            value = result[i]
        else:
            value = result.iloc[i]

        df.at[df.index[i], name] = value


    # Optional: print counts for info
    # for name in patterns.keys():
    #    count = (df[name].abs() > 0).sum()
    #   print(f"{name} detected {count} times.")

    # Debug: print pattern value for latest candle
    for name in patterns.keys():
        print(f"{name} at [-2]: {df[name].iloc[-2]}")

    bullish_patterns = ['Hammer', 'InvertedHammer', 'BullishEngulfing', 'PiercingLine','MorningStar', 'DragonflyDoji', 'LongLine', 'ThreeLineStrike']

    bearish_patterns = ['HangingMan', 'ShootingStar', 'BearishEngulfing', 'DarkCloudCover','EveningDojiStar', 'EveningStar', 'GravestoneDoji','LongLine','ThreeLineStrike']

    return df, patterns, bullish_patterns, bearish_patterns


In [7]:
# Ensure we only use past data for signal generation
# This function assumes all indicator values are based on past prices only

def generate_signals(df, patterns, window=1):
    """
    Generate candlestick pattern signals using only the latest closed candle (index -2).
    Avoids retroactive rewriting of historical signals.
    """
    # Initialize signal and direction columns
    for name in patterns.keys():
        df[f'Signal_{name}'] = 0
        df[f'Direction_{name}'] = 'NONE'

    i = len(df) - 2  # Only detect on the latest completed candle

    for name in patterns.keys():
        value = df[name].iloc[i]

        # Strong bullish
        if name in ['BullishEngulfing', 'ThreeLineStrike'] and value == 100:
            df.loc[df.index[i], f'Signal_{name}'] = 1
            df.loc[df.index[i], f'Direction_{name}'] = 'UP'

        # General bullish
        elif name in ['Hammer', 'InvertedHammer', 'PiercingLine',
                      'MorningStar', 'DragonflyDoji', 'LongLine'] and value > 0:
            df.loc[df.index[i], f'Signal_{name}'] = 1
            df.loc[df.index[i], f'Direction_{name}'] = 'UP'

        # Strong bearish
        elif name in ['BearishEngulfing','ThreeLineStrike'] and value == -100:
            df.loc[df.index[i], f'Signal_{name}'] = -1
            df.loc[df.index[i], f'Direction_{name}'] = 'DOWN'

        # General bearish
        elif name in ['HangingMan', 'ShootingStar', 'DarkCloudCover','EveningDojiStar', 'EveningStar','LongLine'] and value < 0:
            df.loc[df.index[i], f'Signal_{name}'] = -1
            df.loc[df.index[i], f'Direction_{name}'] = 'DOWN'

        # Special case
        elif name == 'GravestoneDoji' and value == 100:
            df.loc[df.index[i], f'Signal_{name}'] = -1
            df.loc[df.index[i], f'Direction_{name}'] = 'DOWN'

    return df





### Import Indicators:
#### (Below are RSI, MA5days, MA20days, Volume, ATR, Mean ATR, MACD --> feel free to adjust the time period or add other indicators):

### Current Candlestick Pattern + Technical Indicator Trigger Logic (Can Adjust the conditions):
A combined signal is only considered valid when a specific candlestick pattern appears and at least one of the following technical indicator conditions is met: MA, RSI, MACD, or ATR.

- **Indicators**:
- MA5, MA20 (moving averages).
- RSI (14-period).
- ATR (14-period), mean ATR (20-period).
- MACD (12, 26, 9).
- Volume (compared to 20-period average).
- **Trigger Logic**:

| Category          | Patterns                            | Indicator Conditions (≥1 required)                     |
|-------------------|-------------------------------------|-------------------------------------------------------|
| Bullish (Strong)  | BullishEngulfing, ThreeLineStrike   | MA5 > MA20, RSI > 50, MACD > Signal, ATR > mean_ATR, High Volume |
| Bullish (Moderate)| Hammer, InvertedHammer, etc.        | Same as above                                         |
| Bearish (Strong)  | BearishEngulfing                    | MA5 < MA20, RSI < 45, MACD < Signal, ATR > mean_ATR, High Volume |
| Bearish (Moderate)| HangingMan, ShootingStar, etc.      | Same as above                                         |
- **Logic**:
- Checks `df.iloc[-2]` for a pattern signal.
- Requires at least one indicator condition to confirm (e.g., `MA5 > MA20`).
- Logs confirmed signals to `signal_logger`.
- Resolves conflicts: Only one signal (bullish or bearish) is logged per candle.
- **Output**: `DataFrame` with `bullish_combined`, `bearish_combined`, and trigger details.



In [8]:
import numpy as np
import talib

# High Volume condition: current volume > threshold × average of past volumes
def is_high_volume(df, i, lookback=20, threshold=1.5):
    if i < lookback:
        return False
    avg_vol = df['volume'].iloc[i - lookback:i].mean()
    return df['volume'].iloc[i] > threshold * avg_vol

# Combine candlestick patterns with indicators to generate signals
def generate_combined_signals(df, patterns, signal_logger=None):
    """
    Generate combined signals based on candlestick patterns and technical indicators.
    Logs signals to signal_logger if provided, ensuring no duplicates.

    Args:
        df (pd.DataFrame): DataFrame with price data and candlestick patterns.
        patterns (dict): Dictionary of candlestick pattern functions.
        signal_logger (SignalHistoryLogger, optional): Logger for saving signals to CSV.
    """
    # Define pattern categories
    bullish_patterns_strong = ['BullishEngulfing', 'ThreeLineStrike']
    bullish_patterns = ['Hammer', 'InvertedHammer', 'PiercingLine', 'MorningStar', 'DragonflyDoji', 'LongLine']
    bearish_patterns_strong = ['BearishEngulfing']
    bearish_patterns = ['HangingMan', 'ShootingStar', 'DarkCloudCover', 'EveningDojiStar', 'EveningStar', 'GravestoneDoji', 'LongLine']

    # Initialize signal columns
    df['bullish_combined'] = 0
    df['bearish_combined'] = 0
    df['bullish_direction'] = 'NONE'
    df['bearish_direction'] = 'NONE'
    df['bullish_trigger'] = ''
    df['bearish_trigger'] = ''

    # Calculate technical indicators
    df['MA5'] = talib.SMA(df['close'], 5)
    df['MA20'] = talib.SMA(df['close'], 20)
    df['RSI'] = talib.RSI(df['close'], 14)
    df['ATR'] = talib.ATR(df['high'], df['low'], df['close'], 14)
    df['mean_ATR'] = df['ATR'].rolling(window=20).mean()
    df['MACD'], df['MACD_Signal'], _ = talib.MACD(df['close'], 12, 26, 9)

    # Generate binary pattern signal columns
    for name in patterns.keys():
        if name == 'GravestoneDoji':
            df[f'Signal_{name}'] = df[name].apply(lambda x: -1 if x == 100 else 0)
        elif name in bullish_patterns_strong:
            df[f'Signal_{name}'] = df[name].apply(lambda x: 1 if x == 100 else 0)
        elif name in bearish_patterns_strong:
            df[f'Signal_{name}'] = df[name].apply(lambda x: -1 if x == -100 else 0)
        else:
            df[f'Signal_{name}'] = df[name].apply(lambda x: 1 if x > 0 else -1 if x < 0 else 0)

    # Evaluate only the latest completed candle
    i = len(df) - 2
    volume_condition = is_high_volume(df, i)
    ma_condition_bull = df['MA5'].iloc[i] > df['MA20'].iloc[i]
    ma_condition_bear = df['MA5'].iloc[i] < df['MA20'].iloc[i]
    rsi_condition_bull = df['RSI'].iloc[i] > 50
    rsi_condition_bear = df['RSI'].iloc[i] < 45
    macd_bull = df['MACD'].iloc[i] > df['MACD_Signal'].iloc[i]
    macd_bear = df['MACD'].iloc[i] < df['MACD_Signal'].iloc[i]
    atr_condition = df['ATR'].iloc[i] > df['mean_ATR'].iloc[i]

    # Check for existing signals at this timestamp to avoid duplicates
    existing_signals = []
    if signal_logger:
        history = signal_logger.get_history()
        if not history.empty:
            existing_signals = history[history['timestamp'] == df.index[i]]['type'].tolist()

    # Bullish signal logic
    for pattern in bullish_patterns + bullish_patterns_strong:
        if df[f'Signal_{pattern}'].iloc[i] == 1:
            trigger_parts = [pattern]
            if ma_condition_bull:
                trigger_parts.append('MA5>MA20')
            if rsi_condition_bull:
                trigger_parts.append('RSI>50')
            if macd_bull:
                trigger_parts.append('MACD_Bullish')
            if atr_condition:
                trigger_parts.append('HighVolatility')
            if volume_condition:
                trigger_parts.append('HighVolume')
            if len(trigger_parts) >= 2:  # Pattern + at least one indicator
                df.loc[df.index[i], 'bullish_combined'] = 1
                df.loc[df.index[i], 'bullish_direction'] = 'UP'
                trigger_text = ' + '.join(trigger_parts)
                df.loc[df.index[i], 'bullish_trigger'] = trigger_text
                if signal_logger and 'bullish' not in existing_signals:
                    signal_logger.add_signal('bullish', df.index[i], df['close'].iloc[i], trigger=trigger_text)

    # Bearish signal logic
    for pattern in bearish_patterns + bearish_patterns_strong:
        if df[f'Signal_{pattern}'].iloc[i] == -1:
            trigger_parts = [pattern]
            if ma_condition_bear:
                trigger_parts.append('MA5<MA20')
            if rsi_condition_bear:
                trigger_parts.append('RSI<45')
            if macd_bear:
                trigger_parts.append('MACD_Bearish')
            if atr_condition:
                trigger_parts.append('HighVolatility')
            if volume_condition:
                trigger_parts.append('HighVolume')
            if len(trigger_parts) >= 2:
                df.loc[df.index[i], 'bearish_combined'] = -1
                df.loc[df.index[i], 'bearish_direction'] = 'DOWN'
                trigger_text = ' + '.join(trigger_parts)
                df.loc[df.index[i], 'bearish_trigger'] = trigger_text
                if signal_logger and 'bearish' not in existing_signals:
                    signal_logger.add_signal('bearish', df.index[i], df['close'].iloc[i], trigger=trigger_text)

    # Debug summary
    print(f"Bullish Signal at [-2]: {df['bullish_combined'].iloc[-2]}")
    print(f"Bearish Signal at [-2]: {df['bearish_combined'].iloc[-2]}")
    print(df[df['bullish_combined'] == 1][['close', 'bullish_trigger']].tail(5))
    print(df[df['bearish_combined'] == -1][['close', 'bearish_trigger']].tail(5))

    return df

In [9]:
signal_history=[]

### Evaluation: (Currently use Accuracy rates)

In [10]:
def evaluate_patterns(df, patterns, window=1, threshold=0.0005):
    """
    Evaluate the accuracy of each candlestick pattern signal.

    This function measures by calculating the forward return after each signal and comparing it against a defined threshold.

    Parameters:
    
    (1) window :  default=1
        Holding period in bars/candles to compute future returns (e.g., 1 bar ahead).

    (2) threshold : default=0.0005
        Minimum return required for a signal to be considered successful (e.g., 0.05%).

    """
    results = {}

    #  Compute future return
    df['next_close'] = df['close'].shift(-window)
    df['return'] = (df['next_close'] - df['close']) / df['close']

    # Evaluate each raw candlestick pattern signal
    for name in list(patterns.keys()):
        signal_col = f'Signal_{name}'
        signals = df[df[signal_col] != 0]
        total_signals = len(signals)

        if total_signals == 0:
            results[name] = {'accuracy': 0, 'total_signals': 0, 'correct_signals': 0}
            continue

        correct_signals = len(signals[
            ((signals[signal_col] == 1) & (df.loc[signals.index, 'return'] >= threshold)) |
            ((signals[signal_col] == -1) & (df.loc[signals.index, 'return'] <= -threshold))
        ])
        accuracy = correct_signals / total_signals * 100

        results[name] = {
            'accuracy': accuracy,
            'total_signals': total_signals,
            'correct_signals': correct_signals
        }

    #  Evaluate combined signals (pattern + indicator)
    for signal_col in ['bullish_combined', 'bearish_combined']:
        signals = df[df[signal_col] != 0]
        total_signals = len(signals)

        if total_signals == 0:
            results[signal_col] = {'accuracy': 0, 'total_signals': 0, 'correct_signals': 0}
            continue

        correct_signals = len(signals[
            ((signals[signal_col] == 1) & (df.loc[signals.index, 'return'] >= threshold)) |
            ((signals[signal_col] == -1) & (df.loc[signals.index, 'return'] <= -threshold))
        ])
        accuracy = correct_signals / total_signals * 100

        results[signal_col] = {
            'accuracy': accuracy,
            'total_signals': total_signals,
            'correct_signals': correct_signals
        }

    return results


### Plot Realtime result:

In [11]:
import os
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from signal_logger import SignalHistoryLogger

def plot_realtime_signals(df, symbol='BTCUSDT', data_range=50, output_dir='/Users/wynn/PycharmProjects/MMAT/plots/', signal_logger=None):
    if signal_logger is None:
        raise ValueError("signal_logger must be provided")

    df_plot = df.iloc[-data_range:].copy()

    # Latest forming candle hover text
    df_plot['hover_text'] = np.where(
        df_plot.index == df.index[-1],
        ' Latest forming candle (not evaluated)',
        ''
    )

    # Get real-time signal markers from current df
    up_signals = df_plot[df_plot['bullish_combined'] == 1]
    down_signals = df_plot[df_plot['bearish_combined'] == -1]

    print(f"Plotting Real-Time: {len(up_signals)} Bullish UP signals, {len(down_signals)} Bearish DOWN signals in last {data_range} K-lines")

    fig = make_subplots(
        rows=4, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.1,
        subplot_titles=['Candlestick + MA', 'RSI', 'ATR', 'Volume'],
        row_heights=[0.4, 0.2, 0.2, 0.2]
    )

    # === Candlestick + MAs ===
    fig.add_trace(go.Candlestick(
        x=df_plot.index,
        open=df_plot['open'],
        high=df_plot['high'],
        low=df_plot['low'],
        close=df_plot['close'],
        name='Candlestick',
        increasing_line_color='green',
        decreasing_line_color='red'
    ), row=1, col=1)

    fig.add_trace(go.Scatter(
        x=df_plot.index, y=df_plot['MA5'], mode='lines', name='5 MA', line=dict(color='blue')
    ), row=1, col=1)
    fig.add_trace(go.Scatter(
        x=df_plot.index, y=df_plot['MA20'], mode='lines', name='20 MA', line=dict(color='purple')
    ), row=1, col=1)

    # === Real-time signal markers (this cycle only) ===
    fig.add_trace(go.Scatter(
        x=up_signals.index,
        y=up_signals['close'] * 1.005,
        mode='markers',
        marker=dict(symbol='triangle-up', color='green', size=10),
        name='Bullish Signal',
        text=up_signals['bullish_trigger'],
        hoverinfo='text+x+y'
    ), row=1, col=1)

    fig.add_trace(go.Scatter(
        x=down_signals.index,
        y=down_signals['close'] * 0.995,
        mode='markers',
        marker=dict(symbol='triangle-down', color='red', size=10),
        name='Bearish Signal',
        text=down_signals['bearish_trigger'],
        hoverinfo='text+x+y'
    ), row=1, col=1)

    # === Forming candle marker ===
    forming_candle = df_plot[df_plot.index == df.index[-1]]
    if not forming_candle.empty:
        fig.add_trace(go.Scatter(
            x=forming_candle.index,
            y=forming_candle['close'] * 1.002,
            mode='markers',
            name='Forming Candle',
            marker=dict(symbol='circle', color='gray', size=8, opacity=0.3),
            text=forming_candle['hover_text'],
            hoverinfo='text+x+y',
            hoverlabel=dict(bgcolor='lightgray'),
            showlegend=False
        ), row=1, col=1)

    # Historical signal markers from CSV
    signal_df = signal_logger.get_history()
    print(f"signal_df columns: {signal_df.columns}")  # Debug print
    if not signal_df.empty:
        signal_df = signal_df[signal_df['timestamp'] >= df_plot.index[0]]
        signal_df['timestamp'] = pd.to_datetime(signal_df['timestamp'])
        signal_df['timestamp'] = signal_df['timestamp'].dt.tz_convert(df_plot.index.tz)
        signal_df = signal_df.drop_duplicates(subset=['timestamp', 'type'])

        for _, row in signal_df.iterrows():
            if row['timestamp'] in df_plot.index:
                # Check if 'trigger' column exists; if not, use a default message
                trigger_text = (
                    f"{row['type'].capitalize()} Signal: {row['trigger']}"
                    if 'trigger' in signal_df.columns and pd.notna(row['trigger']) and row['trigger']
                    else f"{row['type'].capitalize()} Signal"
                )
                fig.add_trace(go.Scatter(
                    x=[row['timestamp']],
                    y=[row['price'] * (1.005 if row['type'] == 'bullish' else 0.995)],
                    mode='markers',
                    marker=dict(
                        symbol='triangle-up' if row['type'] == 'bullish' else 'triangle-down',
                        color='green' if row['type'] == 'bullish' else 'red',
                        size=12
                    ),
                    name='',
                    text=[trigger_text],
                    hoverinfo='text+x+y',
                    showlegend=False
                ), row=1, col=1)

    # === RSI ===
    fig.add_trace(go.Scatter(
        x=df_plot.index,
        y=df_plot['RSI'],
        mode='lines',
        name='RSI',
        line=dict(color='blue')
    ), row=2, col=1)
    fig.add_hline(y=50, line_dash='dash', line_color='black', row=2, col=1)

    # === ATR ===
    fig.add_trace(go.Scatter(
        x=df_plot.index,
        y=df_plot['ATR'],
        mode='lines',
        name='ATR',
        line=dict(color='orange')
    ), row=3, col=1)
    fig.add_trace(go.Scatter(
        x=df_plot.index,
        y=df_plot['mean_ATR'] * 1.2,
        mode='lines',
        name='1.2 * mean_ATR',
        line=dict(color='red', dash='dash')
    ), row=3, col=1)

    # === Volume ===
    fig.add_trace(go.Bar(
        x=df_plot.index,
        y=df_plot['volume'],
        name='Volume',
        marker_color='blue'
    ), row=4, col=1)
    fig.add_trace(go.Scatter(
        x=df_plot.index,
        y=df_plot['Volume_MA20'] * 1.3,
        mode='lines',
        name='1.3 * Volume_MA20',
        line=dict(color='red', dash='dash')
    ), row=4, col=1)

    fig.update_layout(
        title=f'[Testing] Real-Time 5 Min Signals for {symbol}',
        xaxis_title='Time',
        yaxis_title='Price ($)',
        xaxis_rangeslider_visible=False,
        showlegend=True,
        height=800,
        template='plotly_white'
    )

    # Save to a fixed HTML file
    os.makedirs(output_dir, exist_ok=True)
    html_path = os.path.join(output_dir, f'realtime_signals_{symbol}.html')
    try:
        # Add meta-refresh tag to auto-refresh every 300 seconds (5 minutes)
        html_content = fig.to_html(include_plotlyjs='cdn')
        html_content = html_content.replace(
            '<head>',
            '<head><meta http-equiv=\"refresh\" content=\"300\">'
        )
        with open(html_path, 'w') as f:
            f.write(html_content)
        print(f" Updated real-time plot: {html_path} (auto-refreshes every 5 minutes)")
    except Exception as e:
        print(f"Error saving real-time plot: {e}")

### Run Realtime Data:

In [12]:
import atexit
from prediction_logger import PredictionLogger
from signal_logger import SignalHistoryLogger
import time
import talib
from binance.client import Client

# Initialize loggers
logger = PredictionLogger()
signal_logger = SignalHistoryLogger(filename='/Users/wynn/PycharmProjects/MMAT/signal_history.csv')

# Register atexit handlers to save logs on program exit
atexit.register(lambda: logger.save_to_csv("TestLive_prediction_log.csv"))
atexit.register(lambda: signal_logger.save_to_csv("signal_history.csv"))

def run_realtime_signals(api_key, api_secret, symbol='BTCUSDT',
                        interval=Client.KLINE_INTERVAL_5MINUTE,
                        limit=1000, sleep_seconds=300, signal_logger=None,
                        prediction_logger=None, debug=False):
    """
    Run real-time signal generation for the specified symbol using Binance API.
    Fetches 5-minute K-line data, calculates candlestick patterns and indicators,
    generates signals, and plots results. Logs signals and predictions to CSV.

    Args:
        api_key (str): Binance API key.
        api_secret (str): Binance API secret.
        symbol (str): Trading pair (default: 'BTCUSDT').
        interval (str): K-line interval (default: 5-minute).
        limit (int): Number of K-lines to fetch (default: 100).
        sleep_seconds (int): Seconds to wait between updates (default: 300).
        signal_logger (SignalHistoryLogger): Logger for saving signals to CSV.
        prediction_logger (PredictionLogger): Logger for tracking prediction accuracy.
        debug (bool): If True, print detailed debugging information (default: False).
    """
    if signal_logger is None or prediction_logger is None:
        raise ValueError("signal_logger and prediction_logger must be provided")

    print("Starting real-time signal generation. Press Ctrl+C to stop.")

    while True:
        try:
            # Fetch latest K-line data from Binance
            df = fetch_binance_data(api_key, api_secret, symbol, interval, limit)
            if df is None or df.empty:
                print("Failed to fetch data. Retrying in 60 seconds.")
                time.sleep(60)
                continue

            # Calculate technical indicators
            df['MA5'] = talib.SMA(df['close'], timeperiod=5)
            df['MA20'] = talib.SMA(df['close'], timeperiod=20)
            df['RSI'] = talib.RSI(df['close'], timeperiod=14)
            df['ATR'] = talib.ATR(df['high'], df['low'], df['close'], timeperiod=14)
            df['mean_ATR'] = df['ATR'].rolling(window=20).mean()
            df['MACD'], df['MACD_Signal'], _ = talib.MACD(df['close'], fastperiod=12, slowperiod=26, signalperiod=9)
            df['Volume_MA20'] = talib.SMA(df['volume'], timeperiod=20)

            # Detect candlestick patterns and generate signals
            df, patterns, bullish_patterns, bearish_patterns = calculate_patterns(df)
            df = generate_signals(df, patterns)
            df = generate_combined_signals(df, patterns, signal_logger=signal_logger)

            # Analyze the latest completed candle (df.iloc[-2])
            latest_ts = df.index[-2]
            latest_signal = df.iloc[-2][[f'Signal_{name}' for name in patterns.keys()]]

            # Print signal summary
            print(f"\n[{latest_ts}] Signal Summary:")
            triggered_patterns = []
            for name in patterns.keys():
                signal = latest_signal[f'Signal_{name}']
                direction = 'UP' if signal == 1 else 'DOWN' if signal == -1 else None
                if direction:
                    triggered_patterns.append(f"{name}: {direction}")
            if triggered_patterns:
                print("\n".join(triggered_patterns))
            else:
                print("No candlestick patterns detected.")

            # Combined signal and prediction
            bullish_signal = df['bullish_combined'].iloc[-2]
            bearish_signal = df['bearish_combined'].iloc[-2]
            bullish_trigger = df['bullish_trigger'].iloc[-2]
            bearish_trigger = df['bearish_trigger'].iloc[-2]

            # Determine prediction for the next 5 minutes
            if bullish_signal == 1 and bearish_signal != -1:
                prediction = "UP"
                print(f"\nCombined Signal: BULLISH (Trigger: {bullish_trigger})")
            elif bearish_signal == -1 and bullish_signal != 1:
                prediction = "DOWN"
                print(f"\nCombined Signal: BEARISH (Trigger: {bearish_trigger})")
            else:
                prediction = "NEUTRAL"
                print("\nCombined Signal: NEUTRAL")

            print(f"Prediction for next 5min: {prediction}")

            # Log signals to CSV
            if bullish_signal == 1:
                signal_logger.add_signal('bullish', df.index[-2], df['close'].iloc[-2])
                print(f"[Saved] Bullish @ {df.index[-2]}")
            if bearish_signal == -1:
                signal_logger.add_signal('bearish', df.index[-2], df['close'].iloc[-2])
                print(f"[Saved] Bearish @ {df.index[-2]}")

            # Plot the chart
            plot_realtime_signals(df, symbol, data_range=50, signal_logger=signal_logger)

            # Debug output
            if debug:
                print("\n--- Debug: Technical Indicator Conditions ---")
                print(f"MA5 > MA20: {df['MA5'].iloc[-2] > df['MA20'].iloc[-2]}")
                print(f"RSI: {df['RSI'].iloc[-2]:.2f}")
                print(f"MACD: {df['MACD'].iloc[-2]:.2f} | Signal: {df['MACD_Signal'].iloc[-2]:.2f}")
                print(f"ATR > mean_ATR: {df['ATR'].iloc[-2] > df['mean_ATR'].iloc[-2]}")
                print(f"Volume: {df['volume'].iloc[-2]:.2f} | Volume_MA20: {df['Volume_MA20'].iloc[-2]:.2f}")

            # Log prediction
            if len(df) >= 2:
                close_now = df['close'].iloc[-1]
                close_prev = df['close'].iloc[-2]
                ts = df.index[-1]
                prediction_logger.record_prediction(ts, prediction, close_now, close_prev)
                print(f"Current Hit Rate: {prediction_logger.get_hit_rate():.2%}")

            print(f"Waiting {sleep_seconds} seconds for next update...")
            time.sleep(sleep_seconds)

        except KeyboardInterrupt:
            print("\nStopped real-time signal generation. Exiting...")
            break
        except Exception as e:
            print(f"Error in real-time loop: {e}. Retrying in 60 seconds.")
            time.sleep(60)

###  Live Signal Overview

| Module / Function                  | Scope                             | Role / Logic Description                                          |
|------------------------------------|-----------------------------------|-------------------------------------------------------------------|
| `fetch_binance_data()`             | Every 5 minutes                   | Fetches Binance 5min K-line data                                 |
| `df.index = tz_convert()`          | All timestamps                    | Converts timestamp to local timezone (Asia/Singapore)            |
| `generate_signals()`               | Only `df.iloc[-2]`                | Detects candlestick patterns (e.g., Hammer, Engulfing) on latest completed candle |
| `generate_combined_signals()`      | Only `df.iloc[-2]`                | Confirms signal with MA, RSI, MACD, ATR, Volume, etc.            |
| `df.iloc[-2]`                      | **Latest completed candle**       | Used as the basis for signal & prediction                        |
| `Prediction for next 5min:`        | One per cycle                     | Output: BULLISH / BEARISH / NEUTRAL based on confirmed signal    |
| `plot_realtime_signals()`          | **Last 50 candles (`df_plot`)**   | Visualizes candles and markers (signal arrows)                   |
| `bullish_signals / bearish_signals` | Within `df_plot`                  | Shows signal markers (green/red triangles)                       |
| `while True + time.sleep(300)`     | Entire loop                       | Runs full signal detection cycle every 5 minutes                 |

---

###  Plot Logic Summary

- The chart updates **every 5 minutes**
- Pattern detection (`generate_signals`) + indicator confirmation (`generate_combined_signals`) are applied only to the **latest completed candle** (`df.iloc[-2]`)
- **Prediction is for the next 5 minutes**, based on that completed candle
- **The chart shows the most recent 50 candles**, including signal markers
- Even if **no signal is triggered**, the chart still updates each cycle

---

###  Signal vs Plot:

| Purpose                  | Scope                   | Based On                        |
|--------------------------|--------------------------|----------------------------------|
| **Next 5min Prediction** | Latest completed candle   | `df.iloc[-2]` (combined logic)   |
| **Chart Plot**           | Last 50 candles           | Any past candles with triggered signals |

---
###  Candle Index Reference

| Code           | Candle Represented          | Description                                                               |
|----------------|-----------------------------|---------------------------------------------------------------------------|
| `df.iloc[-2]`  | Last **completed** candle   |  Used for signal generation: pattern detection + indicator confirmation  |
| `df.iloc[-1]`  | Current **forming** candle  |  Not analyzed (to avoid lookahead bias), but shown visually on the chart |
---

### Def Main:

In [None]:
import atexit
from prediction_logger import PredictionLogger
from signal_logger import SignalHistoryLogger
from binance.client import Client
import pandas as pd
import talib
import os

def main(realtime=True, debug=False):
    """
    Main function to run candlestick pattern analysis and signal generation.
    Supports both real-time data from Binance API and historical data from CSV.
    Generates signals, evaluates accuracy, and plots results.

    Args:
        realtime (bool): If True, use real-time Binance API data; else, use CSV data (default: True).
        debug (bool): If True, print detailed debugging information (default: False).
    """
    # Configuration
    use_api = realtime
    symbol = 'BTCUSDT'
    csv_path = '/Users/wynn/PycharmProjects/MMAT/data/btc_1min.csv'
    signal_log_path = '/Users/wynn/PycharmProjects/MMAT/signal_history.csv'

    # Initialize loggers
    signal_logger = SignalHistoryLogger(filename=signal_log_path)
    prediction_logger = PredictionLogger()

    # Register atexit handlers to save logs on program exit
    atexit.register(lambda: prediction_logger.save_to_csv('/Users/wynn/PycharmProjects/MMAT/TestLive_prediction_log.csv'))
    atexit.register(lambda: signal_logger.save_to_csv(signal_log_path))

    if use_api:
        try:
            # Load Binance API keys
            keys = load_keys()
            api_key = keys['api_key']
            api_secret = keys['secret_key']
        except Exception as e:
            print(f"Failed to load API keys: {e}")
            return

        if realtime:
            # Run real-time signal generation
            run_realtime_signals(api_key, api_secret, symbol=symbol, signal_logger=signal_logger,
                                prediction_logger=prediction_logger, debug=debug)
            return

        # Fetch historical data from Binance API
        df = fetch_binance_data(api_key, api_secret, symbol=symbol)
        if df is None or df.empty:
            print("Failed to fetch Binance data, exiting.")
            return
    else:
        # Load historical data from CSV
        try:
            df = load_data(csv_path)
            if df is None or df.empty:
                print("Failed to load CSV data, exiting.")
                return
            # Resample to 15-minute intervals (uncomment if needed)
            # df = resample_to_15min(df)
        except Exception as e:
            print(f"Error loading CSV data: {e}")
            return

    # Calculate candlestick patterns and generate signals
    df, patterns, bullish_patterns, bearish_patterns = calculate_patterns(df)
    df = generate_signals(df, patterns)
    df = generate_combined_signals(df, patterns, signal_logger=signal_logger)  # Pass signal_logger

    # Evaluate pattern accuracy
    accuracy_results = evaluate_patterns(df, patterns)
    print("\n--- Pattern Signal Accuracy ---")
    for name, metrics in sorted(accuracy_results.items(), key=lambda x: x[1]['accuracy'], reverse=True):
        print(f"{name} - Accuracy: {metrics['accuracy']:.2f}%, Total: {metrics['total_signals']}, "
              f"Correct: {metrics['correct_signals']}")

    # Plot results
    plot_realtime_signals(df, symbol, data_range=50, signal_logger=signal_logger)

if __name__ == "__main__":
    main(realtime=True, debug=False)

Starting real-time signal generation. Press Ctrl+C to stop.
Fetched 1000 K-lines from Binance API:
                                open       high        low      close  \
timestamp                                                               
2025-05-30 03:25:00+08:00  105885.08  105978.52  105856.44  105965.57   
2025-05-30 03:30:00+08:00  105965.57  106090.45  105880.80  105969.84   
2025-05-30 03:35:00+08:00  105969.84  106000.00  105891.38  105925.99   
2025-05-30 03:40:00+08:00  105925.99  105950.15  105703.54  105766.81   
2025-05-30 03:45:00+08:00  105766.82  105824.01  105725.03  105824.00   

                              volume  
timestamp                             
2025-05-30 03:25:00+08:00   69.56168  
2025-05-30 03:30:00+08:00  107.07776  
2025-05-30 03:35:00+08:00   46.09896  
2025-05-30 03:40:00+08:00   70.60048  
2025-05-30 03:45:00+08:00   79.94452  
Hammer at [-2]: 0
InvertedHammer at [-2]: 0
BullishEngulfing at [-2]: 0
PiercingLine at [-2]: 0
MorningStar at [-2]:

### Key Components
| Module / Function              | Role                                                                 |
|--------------------------------|----------------------------------------------------------------------|
| `fetch_binance_data`           | Fetches 5-minute K-line data from Binance API.                       |
| `calculate_patterns`           | Detects 15 candlestick patterns on the latest completed candle.      |
| `generate_signals`             | Generates raw pattern signals (UP/DOWN/NONE).                        |
| `generate_combined_signals`    | Confirms signals with indicators (e.g., MA, RSI).                    |
| `evaluate_patterns`            | Evaluates historical signal accuracy (used in non-real-time mode).   |
| `plot_realtime_signals`        | Plots candlestick chart with real-time and historical signals.       |
| `run_realtime_signals`         | Runs the 5-minute real-time loop for data fetching, signals, and plotting. |
| `main`                         | Orchestrates real-time or historical analysis.                      |
| `SignalHistoryLogger`          | Logs signals to `signal_history.csv`.                               |
| `PredictionLogger`             | Logs predictions and accuracy to `TestLive_prediction_log.csv`.     |

---

#### `SignalHistoryLogger`
- **Purpose**: Logs confirmed bullish/bearish signals to `signal_history.csv`.
- **Structure**:

| Column    | Description                     |
|-----------|---------------------------------|
| timestamp | Signal timestamp (Asia/Singapore) |
| type      | `bullish` or `bearish`          |
| price     | Close price at signal           |
- **Logic**:
- Signals are logged in `generate_combined_signals` when `bullish_combined = 1` or `bearish_combined = -1`.
- Appends to CSV in real-time via `add_signal`.
- Saved on exit via `atexit` handler.


####  Grey Hover Circle Signal on the Latest Forming Candle:

In our real-time signal detection logic, **all pattern and indicator signals are only calculated on the most recently closed candle**, not on the most recent (still-forming) candle.

####  System Design Logic

- We only analyze `df.iloc[-2]`, which represents the **last completed candle**.
- The latest candle (grey hover) on the chart (i.e., `df.iloc[-1]`) is still in progress and not yet confirmed, so we do **not** use it for signal detection.
- This design prevents premature signals and ensures we only use finalized market data.

####  Example Timeline

| Timestamp   | Candle Role                     | Analyzed? | Can Trigger Signal? |
|-------------|----------------------------------|---------|--------------------|
| 23:45       | Most recent **completed** candle (`df.iloc[-2]`) |  Yes  |  Yes             |
| 00:00       | Currently **forming** candle (`df.iloc[-1]`)     |  No   |  No              |

####  Visual Clarification

The rightmost candle on the chart might visually appear to match a candlestick pattern (e.g., Hammer, Engulfing), but **our system will not analyze it until it is completed**. That’s why no triangle marker appears on it immediately after an update.

---

####  Summary

Signals only appear **after** a candle has fully formed and passed all pattern and indicator checks. This avoids any form of look-ahead bias and aligns with proper real-time trading system standards.
