In [None]:
import pandas as pd
import numpy as np
import vectorbt as vbt
import tqdm
from tabulate import tabulate
from script import Strategy

class Backtester:
    def __init__(self, data: pd.DataFrame, initial_value: float):
        self.data = data
        self.portfolio_value = initial_value
        self.cash = initial_value
        self.investment = 0.0
        self.current_index = 1
        tickers = data.columns.get_level_values(0).unique()
        self.positions = pd.Series(0, index=tickers)
        self.all_positions = pd.DataFrame(columns=tickers)
        self.tradingState = {}
        self.dates = []
        self.portfolio_history = []
        self.cash_history = []
        self.investment_history = []

    def calculate_positions(self, signal: pd.Series, value, open=True) -> pd.Series:
        if not isinstance(signal, pd.Series):
            raise TypeError(f'For timestamp {self.data.index[self.current_index]}, signal must be a pandas Series, got {type(signal)}')
        if abs(signal).sum() - 1 > 1e-6:
            raise ValueError(f'For timestamp {self.data.index[self.current_index]} the sum of the abs(signals) must not be greater than 1, got {abs(signal).sum()}')
        
        unchanged_positions = self.positions.copy()
        nan_mask = signal.isna()

        prices = (
            self.data.xs('open', level=1, axis=1).iloc[self.current_index]
            if open
            else self.data.xs('close', level=1, axis=1).iloc[self.current_index]
        )

        float_shares = (signal * value) / prices.replace(0, np.nan)

        float_shares = (
            float_shares
            .replace([np.inf, -np.inf], 0)
            .fillna(0)
        )

        new_positions = pd.Series(0, index=float_shares.index, dtype=int)
        longs  = float_shares > 0
        shorts = float_shares < 0

        new_positions[longs]  = np.floor(float_shares[longs]).astype(int)
        new_positions[shorts] = np.ceil (float_shares[shorts]).astype(int)
        
        # Restore unchanged positions for NaN signals
        new_positions[nan_mask] = unchanged_positions[nan_mask]

        return new_positions

    def calculate_cash(self, positions: pd.Series, open=True) -> float:
        index = self.current_index
        price = self.data.xs('open',level=1,axis=1).iloc[index] if open else self.data.xs('close',level=1,axis=1).iloc[index]
        return self.portfolio_value - (abs(positions) * price).sum()

    def update_investment(self, positions: pd.Series, new_day=False) -> float:
        index = self.current_index
        price1 = self.data.xs('close',level=1,axis=1).iloc[index-1] if new_day else self.data.xs('open',level=1,axis=1).iloc[index]
        price2 = self.data.xs('open',level=1,axis=1).iloc[index] if new_day else self.data.xs('close',level=1,axis=1).iloc[index]
        return (positions * (price2 - price1)).sum() + self.investment

    def run(self):
        processed_data = Strategy().process_data(self.data)
        self.all_positions.loc[self.data.index[0]] = self.positions
        traderData = ""
        for i in tqdm.tqdm(range(1, len(self.data))):
            self.tradingState = {
                'processed_data': processed_data[:i],
                'investment': self.investment,
                'cash': self.cash,
                'current_timestamp': self.data.index[self.current_index],
                'traderData': traderData,
                'positions': self.positions,
            }
            signal, traderData = Strategy().get_signals(self.tradingState)
            if signal is None:
                raise ValueError(f'For timestamp {self.data.index[self.current_index]}, signal is None')
            self.investment = self.update_investment(self.positions, new_day=True)
            self.portfolio_value = self.investment + self.cash
            self.positions = self.calculate_positions(signal, self.portfolio_value)
            self.cash = self.calculate_cash(self.positions)
            self.all_positions.loc[self.data.index[i]] = self.positions
            self.current_index += 1

    def vectorbt_run(self):
        open_prices = self.data.xs('open', level=1, axis=1).loc[self.all_positions.index, self.all_positions.columns]
        close_prices = self.data.xs('close', level=1, axis=1).loc[self.all_positions.index, self.all_positions.columns]

        order_size = self.all_positions.diff().fillna(0).astype(int)
        order_size = order_size.mask(order_size == 0)
        order_size.to_csv('order_size.csv')

        portfolio = vbt.Portfolio.from_orders(
            close=close_prices,
            size=order_size,
            price=open_prices,
            init_cash=initial_value,
            freq='1D',
            direction=2,
            cash_sharing=True
        )
        stats_eq = portfolio.stats()
        stats_df = stats_eq.to_frame(name='Value').reset_index()
        stats_df.columns = ['Metric', 'Value']

        print(tabulate(
            stats_df,
            headers='keys',
            tablefmt='psql',
            showindex=False,
            floatfmt=".3f"
        ))

data = pd.read_csv(
    'data/multi_level_ohlcv.csv',
    index_col=0, header=[0,1], parse_dates=True
)[5000:]

first_5_tickers = data.columns.get_level_values(0).unique()[0:2]
data = data.loc[:, data.columns.get_level_values(0).isin(first_5_tickers)]

initial_value = 200000.0
backtester = Backtester(data, initial_value)
backtester.run()
backtester.all_positions.to_csv('positions.csv')
backtester.vectorbt_run()


100%|██████████| 853/853 [00:01<00:00, 561.76it/s]


+----------------------------+----------------------+
| Metric                     | Value                |
|----------------------------+----------------------|
| Start                      | 2021-10-26 09:15:00  |
| End                        | 2025-04-02 09:15:00  |
| Period                     | 854 days 00:00:00    |
| Start Value                | 200000.0             |
| End Value                  | 342925.6803953524    |
| Total Return [%]           | 71.46284019767623    |
| Benchmark Return [%]       | 67.84705427798723    |
| Max Gross Exposure [%]     | 114.22757731678016   |
| Total Fees Paid            | 0.0                  |
| Max Drawdown [%]           | 29.97675224558923    |
| Max Drawdown Duration      | 220 days 00:00:00    |
| Total Trades               | 6                    |
| Total Closed Trades        | 4                    |
| Total Open Trades          | 2                    |
| Open Trade PnL             | 147573.20839535244   |
| Win Rate [%]              

In [4]:
import plotly.graph_objects as go

positions = backtester.all_positions
tickers = positions.columns
open_prices = backtester.data.xs('open', level=1, axis=1).loc[positions.index, tickers]
close_prices = backtester.data.xs('close', level=1, axis=1).loc[positions.index, tickers]
order_size = positions.diff().fillna(0).astype(int).mask(lambda x: x == 0, np.nan)

pf = vbt.Portfolio.from_orders(
    close=close_prices,
    size=order_size,
    price=open_prices,
    init_cash=initial_value,
    freq='1D',
    direction=2,
    cash_sharing=True
)

stats_eq = pf.stats()
eq_curve = pf.value()

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=eq_curve.index,
    y=eq_curve.values,
    mode='lines',
    name='Equity Curve',
    line=dict(color='green', width=3)
))
fig.update_layout(
    title='Portfolio Equity Curve',
    xaxis_title='Date',
    yaxis_title='Portfolio Value',
    template='plotly_white'
)
fig.show()


In [6]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

returns = eq_curve.pct_change().fillna(0)
cum_max = eq_curve.cummax()
drawdown = (eq_curve - cum_max) / cum_max
mean_ret = returns.mean()
median_ret = returns.median()

fig = make_subplots(rows=1, cols=2, subplot_titles=('Daily Returns', 'Drawdown Curve'))
fig.add_trace(
    go.Histogram(
        x=returns,
        nbinsx=50,
        marker_color='orange',
        marker_line_color='white',
        marker_line_width=1,
        opacity=0.8,
        name='Returns'
    ),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(
        x=drawdown.index,
        y=drawdown.values,
        mode='lines',
        line=dict(color='red', width=2),
        name='Drawdown'
    ),
    row=1, col=2
)

fig.update_xaxes(
    title_text='Return',
    row=1, col=1,
    nticks=20,
    showgrid=True
)
fig.update_yaxes(
    title_text='Frequency',
    row=1, col=1,
    showgrid=True
)

fig.update_xaxes(
    title_text='Date',
    row=1, col=2,
    showgrid=False
)
fig.update_yaxes(
    title_text='Drawdown',
    row=1, col=2,
    showgrid=True
)

fig.update_layout(
    title_text='Returns Distribution & Drawdown',
    bargap=0.1,               
    template='plotly_white',
    showlegend=False,
    width=900,
    height=400
)

fig.show()


In [11]:
# Cell 7 — Candlestick + Trades + Holding Visualization (Plotly, box-zoom only, legend top-left)
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio

pio.renderers.default = 'notebook_connected'

TICKER = "360ONE"

# 1) Extract OHLC & positions from backtester
ohlcv = backtester.data[TICKER][["open", "high", "low", "close"]].copy()
ohlcv.index = pd.to_datetime(ohlcv.index)
pos   = backtester.all_positions[TICKER].reindex(ohlcv.index).ffill().fillna(0)

# 2) Remove weekends
mask = ohlcv.index.weekday < 5
ohlcv = ohlcv.loc[mask]
pos   = pos.loc[mask]

# 3) Build df, compute trades & holdings
df = ohlcv.assign(pos=pos)
df["dpos"]    = df["pos"].diff().fillna(df["pos"])
buys          = df.index[df["dpos"] >  0]
sells         = df.index[df["dpos"] <  0]
df["holding"] = df["pos"] != 0
df["grp"]     = (df["holding"] != df["holding"].shift(fill_value=False)).cumsum()

blocks = []
for _, sub in df.groupby("grp"):
    if sub["holding"].iat[0]:
        clr = "rgba(0,200,0,0.2)" if sub["pos"].mean() > 0 else "rgba(200,0,0,0.2)"
        blocks.append((sub.index[0], sub.index[-1], clr))

# 4) Build figure
fig = go.Figure()

# Candlestick trace
fig.add_trace(go.Candlestick(
    x=df.index,
    open=df["open"], high=df["high"],
    low=df["low"],   close=df["close"],
    name=TICKER
))

# Shaded holding periods
for start, end, color in blocks:
    fig.add_vrect(x0=start, x1=end, fillcolor=color, line_width=0)

# Buy markers
fig.add_trace(go.Scatter(
    x=buys, y=df.loc[buys,"low"]*0.99,
    mode="markers",
    marker=dict(symbol="triangle-up", color="green", size=10),
    name="Buy"
))

# Sell markers
fig.add_trace(go.Scatter(
    x=sells, y=df.loc[sells,"high"]*1.01,
    mode="markers",
    marker=dict(symbol="triangle-down", color="red", size=10),
    name="Sell"
))

# 5) Layout with box-zoom only and legend in top-left
fig.update_layout(
    title=f"{TICKER} Candlestick Chart with Trades & Holdings",
    dragmode="zoom",            # box zoom
    hovermode="x unified",
    xaxis=dict(
        title="Date",
        type="date",
        fixedrange=False,
        rangebreaks=[dict(bounds=["sat", "sun"])]
    ),
    yaxis=dict(
        title="Price",
        tickformat=",.2f",
        fixedrange=False
    ),
    template="plotly_white",
    legend=dict(
        orientation="h",
        x=0,      # left
        y=1,      # top
        xanchor="left",
        yanchor="top"
    )
)

# 6) Show with mode bar restricted to box zoom + reset only
fig.show(config={
    "scrollZoom": False,
    "modeBarButtonsToRemove": [
        "pan2d", "select2d", "lasso2d", 
        "zoomIn2d", "zoomOut2d", "autoScale2d"
    ]
})


In [13]:
# Cell 8 — FIFO PnL breakdown (realized vs unrealized) + Market Value with Plotly
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.io as pio

pio.renderers.default = 'notebook_connected'

TICKER = "3MINDIA"

# Guard: need both price data and positions
if TICKER not in data or TICKER not in positions:
    print(f"Ticker '{TICKER}' not found in data or positions.")
else:
    # --- Prepare df of close prices + positions on trading days only ---
    ohlcv = data[TICKER][["close"]].copy()
    ohlcv.index = pd.to_datetime(ohlcv.index)
    pos = positions[TICKER].reindex(ohlcv.index).ffill().fillna(0)
    pos.index = pd.to_datetime(pos.index)
    df = ohlcv.assign(pos=pos).dropna()
    df = df[df.index.weekday < 5]  # drop weekends

    # compute daily position changes
    df["dpos"] = df["pos"].diff().fillna(df["pos"])

    # containers
    inventory = []      # list of [qty_remaining, cost_price]
    realized_pnl = 0.0
    rec = {
        "market_value": [],
        "realized_pnl": [],
        "unrealized_pnl": [],
        "total_pnl": []
    }

    # step through each day
    for dt, row in df.iterrows():
        price = row["close"]
        change = row["dpos"]

        # 1) BUY: add to inventory
        if change > 0:
            inventory.append([change, price])

        # 2) SELL: match FIFO, accumulate realized PnL
        elif change < 0:
            qty_to_sell = -change
            while qty_to_sell > 0 and inventory:
                qty_on_hand, cost = inventory[0]
                used = min(qty_on_hand, qty_to_sell)
                realized_pnl += used * (price - cost)
                qty_on_hand  -= used
                qty_to_sell  -= used
                if qty_on_hand == 0:
                    inventory.pop(0)
                else:
                    inventory[0][0] = qty_on_hand

        # 3) Unrealized PnL & Market Value
        unreal = sum((price - cost) * qty for qty, cost in inventory)
        mktv   = sum(qty * price for qty, cost in inventory)

        # record
        rec["market_value"].append(mktv)
        rec["realized_pnl"].append(realized_pnl)
        rec["unrealized_pnl"].append(unreal)
        rec["total_pnl"].append(realized_pnl + unreal)

    # attach to df
    for col in rec:
        df[col] = rec[col]

    # --- Plot with Plotly ---
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df.index,
        y=df["market_value"],
        mode="lines",
        name="Market Value",
        line=dict(width=2, color="blue")
    ))
    fig.add_trace(go.Scatter(
        x=df.index,
        y=df["realized_pnl"],
        mode="lines",
        name="Cumulative Realized PnL",
        line=dict(width=2, color="green")
    ))
    fig.add_trace(go.Scatter(
        x=df.index,
        y=df["unrealized_pnl"],
        mode="lines",
        name="Unrealized PnL",
        line=dict(width=2, color="orange")
    ))
    fig.add_trace(go.Scatter(
        x=df.index,
        y=df["total_pnl"],
        mode="lines",
        name="Total PnL",
        line=dict(width=2, color="red")
    ))

    fig.update_layout(
        title=f"{TICKER} Market Value & PnL Breakdown",
        hovermode="x unified",
        xaxis=dict(
            title="Date",
            type="date",
            rangebreaks=[dict(bounds=["sat","sun"])],
            rangeslider=dict(visible=True),
            fixedrange=False
        ),
        yaxis=dict(
            title="Amount",
            tickformat=", .2f",
            fixedrange=False
        ),
        template="plotly_white",
        legend=dict(
            orientation="h",
            y=1.02,
            x=0,
            xanchor="left"
        )
    )
    fig.show(config={
        "scrollZoom": False,
        "modeBarButtonsToRemove": [
            "pan2d", "select2d", "lasso2d",
            "zoomIn2d", "zoomOut2d", "autoScale2d"
        ]
    })
