## <u>**1 - Importing Test Data & Testing** </u>

This section focuses on the initial setup, data acquisition, and the core logic for generating trading signals based on candlestick reversal patterns. It covers:

-   **Data Download**: Downloads 5-minute candlestick data for the EUR/USD forex pair using Yahoo Finance.
-   **Pattern Detection Logic (`signal_generator`)**: Defines a Python function to identify specific candlestick reversal patterns. The patterns currently implemented are:
    -   **Bullish Engulfing**: A strong reversal pattern indicating a potential price increase (Buy Signal).
    -   **Bearish Engulfing**: A strong reversal pattern indicating a potential price decrease (Sell Signal).
    -   **Morning Star**: A three-candle bullish reversal pattern (Buy Signal).
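    -   **Three White Soldiers**: A strong bullish reversal/continuation pattern of three consecutive bullish candles, checked here together with one preceding context candle, so four candles in total (Buy Signal).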
    -   **Doji**: A neutral candle indicating indecision (No Pattern/No Trade).
-   **Signal Generation**: Applies the `signal_generator` function across the downloaded dataset to create a new `signal` column.
    -   `0` = No pattern detected
    -   `1` = Buy signal
    -   `2` = Sell signal
-   **Output Utilization**: The generated `signal` column can be used for backtesting trading strategies or as an input for an automated trading system.
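
As a rough illustration of the backtesting use mentioned above, the sketch below scores each signal by the move of the next candle's close. The toy prices and the `next_move` and `pnl` column names are assumptions made for this example; they are not part of the notebook, and the scoring ignores spreads, stops, and position sizing.

```python
import pandas as pd

# Toy closes and signals (hypothetical values, not downloaded data).
frame = pd.DataFrame({
    "Close": [1.1000, 1.1010, 1.1005, 1.1020, 1.1015],
    "signal": [0, 1, 2, 0, 1],
})

# Price change from this close to the next one; NaN on the final row.
frame["next_move"] = frame["Close"].shift(-1) - frame["Close"]

# Direction per signal: +1 for buy (1), -1 for sell (2), 0 for no pattern.
direction = frame["signal"].map({0: 0, 1: 1, 2: -1})
frame["pnl"] = direction * frame["next_move"]

# sum() skips the NaN on the last row, which has no following candle.
total_pnl = frame["pnl"].sum()
```

A per-signal breakdown such as `frame.groupby("signal")["pnl"].sum()` would be a natural next step before trusting any single pattern.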


In [None]:
import yfinance as yf
import pandas as pd

dataF = yf.download("EURUSD=X", start="2025-7-15", end="2025-7-23", interval='5m')
dataF.iloc[:, :]

[*********************100%***********************]  1 of 1 completed


Ticker: EURUSD=X

| Datetime | Close | High | Low | Open | Volume |
|---|---|---|---|---|---|
| 2025-07-14 23:00:00+00:00 | 1.166861 | 1.166997 | 1.166861 | 1.166861 | 0 |
| 2025-07-14 23:05:00+00:00 | 1.166861 | 1.166861 | 1.166725 | 1.166725 | 0 |
| 2025-07-14 23:10:00+00:00 | 1.166725 | 1.166861 | 1.166725 | 1.166861 | 0 |
| 2025-07-14 23:15:00+00:00 | 1.166589 | 1.166725 | 1.166589 | 1.166725 | 0 |
| 2025-07-14 23:20:00+00:00 | 1.166453 | 1.166589 | 1.166453 | 1.166589 | 0 |
| ... | ... | ... | ... | ... | ... |
| 2025-07-22 22:35:00+00:00 | 1.175226 | 1.175226 | 1.175088 | 1.175088 | 0 |
| 2025-07-22 22:40:00+00:00 | 1.175364 | 1.175364 | 1.175226 | 1.175226 | 0 |
| 2025-07-22 22:45:00+00:00 | 1.175364 | 1.175364 | 1.175226 | 1.175364 | 0 |
| 2025-07-22 22:50:00+00:00 | 1.175226 | 1.175364 | 1.175226 | 1.175226 | 0 |


In [None]:
import logging

# Logging for heartbeat
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("bot_heartbeat.log"), # Log to a file
    ]
)
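
One consequence of `level=logging.INFO` is that the pattern messages `signal_generator` emits through `logging.debug` are filtered out, while the `logging.info` heartbeat lines are kept. The sketch below shows that filtering with an in-memory handler; `ListHandler`, `capture`, and the `pattern_demo_*` logger names are hypothetical helpers for this example only.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted messages in a list instead of writing a file."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def capture(level):
    """Log one DEBUG and one INFO message and return what got through."""
    logger = logging.getLogger(f"pattern_demo_{level}")
    logger.setLevel(level)
    logger.propagate = False  # keep the demo out of the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    logger.debug("Bullish Engulfing detected")
    logger.info("--- Trading job started ---")
    logger.removeHandler(handler)
    return handler.messages


info_only = capture(logging.INFO)    # the debug message is dropped
everything = capture(logging.DEBUG)  # both messages are kept
```

If the pattern details are wanted in `bot_heartbeat.log`, lowering the configured level to `logging.DEBUG` would be one way to get them.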

In [None]:
def signal_generator(df):

    # Safely extract candle data, initializing older candles to None if not available
    open_c = df['Open'].iloc[-1]
    close_c = df['Close'].iloc[-1]
    high_c = df['High'].iloc[-1]
    low_c = df['Low'].iloc[-1]
    
    open_p = df['Open'].iloc[-2]
    close_p = df['Close'].iloc[-2]
    high_p = df['High'].iloc[-2]
    low_p = df['Low'].iloc[-2]

    open_pp = None # previous previous candle
    close_pp = None
    high_pp = None
    low_pp = None

    open_ppp = None # previous previous previous candle
    close_ppp = None
    high_ppp = None
    low_ppp = None

    # Populate older candle data only if enough rows exist
    if len(df) >= 3:
        open_pp = df['Open'].iloc[-3]
        close_pp = df['Close'].iloc[-3]
        high_pp = df['High'].iloc[-3]
        low_pp = df['Low'].iloc[-3]
    if len(df) >= 4:
        open_ppp = df['Open'].iloc[-4]
        close_ppp = df['Close'].iloc[-4]
        high_ppp = df['High'].iloc[-4]
        low_ppp = df['Low'].iloc[-4]


    # --- Bullish Engulfing Pattern (Buy Signal: 1) ---
    # Current candle is bullish AND its body completely engulfs the previous bearish candle's body
    if (close_c > open_c and        # Current candle is bullish
        close_p < open_p and        # Previous candle was bearish
        close_c > open_p and        # Current close is higher than previous open (engulfs top)
        open_c < close_p):          # Current open is lower than previous close (engulfs bottom)
        logging.debug(f"Bullish Engulfing detected: C({open_c:.5f},{close_c:.5f}) P({open_p:.5f},{close_p:.5f})")
        return 1 # Buy Signal

    # --- Bearish Engulfing Pattern (Sell Signal: 2) ---
    # Current candle is bearish AND its body completely engulfs the previous bullish candle's body
    elif (close_c < open_c and       # Current candle is bearish
          close_p > open_p and       # Previous candle was bullish
          open_c > close_p and       # Current open is higher than previous close (engulfs top)
          close_c < open_p):         # Current close is lower than previous open (engulfs bottom)
        logging.debug(f"Bearish Engulfing detected: C({open_c:.5f},{close_c:.5f}) P({open_p:.5f},{close_p:.5f})")
        return 2 

    # --- Morning Star Pattern (Bullish Reversal: Buy Signal: 1) ---
    # Requires 3 candles:
    # 1. Long bearish candle (previous_previous_candle)
    # 2. Small-bodied candle (could be Doji, bullish or bearish) that gaps down or is below 1st's body
    # 3. Long bullish candle (current_candle) that gaps up or is above 2nd's body and closes well into the body of the first candle
    elif (open_pp is not None and                                # Ensure 3rd candle (index -3) exists
          close_pp < open_pp and                                 # 1st candle (pp) is bearish
          abs(open_p - close_p) < (high_p - low_p) * 0.5 and    # 2nd candle (p) has small body (e.g., Doji or spinning top)
          max(close_p, open_p) < open_pp and                     # 2nd candle's body is below 1st candle's open (gaps down)
          close_c > open_c and                                  # 3rd candle (c) is bullish
          open_c > max(close_p, open_p) and                     # 3rd candle opens above 2nd candle's body (gaps up)
          close_c > (open_pp + close_pp) / 2):                  # 3rd candle closes more than halfway into 1st's body
        logging.debug(f"Morning Star detected: C({open_c:.5f},{close_c:.5f}) P({open_p:.5f},{close_p:.5f}) PP({open_pp:.5f},{close_pp:.5f})")
        return 1

    # --- Three White Soldiers Pattern (Bullish Reversal/Continuation: Buy Signal: 1) ---
    # Requires 4 candles:
    # 1. Long bearish candle (ppp) typically precedes it (though not strictly part of the pattern definition itself, common context)
    # 2. Bullish candle (pp) opens within or near body of ppp, closes higher
    # 3. Bullish candle (p) opens within or near body of pp, closes higher
    # 4. Bullish candle (c) opens within or near body of p, closes higher
    elif (open_ppp is not None and                                # Ensure 4th candle (index -4) exists
          # Optional: close_ppp < open_ppp and                     # 1st candle (ppp) is bearish (context)
          close_pp > open_pp and                                  # 2nd candle (pp) is bullish
          open_pp > min(open_ppp, close_ppp) and open_pp < max(open_ppp, close_ppp) and # 2nd opens strictly within 1st's body
          close_p > open_p and                                    # 3rd candle (p) is bullish
          open_p > min(open_pp, close_pp) and open_p < max(open_pp, close_pp) and     # 3rd opens strictly within 2nd's body
          close_c > open_c and                                    # 4th candle (c) is bullish
          open_c > min(open_p, close_p) and open_c < max(open_p, close_p)):         # 4th opens strictly within 3rd's body
        logging.debug(f"Three White Soldiers detected: C({open_c:.5f},{close_c:.5f}) P({open_p:.5f},{close_p:.5f}) PP({open_pp:.5f},{close_pp:.5f}) PPP({open_ppp:.5f},{close_ppp:.5f})")
        return 1

    # --- Doji Pattern (Neutral / No Pattern: 0) ---
    # A candle where the open and close prices are very close, indicating indecision.
    elif abs(open_c - close_c) < (high_c - low_c) * 0.05:
        logging.debug(f"Doji detected, returning 0 (Close: {close_c:.5f})")
        return 0

    # No Specific Pattern Detected
    else:
        return 0
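
To make the two engulfing conditions easier to check by hand, here is a minimal standalone sketch of just those comparisons on plain numbers. `engulfing_signal` is a hypothetical helper for illustration; it mirrors the first two branches of `signal_generator` and nothing else, and the prices are made up.

```python
def engulfing_signal(prev_open, prev_close, cur_open, cur_close):
    """Return 1 for bullish engulfing, 2 for bearish engulfing, else 0."""
    bullish = (
        cur_close > cur_open          # current candle is bullish
        and prev_close < prev_open    # previous candle was bearish
        and cur_close > prev_open     # current body covers the previous top
        and cur_open < prev_close     # current body covers the previous bottom
    )
    bearish = (
        cur_close < cur_open          # current candle is bearish
        and prev_close > prev_open    # previous candle was bullish
        and cur_open > prev_close     # current body covers the previous top
        and cur_close < prev_open     # current body covers the previous bottom
    )
    if bullish:
        return 1
    if bearish:
        return 2
    return 0


# Hypothetical prices: a small bearish candle followed by a larger bullish one.
buy = engulfing_signal(1.1010, 1.1000, 1.0995, 1.1015)
# The mirror image: a small bullish candle followed by a larger bearish one.
sell = engulfing_signal(1.1000, 1.1010, 1.1015, 1.0995)
# Two bullish candles in a row match neither pattern.
none = engulfing_signal(1.1000, 1.1005, 1.1005, 1.1010)
```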

In [12]:
# Flatten columns if MultiIndex (e.g., after yfinance download)
if isinstance(dataF.columns, pd.MultiIndex):
    dataF.columns = dataF.columns.get_level_values(0)

# We need 4 candles for the 'Three White Soldiers' pattern (iloc[-1] to iloc[-4]).
# So, the loop starts from index 3 to ensure dataF[i-3] is valid.
# The first 3 signals (for indices 0, 1, 2) will be 0 as there's not enough preceding data.
REQUIRED_CANDLES_FOR_LARGEST_PATTERN = 4
signal = [0] * (REQUIRED_CANDLES_FOR_LARGEST_PATTERN - 1) # Pre-fill with 3 zeros for indices 0, 1, 2

for i in range(REQUIRED_CANDLES_FOR_LARGEST_PATTERN - 1, len(dataF)):
    # Slice to get 4 candles: dataF[i-3], dataF[i-2], dataF[i-1], dataF[i]
    df = dataF.iloc[i-(REQUIRED_CANDLES_FOR_LARGEST_PATTERN-1):i+1]
    signal.append(signal_generator(df))

dataF['signal'] = signal
dataF.signal.value_counts()

signal
0    1692
1      14
2       4
Name: count, dtype: int64
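
The row-by-row loop is easy to follow, but the two-candle patterns can also be computed for a whole frame at once with `shift`. The sketch below covers only the bullish and bearish engulfing checks, so its counts would not match the loop above, which also detects Morning Star and Three White Soldiers. The toy candles and the `engulfing` series name are assumptions for this example.

```python
import pandas as pd

# Hypothetical candles; only Open and Close matter for engulfing checks.
candles = pd.DataFrame({
    "Open":  [1.1010, 1.0995, 1.1015, 1.1020],
    "Close": [1.1000, 1.1015, 1.1018, 1.1005],
})

# Previous candle's values aligned to the current row (NaN on the first row).
prev_open = candles["Open"].shift(1)
prev_close = candles["Close"].shift(1)

bullish = (
    (candles["Close"] > candles["Open"]) & (prev_close < prev_open)
    & (candles["Close"] > prev_open) & (candles["Open"] < prev_close)
)
bearish = (
    (candles["Close"] < candles["Open"]) & (prev_close > prev_open)
    & (candles["Open"] > prev_close) & (candles["Close"] < prev_open)
)

# Same encoding as the notebook: 0 = none, 1 = buy, 2 = sell.
engulfing = pd.Series(0, index=candles.index)
engulfing[bullish] = 1
engulfing[bearish] = 2
```

Comparisons against the NaN values on the first row evaluate to False, so that row stays at 0 without any special handling.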

## <u>**2 - Connect to live trading service**</u>

This section establishes the connection to the OANDA live trading service and sets up the automated trading logic. It covers:

-   **Configuration**: Defines mappings for trading intervals (`M1`, `M5`, etc.) to OANDA's granularity and cron schedule settings. It also loads sensitive API credentials (access token and account ID) securely from environment variables using `python-dotenv`.
-   **Data Acquisition**: Implements a `get_candles` function to retrieve live candlestick data for EUR/USD from OANDA.
-   **Monitoring**: Includes a utility function (`get_memory_usage_mb`) to monitor the bot's memory usage during operation, aiding in performance and stability checks.
-   **Trade Execution Logic (`trading_job`)**: This core function is scheduled to run periodically and performs the following steps:
    -   Logs current memory usage.
    -   Fetches the latest candlestick data.
    -   Converts the live candle data into a Pandas DataFrame.
    -   Generates a trading `signal` (buy, sell, or no action) using the `signal_generator` function defined in Section 1.
    -   Calculates **Take Profit (TP)** and **Stop Loss (SL)** levels dynamically based on recent price action to manage risk.
    -   Places market orders (buy or sell) with attached TP/SL orders via the OANDA API if an actionable signal is detected.
    -   Logs all trading decisions and API responses.
    -   Includes robust error handling, triggering an `emergency_stop` in case of critical failures.
-   **Emergency Stop**: Provides `close_all_positions` and `emergency_stop` functions to immediately close any open positions and shut down the trading bot, offering a critical safety mechanism.
-   **Scheduling**: Utilizes `APScheduler` to run the `trading_job` automatically at predefined intervals (e.g., every 5 minutes for 'M5' trading interval) during weekdays.
-   **Interactive Control**: Presents an "EMERGENCY STOP" button using `ipywidgets` for manual intervention to stop the bot and close all positions instantly.
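
The step from signal to order direction can be sketched as a small pure function. In OANDA's v20 API, positive `units` open a long position and negative `units` a short one, so with the encoding from Section 1 a signal of `1` maps to positive units and `2` to negative units. `order_units` and its `size` parameter are hypothetical names for illustration.

```python
def order_units(signal, size=1000):
    """Translate a signal code into signed order units.

    1 (buy)  -> +size (long)
    2 (sell) -> -size (short)
    anything else -> 0 (no order)
    """
    if signal == 1:
        return size
    if signal == 2:
        return -size
    return 0


long_units = order_units(1)
short_units = order_units(2)
no_units = order_units(0)
```

Keeping this mapping in one place makes it harder for the buy and sell branches to drift out of sync with the signal definitions.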

In [13]:
from apscheduler.schedulers.blocking import BlockingScheduler
from oandapyV20 import API
import oandapyV20.endpoints.orders as orders
from oandapyV20.contrib.requests import MarketOrderRequest
from oanda_candles import Pair, Gran, CandleClient
from oandapyV20.contrib.requests import TakeProfitDetails, StopLossDetails

In [14]:
from oanda_candles import Gran

INTERVAL_MAP = {
    "M1": Gran.M1,
    "M5": Gran.M5,
    "M15": Gran.M15,
    "M30": Gran.M30,
    "H1": Gran.H1,
    "H4": Gran.H4,
    "D": Gran.D
}

SCHEDULER_MINUTE_MAP = {
    "M1": "*",    # Every minute
    "M5": "*/5",  # Every 5th minute
    "M15": "*/15", # Every 15th minute
    "M30": "*/30", # Every 30th minute
    "H1": "1",    # Minute 1 of every hour (hourly)
    "H4": "1",    # Minute 1 of every hour (hourly, for 4-hour candles)
    "D": "1"      # Minute 1 of every hour (hourly, for daily candles)
}
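
Note that the notebook looks up these two maps with different fallbacks: `get_candles` falls back to `Gran.M5`, while the scheduling cell falls back to `"*"`, so an unrecognised interval would fetch M5 candles but run every minute. One possible way to avoid that mismatch is to validate the interval once, as in the sketch below. `resolve_interval` is a hypothetical helper, and the dictionary simply repeats the cron minute values from `SCHEDULER_MINUTE_MAP`.

```python
# Cron "minute" field per interval, copied from SCHEDULER_MINUTE_MAP.
MINUTE_FIELDS = {
    "M1": "*", "M5": "*/5", "M15": "*/15", "M30": "*/30",
    "H1": "1", "H4": "1", "D": "1",
}


def resolve_interval(name, default="M5"):
    """Return (interval, cron_minute) or raise if the interval is unknown."""
    key = (name or default).strip().upper()
    if key not in MINUTE_FIELDS:
        raise ValueError(f"Unsupported trading interval: {name!r}")
    return key, MINUTE_FIELDS[key]


interval, minute_field = resolve_interval("m15")
fallback = resolve_interval(None)  # unset variable falls back to the default
```

Failing loudly on a typo such as `M2` is a design choice; a silent fallback would also work as long as both lookups share it.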

In [15]:
import psutil
import os

def get_memory_usage_mb():
    """Returns memory usage of the current process in MB."""
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return mem_info.rss / (1024 * 1024) # Resident Set Size in MB

In [16]:
from dotenv import load_dotenv

load_dotenv()  # Loads variables from .env

access_token = os.getenv("OANDA_API_KEY")
accountID = os.getenv("OANDA_ACCOUNT_ID")
trading_interval_str = os.getenv("TRADING_INTERVAL", "M5")
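
`os.getenv` returns `None` when a variable is missing, so a forgotten `.env` entry would only surface later as a failed API call. A minimal sketch of an earlier check follows; `require_env` is a hypothetical helper, and the `DEMO_*` variable names exist only for this example.

```python
import os


def require_env(name):
    """Return the value of an environment variable or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Demonstration with throwaway variables rather than real credentials.
os.environ["DEMO_OANDA_KEY"] = "abc123"
os.environ.pop("DEMO_MISSING_VAR", None)

present = require_env("DEMO_OANDA_KEY")
try:
    require_env("DEMO_MISSING_VAR")
    missing_detected = False
except RuntimeError:
    missing_detected = True
```

In the notebook this would be called after `load_dotenv()`, for example `access_token = require_env("OANDA_API_KEY")`.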

In [17]:
def get_candles(n):
    trading_interval_str = os.getenv("TRADING_INTERVAL", "M5") # Default to M5 if not set
    granularity = INTERVAL_MAP.get(trading_interval_str, Gran.M5)

    client = CandleClient(access_token, real=False)
    collector = client.get_collector(Pair.EUR_USD, granularity)
    candles = collector.grab(n)
    return candles

candles = get_candles(10)
for candle in candles:
    print(float(str(candle.bid.o))>1)

True
True
True
True
True
True
True
True
True
True
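
Each candle exposes its bid prices as `candle.bid.o`, `.h`, `.l`, and `.c`, which the notebook converts with `float(str(...))`. The sketch below collects those values into a list of row dictionaries in one pass. `candles_to_rows` and `fake_candle` are hypothetical helpers, and the `SimpleNamespace` objects only stand in for real `oanda_candles` candles so the example runs without an API connection.

```python
from types import SimpleNamespace


def candles_to_rows(candles):
    """Convert candle objects into dicts with Open/Close/High/Low floats."""
    return [
        {
            "Open": float(str(c.bid.o)),
            "Close": float(str(c.bid.c)),
            "High": float(str(c.bid.h)),
            "Low": float(str(c.bid.l)),
        }
        for c in candles
    ]


def fake_candle(o, h, l, c):
    """Stand-in for a real candle: only the bid.o/h/l/c attributes are modelled."""
    return SimpleNamespace(bid=SimpleNamespace(o=o, h=h, l=l, c=c))


# Prices taken from the two candles shown in the bot's log output.
rows = candles_to_rows([
    fake_candle("1.17605", "1.17723", "1.17595", "1.17686"),
    fake_candle("1.17689", "1.17729", "1.17622", "1.17625"),
])
```

Passing `rows` to `pd.DataFrame(rows)` should give a float-typed frame with the same four columns, without filling it cell by cell.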


In [18]:
scheduler = BlockingScheduler()

In [19]:
import oandapyV20.endpoints.positions as positions

def close_all_positions():
    client = API(access_token)
    # This closes all open positions for EUR/USD
    data = {
        "longUnits": "ALL",   # Close all long positions
        "shortUnits": "ALL"   # Close all short positions
    }
    r = positions.PositionClose(accountID, instrument="EUR_USD", data=data)
    response = client.request(r)
    print("All positions closed:", response)

In [20]:
def emergency_stop():
    close_all_positions()
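    # wait=False: emergency_stop can be called from inside a running job, and
    # waiting for the executor there would mean a worker thread waiting on itself.
    scheduler.shutdown(wait=False)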
    print("All positions closed and bot stopped.")

In [None]:
def trading_job():
    logging.info("--- Trading job started ---")
    
    try:
        current_memory_mb = get_memory_usage_mb()
        logging.info(f"Current memory usage: {current_memory_mb:.2f} MB")
    except NameError:
        logging.warning("get_memory_usage_mb function not found. Skipping memory logging.")
    except Exception as e:
        logging.warning(f"Error getting memory usage: {e}")

    try:
        candles = get_candles(10)

        if not candles:
            logging.warning("No candles received. Skipping trading decision.")
            return # Exit the job if no candles

        if len(candles) < 10:
            logging.warning(f"Only {len(candles)} candles received. Need at least 10 for flexible pattern analysis. Skipping trading decision.")
            return # Exit if not enough candles

        dfstream = pd.DataFrame(columns=['Open', 'Close', 'High', 'Low'])

        i=0
        for candle in candles:
            dfstream.loc[i, ['Open']] = float(str(candle.bid.o))
            dfstream.loc[i, ['Close']] = float(str(candle.bid.c))
            dfstream.loc[i, ['High']] = float(str(candle.bid.h))
            dfstream.loc[i, ['Low']] = float(str(candle.bid.l))
            i=i+1
        
        dfstream['Open'] = dfstream['Open'].astype(float)
        dfstream['Close'] = dfstream['Close'].astype(float)
        dfstream['High'] = dfstream['High'].astype(float)
        dfstream['Low'] = dfstream['Low'].astype(float)

        if dfstream.isnull().values.any():
            logging.error("DataFrame contains NaN values after conversion. Skipping trading decision.")
            return

        signal = signal_generator(dfstream.iloc[:-1, :])

        client = API(access_token)

        SLTPRatio = 2
        previous_candleR = abs(dfstream['High'].iloc[-2]-dfstream['Low'].iloc[-2]) 

        current_open_price = dfstream['Open'].iloc[-1]
        SLBuy = current_open_price - previous_candleR
        SLSell = current_open_price + previous_candleR
        TPBuy = current_open_price + previous_candleR * SLTPRatio
        TPSell = current_open_price - previous_candleR * SLTPRatio

        logging.info(f"DataFrame for signal (last 2 candles shown):\\n{dfstream.iloc[:-1, :].tail(2)}")
        logging.info(f"Calculated TP/SL: TPBuy={TPBuy:.5f}, SLBuy={SLBuy:.5f}, TPSell={TPSell:.5f}, SLSell={SLSell:.5f}")
        logging.info(f"Generated Trading Signal: {signal}")

        # Buy (signal 1, as defined by signal_generator)
        if signal == 1:
            logging.info("BUY signal detected. Attempting to place order.")
            mo = MarketOrderRequest(instrument="EUR_USD", units=1000,
                                    takeProfitOnFill=TakeProfitDetails(price=str(TPBuy)).data,
                                    stopLossOnFill=StopLossDetails(price=str(SLBuy)).data)
            r = orders.OrderCreate(accountID, data=mo.data)
            rv = client.request(r)
            logging.info(f"Buy Order Response: {rv}")

        # Sell (signal 2, as defined by signal_generator)
        elif signal == 2:
            logging.info("SELL signal detected. Attempting to place order.")
            mo = MarketOrderRequest(instrument="EUR_USD", units=-1000,
                                    takeProfitOnFill=TakeProfitDetails(price=str(TPSell)).data,
                                    stopLossOnFill=StopLossDetails(price=str(SLSell)).data)
            r = orders.OrderCreate(accountID, data=mo.data)
            rv = client.request(r)
            logging.info(f"Sell Order Response: {rv}")
        
        # No Pattern
        else:
            logging.info("No actionable trading signal (0). No order placed.")

    except Exception as e:
        logging.exception("Critical error in trading_job! Triggering emergency stop.")
        try:
            emergency_stop()
        except NameError:
            logging.error("Emergency stop function not found. Please ensure 'emergency_stop()' is defined.")
            pass
        except Exception as es_e:
            logging.exception(f"Error during emergency_stop execution: {es_e}")
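
The TP/SL arithmetic inside `trading_job` can be isolated as a pure function and checked against the logged values. In the sketch below, the high and low come from the last closed candle in the log (`1.17729` and `1.17622`). The entry price `1.17623` does not appear in the log; it is an assumption inferred from the logged `TPBuy` and `SLBuy`. `bracket_levels` is a hypothetical helper name.

```python
def bracket_levels(entry_price, prev_high, prev_low, sl_tp_ratio=2):
    """Return stop-loss and take-profit prices for both trade directions.

    The stop sits one previous-candle range away from the entry, and the
    target sits sl_tp_ratio ranges away on the opposite side.
    """
    candle_range = abs(prev_high - prev_low)
    return {
        "SLBuy": entry_price - candle_range,
        "TPBuy": entry_price + candle_range * sl_tp_ratio,
        "SLSell": entry_price + candle_range,
        "TPSell": entry_price - candle_range * sl_tp_ratio,
    }


levels = bracket_levels(entry_price=1.17623, prev_high=1.17729, prev_low=1.17622)
formatted = {name: f"{price:.5f}" for name, price in levels.items()}
```

With a range of 0.00107 and a ratio of 2, every trade risks one range to gain two, regardless of direction.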

In [22]:
import ipywidgets as widgets
from IPython.display import display

button = widgets.Button(description="EMERGENCY STOP", button_style='danger')

def on_button_clicked(b):
    emergency_stop()

button.on_click(on_button_clicked)
display(button)

Button(button_style='danger', description='EMERGENCY STOP', style=ButtonStyle())

## <u>**3 - Calling Functions**</u>

This section outlines how the automated trading bot's core functionality is initiated and managed through a scheduled job. It specifically details:

-   **Scheduler Setup**: Reuses the `BlockingScheduler` from `APScheduler` that was created in Section 2 to manage the execution of the trading job.
-   **Dynamic Scheduling**: Configures the `trading_job` to run automatically at specific intervals using a cron-like schedule. The frequency of execution (`minute` setting) is dynamically determined based on the `TRADING_INTERVAL` environment variable (e.g., every 5 minutes for 'M5' candles).
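-   **Operational Hours**: The job is set to run only on weekdays (`mon-fri`), at every hour of the day (`hour='00-23'`). These times are interpreted in the scheduler's configured timezone, not UTC, and only roughly match forex market hours.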
-   **Timezone Management**: The scheduler is configured with the 'Europe/London' timezone to ensure accurate timing of job execution.
-   **Bot Activation**: Starts the scheduler, putting the automated trading system into operation, continuously monitoring for signals and executing trades as programmed.
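
To see what a cron minute field such as `*/5` means in practice, the sketch below computes the next matching minute after a given time. It is a simplified illustration, not APScheduler's own trigger logic: `next_fire_time` is a hypothetical helper that handles only `*/N` minute steps and ignores the weekday, hour, and timezone settings.

```python
from datetime import datetime, timedelta


def next_fire_time(now, step_minutes):
    """Return the first whole minute after `now` whose minute is a multiple of step_minutes."""
    # Start from the next whole minute so the result is strictly later than `now`.
    candidate = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while candidate.minute % step_minutes != 0:
        candidate += timedelta(minutes=1)
    return candidate


# Scheduler started at 14:32:58, so the first M5 run lands on 14:35:00.
first_run = next_fire_time(datetime(2025, 7, 24, 14, 32, 58), 5)
# Once that run has fired, the following one lands on 14:40:00.
second_run = next_fire_time(first_run, 5)
```

These two times line up with the run at 14:35:00 and the "next run at" 14:40:00 reported in the scheduler's log output.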

In [None]:
scheduler_minute_setting = SCHEDULER_MINUTE_MAP.get(trading_interval_str, "*")

scheduler.add_job(
    trading_job,
    'cron',
    day_of_week='mon-fri',
    hour='00-23',
    minute=scheduler_minute_setting,
    start_date='2025-07-23 12:00:00',
    timezone='Europe/London'
)

logging.info("Scheduler started.")
scheduler.start()

2025-07-24 14:32:58,911 - INFO - Adding job tentatively -- it will be properly scheduled when the scheduler starts
2025-07-24 14:32:58,912 - INFO - Scheduler started.
2025-07-24 14:32:58,913 - INFO - Added job "trading_job" to job store "default"
2025-07-24 14:32:58,914 - INFO - Scheduler started
2025-07-24 14:35:00,009 - INFO - Running job "trading_job (trigger: cron[day_of_week='mon-fri', hour='0-23', minute='*/5'], next run at: 2025-07-24 14:40:00 BST)" (scheduled at 2025-07-24 14:35:00+01:00)
2025-07-24 14:35:00,011 - INFO - --- Trading job started ---
2025-07-24 14:35:00,012 - INFO - Current memory usage: 152.30 MB
2025-07-24 14:35:00,358 - INFO - setting up API-client for environment practice
2025-07-24 14:35:00,360 - INFO - DataFrame for signal (last 2 candles shown):\n      Open    Close     High      Low
7  1.17605  1.17686  1.17723  1.17595
8  1.17689  1.17625  1.17729  1.17622
2025-07-24 14:35:00,360 - INFO - Calculated TP/SL: TPBuy=1.17837, SLBuy=1.17516, TPSell=1.17409, SL