<a href="https://colab.research.google.com/github/MarriRohan/Stock-Trading-Bot-with-Deep-Q-Learning/blob/main/Stock-Trading-Bot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:

!pip install -q torch PyYAML


In [3]:
import random
import math
import numpy as np
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import yaml
from typing import Tuple, Dict, Any

# Example config (you can also load from YAML file)
CFG = {
    "max_capital": 100000,
    "daily_loss_limit": 0.02,    # 2% of capital
    "drawdown_limit": 0.15,      # 15% peak-to-trough
    "risk_per_trade": 0.01,      # 1% risk per trade
    "sl_grid": [0.5, 0.75, 1.0], # multiples of ATR (we'll use price range proxy)
    "tp_grid": [1.0, 1.5, 2.0],
    "trailing_grid": [0.5, 0.75, 1.0],
    "learning_rate": 5e-4,
    "gamma": 0.99,
    "batch_size": 64,
    "replay_size": 20000,
    "train_start": 500,
    "episodes": 20,
    "sync_every": 5
}


In [4]:
# Build discrete action grid that maps an integer action index
# to (sl_mult, tp_mult, trailing_mult) where multipliers are relative to ATR proxy.
def build_action_grid(sl_grid, tp_grid, trailing_grid):
    grid = []
    for sl in sl_grid:
        for tp in tp_grid:
            for tr in trailing_grid:
                grid.append((sl, tp, tr))
    return grid

ACTION_GRID = build_action_grid(CFG["sl_grid"], CFG["tp_grid"], CFG["trailing_grid"])
ACTION_DIM = len(ACTION_GRID)
print("Action grid size:", ACTION_DIM)


Action grid size: 27


In [5]:
class TradingEnv:
    """
    Minimal trading environment for demo.
    - data: list/array of prices (close)
    - action: integer index mapping to (SL_mult, TP_mult, trailing_mult)
    - position sizing uses risk_per_trade and SL distance to compute size
    - Enforces MAX_CAPITAL, DAILY_LOSS_LIMIT, DRAWDOWN_LIMIT
    NOTE: This is a simplified env for demonstration and testing only.
    """
    def __init__(self, prices, config: Dict[str, Any]):
        self.prices = np.asarray(prices, dtype=float)
        self.cfg = config
        self.max_capital = config["max_capital"]
        self.daily_loss_limit = config["daily_loss_limit"]
        self.drawdown_limit = config["drawdown_limit"]
        self.risk_per_trade = config["risk_per_trade"]
        self.action_grid = ACTION_GRID
        self.reset()

    def reset(self):
        self.wallet = float(self.max_capital)
        self.equity_peak = float(self.wallet)
        self.position = None           # dict: {'size', 'entry_price', 'sl', 'tp', 'trailing'}
        self.current_step = 0
        self.daily_start_equity = float(self.wallet)
        self.done = False
        return self._get_obs()

    def step(self, action_idx: int) -> Tuple[np.ndarray, float, bool, dict]:
        info = {}
        price = float(self.prices[self.current_step])
        reward = 0.0

        # Map action index to sl/tp/tr multipliers
        sl_mult, tp_mult, tr_mult = self.action_grid[action_idx]

        # Use a simple ATR proxy: local range of last 5 bars (if available) else 1.0
        window = 5
        if self.current_step >= 1:
            start = max(0, self.current_step - window)
            local = self.prices[start:self.current_step+1]
            atr_proxy = float(np.mean(np.abs(np.diff(local)))) if len(local) > 1 else 1.0
            if atr_proxy <= 0:
                atr_proxy = 1.0
        else:
            atr_proxy = 1.0

        # If flat, open a long position of size computed by risk per trade and SL distance
        if self.position is None:
            # Suppose we open LONG only for demo
            sl_distance = sl_mult * atr_proxy
            if sl_distance <= 0:
                sl_distance = 1.0
            # Risk amount in absolute rupees
            risk_amount = self.risk_per_trade * self.wallet
            # position size (number of units) = risk / sl_distance
            size = risk_amount / sl_distance
            # compute SL and TP prices
            sl_price = price - sl_distance
            tp_price = price + tp_mult * atr_proxy
            # trailing amount in price units
            trailing_amt = tr_mult * atr_proxy
            self.position = {
                "size": size,
                "entry_price": price,
                "sl": sl_price,
                "tp": tp_price,
                "trailing_amt": trailing_amt,
                "peak_price": price  # used for trailing stop logic
            }
            info["order"] = "open_long"
        else:
            # Evaluate the position at current price and check SL/TP/trailing
            p = price
            pos = self.position
            size = pos["size"]
            entry = pos["entry_price"]

            # update peak price for trailing
            if p > pos["peak_price"]:
                pos["peak_price"] = p

            # Check TP
            if p >= pos["tp"]:
                pnl = (pos["tp"] - entry) * size
                self.wallet += pnl
                reward = pnl
                info["exit"] = "tp_hit"
                self.position = None

            # Check SL
            elif p <= pos["sl"]:
                pnl = (pos["sl"] - entry) * size
                self.wallet += pnl
                reward = pnl
                info["exit"] = "sl_hit"
                self.position = None

            else:
                # Check trailing stop: if price has fallen from peak more than trailing_amt
                if pos["peak_price"] - p >= pos["trailing_amt"]:
                    # execute at current price
                    pnl = (p - entry) * size
                    self.wallet += pnl
                    reward = pnl
                    info["exit"] = "trailing_hit"
                    self.position = None
                else:
                    # hold â€” no reward this tick
                    reward = 0.0

        # Update equity peak/drawdown & daily loss
        if self.wallet > self.equity_peak:
            self.equity_peak = self.wallet
        drawdown = (self.equity_peak - self.wallet) / max(1.0, self.equity_peak)
        if drawdown >= self.drawdown_limit:
            info["circuit"] = "drawdown_limit"
            self.done = True

        daily_loss = (self.daily_start_equity - self.wallet) / max(1.0, self.daily_start_equity)
        if daily_loss >= self.daily_loss_limit:
            info["circuit"] = "daily_loss_limit"
            self.done = True

        # Move next
        self.current_step += 1
        if self.current_step >= len(self.prices) - 1:
            # close any open position at last price
            if self.position is not None:
                final_price = float(self.prices[-1])
                pnl = (final_price - self.position["entry_price"]) * self.position["size"]
                self.wallet += pnl
                reward += pnl
                info["exit"] = info.get("exit", "") + "|final_close"
                self.position = None
            self.done = True

        obs = self._get_obs()
        return obs, float(reward), bool(self.done), info

    def _get_obs(self) -> np.ndarray:
        # Observation: [price, wallet_ratio, equity_peak_ratio, step_norm]
        price = float(self.prices[self.current_step])
        wallet_ratio = self.wallet / max(1.0, self.max_capital)
        peak_ratio = self.equity_peak / max(1.0, self.max_capital)
        step_norm = float(self.current_step) / max(1, len(self.prices))
        return np.array([price, wallet_ratio, peak_ratio, step_norm], dtype=np.float32)


In [6]:
class SimpleQNet(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, output_dim)
        )

    def forward(self, x):
        return self.net(x)


class DQNAgent:
    def __init__(self, state_dim, action_dim, cfg):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.q = SimpleQNet(state_dim, action_dim)
        self.target_q = SimpleQNet(state_dim, action_dim)
        self.optimizer = optim.Adam(self.q.parameters(), lr=cfg.get("learning_rate", 1e-3))
        self.replay = deque(maxlen=cfg.get("replay_size", 50000))
        self.batch_size = cfg.get("batch_size", 64)
        self.gamma = cfg.get("gamma", 0.99)
        self.epsilon = 1.0
        self.eps_min = 0.05
        self.eps_decay = 0.995
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q.to(self.device)
        self.target_q.to(self.device)

    def act(self, state: np.ndarray) -> int:
        if random.random() < self.epsilon:
            return random.randrange(self.action_dim)
        self.q.eval()
        with torch.no_grad():
            s_t = torch.tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
            qvals = self.q(s_t)
            action = int(torch.argmax(qvals, dim=1).cpu().item())
        self.q.train()
        return action

    def remember(self, s, a, r, s2, done):
        self.replay.append((s, a, r, s2, done))

    def learn(self):
        if len(self.replay) < self.batch_size:
            return
        batch = random.sample(self.replay, self.batch_size)
        s, a, r, s2, done = zip(*batch)
        s = torch.tensor(np.stack(s), dtype=torch.float32, device=self.device)
        a = torch.tensor(a, dtype=torch.int64, device=self.device).unsqueeze(1)
        r = torch.tensor(r, dtype=torch.float32, device=self.device).unsqueeze(1)
        s2 = torch.tensor(np.stack(s2), dtype=torch.float32, device=self.device)
        done = torch.tensor(done, dtype=torch.float32, device=self.device).unsqueeze(1)

        qvals = self.q(s).gather(1, a)
        with torch.no_grad():
            qnext = self.target_q(s2).max(1)[0].unsqueeze(1)
        target = r + (1.0 - done) * self.gamma * qnext
        loss = nn.functional.mse_loss(qvals, target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # epsilon decay
        if self.epsilon > self.eps_min:
            self.epsilon *= self.eps_decay

    def sync_target(self):
        self.target_q.load_state_dict(self.q.state_dict())


In [7]:
# Toy price series: sine wave + linear drift to simulate market moves
def make_toy_prices(n=1000):
    x = np.linspace(0, 20 * math.pi, n)
    prices = (np.sin(x) * 5.0) + (np.linspace(0, 20, n)) + 100.0
    # add small noise
    prices += np.random.normal(scale=0.5, size=n)
    return prices

prices = make_toy_prices(800)
env = TradingEnv(prices, CFG)
state_dim = 4
action_dim = ACTION_DIM
agent = DQNAgent(state_dim, action_dim, CFG)

# Training loop (fast demo)
for ep in range(CFG["episodes"]):
    s = env.reset()
    done = False
    total_reward = 0.0
    steps = 0
    while not done:
        a = agent.act(s)
        s2, r, done, info = env.step(a)
        agent.remember(s, a, r, s2, done)
        agent.learn()
        s = s2
        total_reward += r
        steps += 1
    if ep % CFG["sync_every"] == 0:
        agent.sync_target()
    print(f"Episode {ep+1}/{CFG['episodes']}  steps={steps}  total_reward={total_reward:.2f}  final_wallet={env.wallet:.2f}  info={info}")


Episode 1/20  steps=59  total_reward=-2631.80  final_wallet=97368.20  info={'exit': 'sl_hit', 'circuit': 'daily_loss_limit'}
Episode 2/20  steps=799  total_reward=109974.50  final_wallet=209974.50  info={'exit': '|final_close'}
Episode 3/20  steps=783  total_reward=251179.31  final_wallet=351179.31  info={'exit': 'sl_hit', 'circuit': 'drawdown_limit'}
Episode 4/20  steps=55  total_reward=-2009.49  final_wallet=97990.51  info={'exit': 'sl_hit', 'circuit': 'daily_loss_limit'}
Episode 5/20  steps=691  total_reward=189697.28  final_wallet=289697.28  info={'exit': 'sl_hit', 'circuit': 'drawdown_limit'}
Episode 6/20  steps=783  total_reward=176767.00  final_wallet=276767.00  info={'exit': 'sl_hit', 'circuit': 'drawdown_limit'}
Episode 7/20  steps=783  total_reward=202155.57  final_wallet=302155.57  info={'exit': 'sl_hit', 'circuit': 'drawdown_limit'}
Episode 8/20  steps=761  total_reward=157033.49  final_wallet=257033.49  info={'exit': 'sl_hit', 'circuit': 'drawdown_limit'}
Episode 9/20  ste

In [8]:
# Save Q-network weights (optional in Colab)
torch.save(agent.q.state_dict(), "dqn_qnet.pth")
print("Model saved to dqn_qnet.pth")

# Quick one-episode evaluation with greedy policy
env_eval = TradingEnv(prices, CFG)
s = env_eval.reset()
done = False
total = 0.0
while not done:
    # greedy action
    agent.epsilon = 0.0
    a = agent.act(s)
    s, r, done, info = env_eval.step(a)
    total += r
print("Eval final wallet:", env_eval.wallet, "total pnl:", total, "info:", info)


Model saved to dqn_qnet.pth
Eval final wallet: 365828.13565495634 total pnl: 265828.1356549564 info: {'exit': 'sl_hit', 'circuit': 'drawdown_limit'}
