## Brief overview of this strategy

This code is inspired by a well-known momentum strategy that uses EMA and MACD as its main indicators.

The essence is:
- when EMA5 > EMA50 and MACD > 0, the trend is considered "bullish" and we enter a long position
- when EMA5 < EMA50 and MACD < 0, the trend is considered "bearish" and we enter a short position

#### Final version of stock algo-trading

##### What we have tested: 

- Timeframe of the data (5 min / 15 min / 30 min / 1 hr / 2 hr / 3 hr / 6 hr / 8 hr / 1 day)

- Per stock, the number of trades allowed within a single trend (1 / unlimited)

##### Result: 

- Using 8-hour or 1-day bars and allowing only one trade to be executed per trend gives the best performance

In [None]:
from AlgoAPI import AlgoAPIUtil, AlgoAPI_Backtest
import pandas as pd
import numpy as np
from datetime import datetime

class BearishFSM: # Finite State Machine
    """
    Per-instrument trading state machine.

    Holds the indicator DataFrame for one instrument, decides when to enter a
    trade from the precomputed ``trend`` column, sizes the order from an
    ATR-based stop distance, and closes an open position when the EMA/MACD
    exit condition is met.

    The DataFrame is expected to carry the columns read below: ``High``,
    ``Low``, ``Close``, ``EMA_5``, ``EMA_50``, ``MACD_fast`` and ``trend``
    (``"BU"``, ``"BE"`` or ``"NO"``).
    """

    def __init__(self, df, instrument, evt, account_balance=0, tick_size=0.01):
        """
        Args:
            df: indicator DataFrame indexed by bar timestamp.
            instrument: instrument symbol used in orders and log lines.
            evt: event handler exposing ``consoleLog``, ``sendOrder`` and
                ``update_opened_order``.
            account_balance: balance used for position sizing.
            tick_size: minimum stop distance accepted for a trade.
        """
        self.df = df
        self.instrument = instrument
        self.evt = evt
        self.account_balance = account_balance
        self.tick_size = tick_size

        # Position state
        self.position_open = False
        self.pending = None  # details of the entry order awaiting confirmation
        self.stop_loss_price = None
        self.take_profit_price = None
        self.enter_market_trade_side = None  # "BU" or "BE" once a position is open
        self.tradeID = None

        # Trend state
        self.last_trend_seen = 'NO'
        self.trade_executed = False  # True once an entry was sent for the current trend leg

        # --- Strategy Parameters ---
        self.risk_reward_ratio = 1.8
        self.atr_period = 14
        self.atr_multiplier = 2.0
        self.risk_percent = 0.5

    def run_strategy_logic(self, ts):
        """
        Evaluate the bar at timestamp ``ts``.

        If a position is open, only the exit condition is checked. Otherwise
        an entry order is sent when the bar's trend is "BU"/"BE" and no trade
        has been sent yet for the current trend leg.
        """
        # Validate the timestamp before anything touches the DataFrame, so a
        # missing bar cannot raise KeyError while a position is open.
        if ts not in self.df.index:
            self.evt.consoleLog(f"EMA_50 not found for {ts} in {self.instrument} data.")
            self.evt.consoleLog(f"Error: Timestamp {ts} not found in FSM DataFrame for {self.instrument}.")
            return

        ema_50 = self.df.loc[ts, "EMA_50"]
        self.evt.consoleLog(f"{self.instrument} EMA_50 at {ts}: {ema_50:.4f}")

        # If a position is open, monitor for exit conditions
        if self.position_open:
            self.monitor_exit(ts)
            return

        trend = self.df.loc[ts, "trend"]

        # If the trend changes, reset the flag to allow a new trade in this leg
        if trend != self.last_trend_seen:
            self.trade_executed = False
            self.last_trend_seen = trend

        if trend == "NO" or self.trade_executed:
            return # Do nothing on neutral trend or if already traded this leg

        # --- Entry Signal Detected ---
        close_price = self.df.loc[ts, "Close"]
        stop, take = self._calculate_atr(entry_point=ts, trend=trend)

        stop_loss_dollar = abs(close_price - stop)
        if stop_loss_dollar < self.tick_size:
            self.evt.consoleLog(f"Stop loss is too small for {self.instrument}, skipping trade.")
            return

        volume = self.calculate_share_size(
            risk_percent=self.risk_percent,
            stop_loss_dollar=stop_loss_dollar,
            entry_price=close_price
        )

        if volume > 0:
            self.send_entry_order(ts=ts, trend=trend, entry_price=close_price, stop=stop, take=take, volume=volume)
        else:
            self.evt.consoleLog(f"Calculated volume is 0 for {self.instrument}, no order sent.")

    def calculate_share_size(self, risk_percent, stop_loss_dollar, entry_price, max_leverage=4.5):
        """
        Return the whole number of shares to trade.

        The size is the smaller of (a) the shares whose stop-out loss equals
        ``risk_percent`` percent of the account balance and (b) the shares
        affordable at ``max_leverage`` times the balance.

        Returns 0 when the stop distance or entry price is not a positive
        number (including NaN), instead of raising on the division.
        """
        # Written as a positive test so that NaN inputs (for which every
        # comparison is False) are rejected as well.
        if not (stop_loss_dollar > 0 and entry_price > 0):
            self.evt.consoleLog(
                f"{self.instrument} invalid sizing inputs sl$={stop_loss_dollar} entry={entry_price}, size set to 0."
            )
            return 0

        risk_amount = self.account_balance * risk_percent / 100
        raw_size = risk_amount // stop_loss_dollar
        max_size_margin = (self.account_balance * max_leverage) // entry_price
        final_size = int(min(raw_size, max_size_margin))
        self.evt.consoleLog(
            f"{self.instrument} bal={self.account_balance:.2f} risk%={risk_percent} sl$={stop_loss_dollar:.2f} "
            f"riskAmt={risk_amount:.2f} rawShares={raw_size} marginCap={max_size_margin} finalShares={final_size}"
        )
        return final_size

    def wilder_atr(self, tr, length=14):
        """Smooth a True Range series with an EMA of alpha = 1/length."""
        return tr.ewm(alpha=1/length, adjust=False).mean()

    def _calculate_atr(self, entry_point, trend="NO"):
        """
        Derive stop-loss and take-profit prices for an entry at ``entry_point``.

        The risk distance is ``atr_multiplier`` times the smoothed ATR at that
        bar. For trend "BE" the stop sits above the close and the target
        below; for any other trend value the stop sits below and the target
        above.

        Returns:
            (stop, take) price tuple.
        """
        idx = self.df.index.get_loc(entry_point)
        prev_close = self.df["Close"].shift()
        # True Range: largest of the three ranges per bar. The row-wise max
        # skips the NaN produced by shift() on the first bar, leaving High-Low.
        tr = pd.concat(
            [
                (self.df["High"] - self.df["Low"]).abs(),
                (self.df["High"] - prev_close).abs(),
                (self.df["Low"] - prev_close).abs(),
            ],
            axis=1,
        ).max(axis=1)

        atr = self.wilder_atr(tr, self.atr_period).iloc[idx]
        risk = self.atr_multiplier * atr
        close = self.df.loc[entry_point, "Close"]

        if trend == "BE":
            stop = close + risk
            take = close - self.risk_reward_ratio * risk
        else:
            stop = close - risk
            take = close + self.risk_reward_ratio * risk

        return stop, take

    def send_entry_order(self, ts, trend, entry_price, stop, take, volume):
        """
        Send a market entry order with attached stop-loss and take-profit.

        The order details are stored in ``self.pending`` keyed by ``ts`` as the
        order reference, and ``trade_executed`` is set so no further entry is
        sent for the same trend leg.
        """
        self.pending = {
            "orderRef": ts, "trend": trend, "stop": stop, "take": take, "volume": volume
        }
        order = AlgoAPIUtil.OrderObject(
            instrument=self.instrument,
            orderRef=ts,
            openclose="open",
            buysell=1 if trend == "BU" else -1,
            ordertype=0, # Market Order
            volume=volume,
            stopLossLevel=stop,
            takeProfitLevel=take
        )
        self.evt.sendOrder(order)
        self.trade_executed = True
        self.evt.consoleLog(
            f"{self.instrument} Order sent for trend: {trend} at price: {entry_price} | Stop: {stop} | Take: {take} | Volume: {volume}"
        )

    def monitor_exit(self, ts):
        """
        Close the open position when the bar at ``ts`` contradicts its side.

        A long ("BU") exits when EMA_5 < EMA_50 or MACD_fast < 0; a short
        ("BE") exits when EMA_5 > EMA_50 or MACD_fast > 0. The exit is
        requested by moving both take-profit and stop-loss of the open trade
        to the bar's close; local state is reset immediately afterwards.
        """
        side = self.enter_market_trade_side
        if side not in ("BU", "BE"):
            return

        ema_fast = self.df.loc[ts, "EMA_5"]
        ema_slow = self.df.loc[ts, "EMA_50"]
        macd_signal = self.df.loc[ts, "MACD_fast"]

        if side == "BU":
            should_exit = (ema_fast < ema_slow) or (macd_signal < 0)
            label = "LONG"
        else:
            should_exit = (ema_fast > ema_slow) or (macd_signal > 0)
            label = "SHORT"

        if not should_exit:
            return

        close_price = self.df.loc[ts, "Close"]
        self.evt.consoleLog(f"Exit condition met for {label} {self.instrument}. Closing position.")
        self.evt.update_opened_order(tradeID=self.tradeID, tp=close_price, sl=close_price)
        self.reset()

    def reset(self):
        """Clear all open-position state (trend-leg flags are left untouched)."""
        self.position_open = False
        self.stop_loss_price = None
        self.take_profit_price = None
        self.tradeID = None
        self.enter_market_trade_side = None

class InstrumentBar:
    """
    Handles data acquisition and preparation for a single instrument.

    It queries historical bars from the event handler, computes the EMA/MACD
    indicators and the ``trend`` label on them, and triggers the trading FSM
    whenever a newer bar appears in the refreshed data.

    The bars are used at the queried interval (``INTERVAL``); no resampling is
    performed, even though some attribute names still mention "8h".
    """

    NUM_BARS = 500   # bars requested per history query
    INTERVAL = 'D'   # bar interval passed to getHistoricalBar
    MIN_BARS = 50    # minimum bars required (longest indicator is EMA_50)

    def __init__(self, symbol: str, evt, initial_timestamp):
        """
        Args:
            symbol: instrument symbol to query and trade.
            evt: event handler exposing ``consoleLog`` and ``getHistoricalBar``.
            initial_timestamp: end timestamp of the first history query; it is
                supplied by the caller rather than read from the wall clock.
        """
        self.symbol = symbol
        self.evt = evt
        self.fsm = None
        self.data = pd.DataFrame()
        self.last_processed_8h_timestamp = None

        # Load initial data on startup using the provided timestamp
        initial_dt = pd.to_datetime(initial_timestamp)
        if not self._load_and_prepare_data(end_timestamp=initial_dt):
            self.evt.consoleLog(f"CRITICAL: Failed to initialize historical data for {self.symbol}. The bot may not trade.")

    def _load_and_prepare_data(self, end_timestamp):
        """
        Query historical bars ending at ``end_timestamp``, compute indicators,
        and store the result in ``self.data``.

        Returns:
            True when ``self.data`` was refreshed, False when the query
            returned nothing or fewer than ``MIN_BARS`` bars (``self.data`` is
            left unchanged in that case).
        """
        contract = {"instrument": self.symbol}
        num_of_bars = self.NUM_BARS
        interval = self.INTERVAL
        query_timestamp = end_timestamp

        self.evt.consoleLog(f"Querying {num_of_bars} '{interval}' bars for {self.symbol} ending at {query_timestamp}")
        hist_data = self.evt.getHistoricalBar(contract, num_of_bars, interval, query_timestamp)

        # Count defensively: the handler may return None, and len(None) would
        # raise inside the very branch meant to report missing data.
        received = len(hist_data) if hist_data else 0
        if received < self.MIN_BARS:
            self.evt.consoleLog(f"Warning: Insufficient historical data for {self.symbol}. Received {received} bars.")
            return False

        # Each bar is read through its 't', 'o', 'h', 'l', 'c' keys; the outer
        # mapping keys are not needed.
        rows = [
            {
                "Datetime": pd.to_datetime(bar['t']),
                "Open": bar["o"],
                "High": bar["h"],
                "Low": bar["l"],
                "Close": bar["c"],
            }
            for bar in hist_data.values()
        ]
        bars = pd.DataFrame(rows).set_index("Datetime").sort_index()

        # Indicators. "MACD_slow" is the MACD line (EMA_12 - EMA_26) and
        # "MACD_fast" is its 9-span EMA.
        close = bars["Close"]
        bars["EMA_5"] = close.ewm(span=5, adjust=False).mean()
        bars["EMA_50"] = close.ewm(span=50, adjust=False).mean()
        bars["EMA_12"] = close.ewm(span=12, adjust=False).mean()
        bars["EMA_26"] = close.ewm(span=26, adjust=False).mean()
        bars["MACD_slow"] = bars["EMA_12"] - bars["EMA_26"]
        bars["MACD_fast"] = bars["MACD_slow"].ewm(span=9, adjust=False).mean()

        bullish = (bars["EMA_5"] > bars["EMA_50"]) & (bars["MACD_fast"] > 0)
        bearish = (bars["EMA_5"] < bars["EMA_50"]) & (bars["MACD_fast"] < 0)
        bars["trend"] = np.where(bullish, "BU", np.where(bearish, "BE", "NO"))

        self.data = bars
        return True

    def update(self, snap, balance):
        """
        Called on each data snapshot.

        The first call after data is available creates the FSM and records the
        latest bar as already processed. Later calls reload the history when
        the snapshot time is past the last processed bar, and run the FSM on
        the newest bar if the reload produced one.

        Args:
            snap: snapshot mapping; only ``snap["timestamp"]`` is read.
            balance: account balance forwarded to the FSM for sizing.
        """
        current_dt = pd.to_datetime(snap["timestamp"])

        if self.data.empty:
            return # Wait until data is successfully loaded

        # Initialize on first run after data load
        if self.last_processed_8h_timestamp is None:
            self.last_processed_8h_timestamp = self.data.index[-1]
            self.fsm = BearishFSM(self.data, self.symbol, self.evt, balance)
            return

        if current_dt <= self.last_processed_8h_timestamp:
            return

        # NOTE(review): every snapshot later than the last bar's timestamp
        # triggers a full history re-query; if snapshots arrive more often
        # than bars, this may be worth throttling.
        self.evt.consoleLog(f"New 1D bar detected for {self.symbol}. Current time: {current_dt}, Last bar time: {self.last_processed_8h_timestamp}.")

        if not self._load_and_prepare_data(end_timestamp=current_dt):
            self.evt.consoleLog("Failed to load and prepare new data.")
            return

        newest_bar = self.data.index[-1]
        if newest_bar <= self.last_processed_8h_timestamp:
            self.evt.consoleLog("Data reloaded, but no new 1D bar was generated. Waiting.")
            return

        self.last_processed_8h_timestamp = newest_bar

        # Update FSM with the latest data and balance
        self.fsm.df = self.data
        self.fsm.account_balance = balance

        # Run the FSM logic on the most recently closed bar
        self.fsm.run_strategy_logic(self.last_processed_8h_timestamp)

class AlgoEvent:
    """
    Strategy entry point: routes market-data and order callbacks to one
    InstrumentBar (and its FSM) per instrument symbol.
    """

    def __init__(self):
        self.evt = None
        self.books = {}  # symbol -> InstrumentBar

    def start(self, mEvt):
        """Create the backtest event handler and start it."""
        self.evt = AlgoAPI_Backtest.AlgoEvtHandler(self, mEvt)
        self.evt.start()

    def on_bulkdatafeed(self, isSync, bd, ab):
        """
        Forward each instrument snapshot in ``bd`` to its InstrumentBar,
        creating the bar tracker the first time a symbol is seen.
        """
        balance = ab["availableBalance"]

        for sym, snap in bd.items():
            if snap['instrument'] is not None:
                if sym not in self.books:
                    self.evt.consoleLog(f"First time seeing {sym}, creating InstrumentBar.")
                    # The snapshot's own timestamp seeds the first history query.
                    self.books[sym] = InstrumentBar(sym, self.evt, snap['timestamp'])

                # update() manages its own state, including creating the FSM
                # on the first run with valid data.
                self.books[sym].update(snap, balance)

    def on_orderfeed(self, of):
        """
        Apply an order-status event to the matching instrument's FSM:
        confirm or discard the pending entry, and reset on a successful close.
        """
        sym = of.instrument
        book = self.books.get(sym)
        if book is None or book.fsm is None:
            return

        fsm = book.fsm

        if of.openclose == "open":
            # Only react to the entry order this FSM is currently waiting on.
            if fsm.pending and of.orderRef == fsm.pending["orderRef"]:
                if of.status == "success":
                    entry = fsm.pending
                    fsm.position_open = True
                    fsm.stop_loss_price = entry["stop"]
                    fsm.take_profit_price = entry["take"]
                    fsm.enter_market_trade_side = entry["trend"]
                    fsm.tradeID = of.tradeID
                    fsm.pending = None
                    self.evt.consoleLog(f"Position opened for {sym}. TradeID: {of.tradeID}")
                else:
                    self.evt.consoleLog(f"Order for {sym} rejected or failed. Reason: {of.status}")
                    fsm.pending = None
                    # Allow another entry attempt within the same trend leg.
                    fsm.trade_executed = False

        if of.openclose == 'close' and of.status == 'success':
            self.evt.consoleLog(f"Position closed for {sym}. TradeID: {of.tradeID}")
            fsm.reset()

    # The remaining feed callbacks are intentionally no-ops.
    def on_newsdatafeed(self, nd):
        pass

    def on_weatherdatafeed(self, wd):
        pass

    def on_econsdatafeed(self, ed):
        pass

    def on_corpAnnouncement(self, ca):
        pass

    def on_dailyPLfeed(self, pl):
        pass

    def on_openPositionfeed(self, op, oo, uo):
        pass