# Crypto Portfolio Backtesting System with Nautilus Trader

This notebook establishes a foundational system for backtesting and comparing different portfolio management strategies for cryptocurrencies using the `nautilus-trader` framework.

We will implement and compare two fundamental strategies:
1.  **Hodler Strategy**: Buys and holds BTC and ETH in equal dollar amounts at the start of the backtest.
2.  **Index Rebalance Strategy**: Buys BTC and ETH and periodically rebalances the portfolio to maintain a fixed 50/50 weight.

We will use 1-hour bar data for `BTC/USDT` and `ETH/USDT`.
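
To make the difference between the two strategies concrete before involving the framework, here is a minimal, framework-free sketch. The `simulate` function and the three-step price series are hypothetical and chosen only for illustration; fees and order-size rounding are ignored.

```python
def simulate(prices, weights, cash, rebalance_every=None):
    """Return the equity curve of a weighted portfolio.

    prices: dict of symbol -> list of prices, one per time step
    weights: dict of symbol -> target weight (weights sum to 1)
    rebalance_every: rebalance every N steps; None means buy and hold
    """
    symbols = list(weights)
    # Initial purchase at the first price, split by target weight
    units = {s: cash * weights[s] / prices[s][0] for s in symbols}
    equity_curve = []
    for t in range(len(prices[symbols[0]])):
        equity = sum(units[s] * prices[s][t] for s in symbols)
        equity_curve.append(equity)
        if rebalance_every and t > 0 and t % rebalance_every == 0:
            # Reset each holding to its target share of current equity
            units = {s: equity * weights[s] / prices[s][t] for s in symbols}
    return equity_curve


# Toy data: BTC doubles and then falls back, ETH stays flat
prices = {"BTC": [100.0, 200.0, 100.0], "ETH": [100.0, 100.0, 100.0]}
weights = {"BTC": 0.5, "ETH": 0.5}

hodl = simulate(prices, weights, cash=1000.0)
rebalanced = simulate(prices, weights, cash=1000.0, rebalance_every=1)
print(hodl)        # [1000.0, 1500.0, 1000.0]
print(rebalanced)  # [1000.0, 1500.0, 1125.0]
```

In this toy path the rebalanced portfolio ends higher because it sold some BTC at the peak. On a steadily trending path the opposite can happen, which is why the comparison needs real data.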

In [None]:
# !pip install nautilus_trader pyarrow pandas matplotlib numpy

## 1. Imports and Setup

First, we import the necessary components from `nautilus-trader` along with the libraries used for data handling and plotting. We also create the data and catalog directories; any existing catalog at that path is deleted so that each run starts from a clean state.

In [15]:
import os
import shutil
import zipfile
from decimal import Decimal
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Core Nautilus Trader components
from nautilus_trader.backtest.node import BacktestNode
from nautilus_trader.config import BacktestRunConfig, BacktestVenueConfig, BacktestDataConfig
from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.data import Bar, BarType, BarSpecification
from nautilus_trader.model.enums import AccountType, OrderSide, OrderType, PositionSide, TimeInForce
from nautilus_trader.model.events import OrderFilled
from nautilus_trader.model.identifiers import InstrumentId, StrategyId, Venue
from nautilus_trader.model.objects import Money, Price, Quantity
from nautilus_trader.model.orders import MarketOrder
from nautilus_trader.model.position import Position
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.trading.strategy import Strategy

# Set up logging to see the output from the backtest engine
from nautilus_trader.common.component import Logger

# Create a directory for our backtest data
CATALOG_PATH = os.path.expanduser("~/backtest_jup/data/catalog")
DATA_DIR = os.path.expanduser("~/backtest_jup/data")
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(CATALOG_PATH, exist_ok=True)

print(f"Data directory: {DATA_DIR}")
print(f"Data catalog path: {CATALOG_PATH}")

Data directory: /Users/yb/backtest_jup/data
Data catalog path: /Users/yb/backtest_jup/data/catalog


## 2. Data Preparation

We will now process historical 1-hour bar data from CSV files downloaded from the [Binance Public Data](https://github.com/binance/binance-public-data/) repository.

The function below reads these CSVs, which have no header row and a fixed column layout. A commented-out helper, `create_sample_csv`, can generate synthetic files in the same format if you want to run the notebook without downloading data.
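
As a rough illustration of that column layout, a single kline row can be parsed with the standard library alone. The helper names `open_time_to_nanos` and `parse_kline_row`, the magnitude-based unit check, and the sample row values are assumptions made for this sketch; they are not part of Binance's tooling or of `nautilus-trader`.

```python
import csv
import io

# Assumption for this sketch: epoch values at or above 10**14 are microseconds.
# A millisecond timestamp that large would lie thousands of years in the future.
MICROSECOND_THRESHOLD = 10**14


def open_time_to_nanos(open_time: int) -> int:
    """Convert a kline open_time given in ms or us to UNIX nanoseconds."""
    if open_time >= MICROSECOND_THRESHOLD:
        return open_time * 1_000      # microseconds -> nanoseconds
    return open_time * 1_000_000      # milliseconds -> nanoseconds


def parse_kline_row(row: list[str]) -> dict:
    """Pick the timestamp and OHLCV fields out of one headerless kline row."""
    return {
        "ts_event": open_time_to_nanos(int(row[0])),
        "open": float(row[1]),
        "high": float(row[2]),
        "low": float(row[3]),
        "close": float(row[4]),
        "volume": float(row[5]),
    }


# Illustrative row in the 12-column kline layout (values are made up)
sample = "1672531200000,16541.77,16545.70,16508.39,16529.67,4364.83,1672534799999,72146293.6,149854,2179.94,36032210.1,0\n"
rows = [parse_kline_row(r) for r in csv.reader(io.StringIO(sample))]
print(rows[0]["ts_event"], rows[0]["close"])
```

Detecting the unit from the magnitude avoids hard-coding `ms` or `us` when files from different years are mixed.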

In [17]:
def process_binance_csv(file_path):
    """Reads a Binance klines CSV and formats it for Nautilus."""
    
    # Define the schema based on Binance's documentation
    columns = [
        'open_time', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_asset_volume', 'number_of_trades',
        'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
    ]
    
    df = pd.read_csv(file_path, header=None, names=columns)
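    # Convert the UNIX open time to a datetime. Binance spot kline files dated
    # 2025-01-01 or later use microseconds; earlier files (and the commented-out
    # sample generator below) use milliseconds, so switch to unit='ms' for those.
    df['timestamp'] = pd.to_datetime(df['open_time'], unit='us')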
    
    # Select and rename columns to what Nautilus expects
    df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
    
    # Add the required timestamp columns for Nautilus
    df['ts_event'] = df['timestamp'].apply(dt_to_unix_nanos)
    df['ts_init'] = df['timestamp'].apply(dt_to_unix_nanos)
    
    return df

# --- Create Sample Binance CSV files for demonstration ---
# def create_sample_csv(symbol, date_str, start_price, drift, volatility):
#     filepath = os.path.join(DATA_DIR, f"{symbol}-1h-{date_str}.csv")
#     print(f"Creating sample file: {filepath}")
#     start = datetime.strptime(date_str, "%Y-%m")
#     end = (start + pd.DateOffset(months=1)) - pd.Timedelta(hours=1)
    
#     dates = pd.to_datetime(pd.date_range(start=start, end=end, freq='1H'))
#     n_bars = len(dates)
    
#     returns = np.random.normal(loc=drift, scale=volatility, size=n_bars)
#     prices = start_price * (1 + returns).cumprod()
    
#     df = pd.DataFrame()
#     df['open_time'] = [int(d.timestamp() * 1000) for d in dates]
#     df['open'] = prices
#     df['high'] = prices * (1 + np.random.uniform(0, volatility, n_bars))
#     df['low'] = prices * (1 - np.random.uniform(0, volatility, n_bars))
#     df['close'] = prices * (1 + np.random.normal(0, volatility, n_bars))
#     df['volume'] = np.random.randint(100, 1000, n_bars)
#     df['close_time'] = [int((d + timedelta(hours=1)).timestamp() * 1000 - 1) for d in dates]
#     df['quote_asset_volume'] = df['volume'] * df['close']
#     df['number_of_trades'] = np.random.randint(50, 500, n_bars)
#     df['taker_buy_base_asset_volume'] = df['volume'] / 2
#     df['taker_buy_quote_asset_volume'] = df['quote_asset_volume'] / 2
#     df['ignore'] = 0
    
#     df.to_csv(filepath, header=False, index=False)
#     return filepath

# create_sample_csv("BTCUSDT", "2023-01", 20000, 0.0001, 0.02)
# create_sample_csv("ETHUSDT", "2023-01", 1500, 0.00015, 0.025)

# --- Process the data and write to the catalog ---
VENUE = Venue("BINANCE")
catalog = ParquetDataCatalog(CATALOG_PATH)

for symbol in ["BTCUSDT", "ETHUSDT"]:
    print(f"\nProcessing {symbol}...")
    # In a real scenario, you would list all your downloaded CSV files for the symbol here
    # For example: file_paths = glob.glob(f'{DATA_DIR}/{symbol}-1h-*.csv')
    file_paths = [os.path.join(DATA_DIR, f"{symbol}-1h-2025-01.csv")]  # One monthly file per symbol
    
    all_dfs = [process_binance_csv(fp) for fp in file_paths]
    df_combined = pd.concat(all_dfs).sort_values('timestamp').reset_index(drop=True)
    
    instrument_id_str = f"{symbol}.{VENUE}"
    print(instrument_id_str)
    # Bar specifications use the form "<step>-<aggregation>-<price_type>", e.g. "1-HOUR-LAST"
    bar_type = BarType(InstrumentId.from_str(instrument_id_str), BarSpecification.from_str("1-HOUR-LAST"))
    df_combined["bar_type"] = str(bar_type)
    
    table = pa.Table.from_pandas(df_combined)
    catalog.write(data=[table], bar_type=str(bar_type))
    print(f"Wrote {len(df_combined)} bars for {symbol} to catalog.")
    print(df_combined.head())

## 3. Define Trading Instruments

We need to define the `Instrument` objects for the assets we want to trade. This includes information like the asset pair, venue, price precision, and quantity precision.
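
Precision determines the smallest increment a price or quantity can take: a size precision of 5 means quantities move in steps of 0.00001. The following sketch shows that relationship with plain `Decimal` arithmetic. The helper names are hypothetical and independent of the `nautilus-trader` API.

```python
from decimal import Decimal, ROUND_DOWN


def increment_for(precision: int) -> Decimal:
    """Smallest step for a given number of decimal places."""
    return Decimal(1).scaleb(-precision)  # precision=5 -> Decimal("0.00001")


def round_down_to_precision(value: Decimal, precision: int) -> Decimal:
    """Truncate a value to the instrument precision (never rounds up)."""
    return value.quantize(increment_for(precision), rounding=ROUND_DOWN)


# A raw quantity from dividing a dollar amount by a price
raw_qty = Decimal("5000") / Decimal("16529.67")
btc_qty = round_down_to_precision(raw_qty, 5)
print(increment_for(5), btc_qty)
```

Rounding down rather than to nearest keeps the order from costing slightly more than the amount allocated to it.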

In [None]:
from nautilus_trader.model.identifiers import Symbol
from nautilus_trader.model.instruments import CurrencyPair, Instrument
from nautilus_trader.model.objects import Currency

# Configure a simulated venue. The 0.1% commission is set on each instrument
# below (maker_fee / taker_fee) rather than on the venue config.
venue_config = BacktestVenueConfig(
    name="BINANCE",
    oms_type="NETTING",  # One net position per instrument, so sells reduce existing holdings
    account_type="CASH",
    base_currency=None,  # Multi-currency account (USDT plus the purchased BTC and ETH)
    starting_balances=["10000 USDT"],
)

def make_crypto_instrument(symbol: str, price_precision: int, size_precision: int) -> CurrencyPair:
    """Helper function to create a spot crypto instrument with a 0.1% maker/taker fee."""
    base, quote = symbol.split("-")
    raw_symbol = f"{base}{quote}"  # Matches the catalog IDs, e.g. BTCUSDT.BINANCE
    # Keyword names follow the CurrencyPair constructor in recent nautilus_trader
    # releases; older releases may require extra arguments such as margin_init.
    return CurrencyPair(
        instrument_id=InstrumentId.from_str(f"{raw_symbol}.{VENUE}"),
        raw_symbol=Symbol(raw_symbol),
        base_currency=Currency.from_str(base),
        quote_currency=Currency.from_str(quote),
        price_precision=price_precision,
        size_precision=size_precision,
        price_increment=Price(10 ** -price_precision, price_precision),
        size_increment=Quantity(10 ** -size_precision, size_precision),
        maker_fee=Decimal("0.001"),
        taker_fee=Decimal("0.001"),
        ts_event=0,
        ts_init=0,
    )

instruments = [
    make_crypto_instrument("BTC-USDT", price_precision=2, size_precision=5),
    make_crypto_instrument("ETH-USDT", price_precision=2, size_precision=4),
]

print("Defined Instruments:")
for inst in instruments:
    print(inst)

## 4. Implement Trading Strategies

Here we define the logic for our two strategies.

### Hodler Strategy

This is the simplest strategy. It allocates a portion of the starting capital to each asset, buys them on the first data tick (bar), and then holds them for the entire duration of the backtest.
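
One detail worth checking by hand is that two orders of exactly half the starting capital each will, once commission is added, cost slightly more than the cash available. The sketch below sizes the orders so that notional plus fee stays within each asset's budget. The function name and the prices are hypothetical; the 0.1% fee and the size precisions match the values used in this notebook.

```python
from decimal import Decimal, ROUND_DOWN


def equal_dollar_quantities(cash, prices, size_precisions, fee_rate):
    """Split cash equally across assets, leaving room for the fee on each order."""
    budget_per_asset = cash / len(prices)
    quantities = {}
    for symbol, price in prices.items():
        # Spend budget / (1 + fee_rate) so that notional + fee <= budget
        notional = budget_per_asset / (1 + fee_rate)
        step = Decimal(1).scaleb(-size_precisions[symbol])
        quantities[symbol] = (notional / price).quantize(step, rounding=ROUND_DOWN)
    return quantities


cash = Decimal("10000")
prices = {"BTCUSDT": Decimal("20000"), "ETHUSDT": Decimal("1500")}
precisions = {"BTCUSDT": 5, "ETHUSDT": 4}
fee = Decimal("0.001")

qty = equal_dollar_quantities(cash, prices, precisions, fee)
total_cost = sum(q * prices[s] * (1 + fee) for s, q in qty.items())
print(qty, total_cost)
```

With these inputs the total cost including fees comes to 9,999.99 USDT, just under the 10,000 USDT available.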

In [None]:
class Hodler(Strategy):
    """
    A simple buy-and-hold strategy.
    Buys a fixed dollar amount of each instrument on that instrument's first bar.
    """
    def __init__(self, strategy_id: StrategyId, instruments: list[Instrument]):
        super().__init__(strategy_id)
        self.instruments = {inst.id: inst for inst in instruments}
        self.allocation_per_instrument = None  # Set when the first bar arrives
        self.invested_ids = set()  # Instruments that have already been bought

    def on_start(self):
        """Called once at the start of the backtest."""
        # Bar types use the form "<instrument_id>-<step>-<aggregation>-<price_type>-<source>"
        for inst in self.instruments.values():
            self.subscribe_bars(BarType.from_str(f"{inst.id}-1-HOUR-LAST-EXTERNAL"))
        self.log.info(f"{self.id}: Subscribed to bars.")

    def on_bar(self, bar: Bar):
        """Called on each bar of data."""
        instrument_id = bar.bar_type.instrument_id
        instrument = self.instruments.get(instrument_id)
        if instrument is None or instrument_id in self.invested_ids:
            return  # Invest only once per instrument

        if self.allocation_per_instrument is None:
            # Nothing has been bought yet, so the quote-currency balance is the total equity
            account = self.portfolio.account(instrument_id.venue)
            total_equity = account.balance_total(instrument.quote_currency).as_decimal()
            self.allocation_per_instrument = total_equity / len(self.instruments)
            self.log.info(
                f"{self.id}: Initial investment event. Total equity: {total_equity:.2f}"
            )

        # Calculate order quantity based on allocation and current price
        quantity = self.allocation_per_instrument / bar.close.as_decimal()
        order_qty = instrument.make_qty(quantity)

        self.log.info(
            f"{self.id}: Submitting market order for {order_qty} of {instrument.id}"
        )

        # Submit a market order to buy, built through the strategy's order factory
        order = self.order_factory.market(
            instrument_id=instrument.id,
            order_side=OrderSide.BUY,
            quantity=order_qty,
            time_in_force=TimeInForce.GTC,
        )
        self.submit_order(order)
        self.invested_ids.add(instrument_id)

    def on_order_filled(self, event: OrderFilled):
        """Log when an order has been filled."""
        self.log.info(
            f"{self.id}: FILLED {event.order_side} {event.last_qty} of {event.instrument_id} at {event.last_px}"
        )

    def on_stop(self):
        """Called once at the end of the backtest."""
        self.log.info(f"{self.id}: Backtest finished.")

### Index Rebalance Strategy

This strategy also starts by buying the assets, but it includes logic to periodically rebalance the portfolio to maintain a target weight for each asset. We'll set a rebalance interval (e.g., every 7 days) and on that schedule, the strategy will adjust the holdings by buying or selling.
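
The arithmetic behind a rebalance is small enough to check by hand. The sketch below computes the signed quantity to trade for each asset, where a negative value means sell. The function name and the holdings are hypothetical, and fees and lot-size rounding are left out to keep the calculation visible.

```python
from decimal import Decimal


def rebalance_trades(holdings, prices, cash, target_weights):
    """Return the signed quantity to trade per symbol to reach the target weights."""
    # Total equity is cash plus the market value of every holding
    total_equity = cash + sum(holdings[s] * prices[s] for s in holdings)
    trades = {}
    for symbol, weight in target_weights.items():
        target_value = total_equity * weight
        current_value = holdings[symbol] * prices[symbol]
        trades[symbol] = (target_value - current_value) / prices[symbol]
    return trades


# BTC has rallied, so it now makes up more than half of the portfolio
holdings = {"BTCUSDT": Decimal("0.25"), "ETHUSDT": Decimal("3")}
prices = {"BTCUSDT": Decimal("24000"), "ETHUSDT": Decimal("1500")}
weights = {"BTCUSDT": Decimal("0.5"), "ETHUSDT": Decimal("0.5")}

trades = rebalance_trades(holdings, prices, Decimal("0"), weights)
print(trades)  # sell 0.03125 BTC, buy 0.5 ETH
```

Here the portfolio is worth 10,500 USDT, so each asset should hold 5,250 USDT: the 750 USDT excess in BTC is sold and the same amount of ETH is bought.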

In [None]:
class IndexRebalance(Strategy):
    """
    A strategy that maintains a target weight for each instrument in the portfolio,
    rebalancing at a fixed interval.
    """
    def __init__(self, strategy_id: StrategyId, instruments: list[Instrument], rebalance_period_days: int = 7):
        super().__init__(strategy_id)
        self.instruments = {inst.id: inst for inst in instruments}
        self.bar_types = {
            inst.id: BarType.from_str(f"{inst.id}-1-HOUR-LAST-EXTERNAL") for inst in instruments
        }
        self.rebalance_period = timedelta(days=rebalance_period_days)

        # Target an equal-weight allocation (50/50 for two instruments)
        self.target_weights = {inst.id: Decimal(1) / len(instruments) for inst in instruments}
        self.has_invested = False

    def on_start(self):
        """Set up subscriptions and schedule the rebalancing timer."""
        for bar_type in self.bar_types.values():
            self.subscribe_bars(bar_type)
        self.log.info(f"{self.id}: Subscribed to bars.")

        # Fire `rebalance` every `rebalance_period`; the callback receives a TimeEvent
        self.clock.set_timer(name="rebalance", interval=self.rebalance_period, callback=self.rebalance)

    def rebalance(self, event=None):
        """The core rebalancing logic."""
        # Use the latest cached close for every instrument; wait if any is missing
        prices = {}
        for instrument_id, bar_type in self.bar_types.items():
            last_bar = self.cache.bar(bar_type)
            if last_bar is None:
                return
            prices[instrument_id] = last_bar.close.as_decimal()

        self.log.info(f"{self.id}: === REBALANCING EVENT at {self.clock.utc_now()} ===")

        # Total equity = quote-currency cash + market value of current holdings
        first = next(iter(self.instruments.values()))
        account = self.portfolio.account(first.id.venue)
        cash = account.balance_total(first.quote_currency).as_decimal()
        holdings = {iid: self.portfolio.net_position(iid) for iid in self.instruments}
        total_equity = cash + sum(holdings[iid] * prices[iid] for iid in holdings)

        # Signed quantity to trade per instrument; sells go first to free up cash
        deltas = {
            iid: (total_equity * weight - holdings[iid] * prices[iid]) / prices[iid]
            for iid, weight in self.target_weights.items()
        }
        for instrument_id, delta in sorted(deltas.items(), key=lambda item: item[1]):
            instrument = self.instruments[instrument_id]
            if abs(delta) < instrument.size_increment.as_decimal():
                self.log.info(f"{self.id}: No rebalance needed for {instrument_id}.")
                continue

            trade_qty = instrument.make_qty(abs(delta))
            side = OrderSide.BUY if delta > 0 else OrderSide.SELL
            target_value = total_equity * self.target_weights[instrument_id]
            current_value = holdings[instrument_id] * prices[instrument_id]
            self.log.info(
                f"{self.id}: Rebalancing {instrument_id}. Target: ${target_value:,.2f}, Current: ${current_value:,.2f}. Will {side} {trade_qty}."
            )

            order = self.order_factory.market(
                instrument_id=instrument_id,
                order_side=side,
                quantity=trade_qty,
                time_in_force=TimeInForce.IOC,  # Use IOC for rebalancing orders
            )
            self.submit_order(order)

        self.has_invested = True

    def on_bar(self, bar: Bar):
        """Handle the initial investment once every instrument has a bar."""
        if not self.has_invested:
            self.rebalance()

    def on_order_filled(self, event: OrderFilled):
        """Log filled orders."""
        self.log.info(
            f"{self.id}: FILLED {event.order_side} {event.last_qty} of {event.instrument_id} at {event.last_px}"
        )

    def on_stop(self):
        """Log end of backtest."""
        self.log.info(f"{self.id}: Backtest finished.")

## 5. Configure and Run the Backtest

Now we put everything together. We'll create instances of our strategies and configure the `BacktestNode` to run them simultaneously. This allows for a direct, side-by-side comparison.

In [None]:
# Create unique IDs for our strategies (a StrategyId has the form "<name>-<tag>")
hodler_strategy_id = StrategyId("HODLER-001")
rebalance_strategy_id = StrategyId("REBALANCE-001")

config = BacktestRunConfig(
    engine={
        # The log level is given as a string; LogLevel was never imported
        "logger": {"log_level": "INFO", "handler_type": "stream"},
    },
    data=BacktestDataConfig(catalog_path=CATALOG_PATH),
    venues=[venue_config],
    strategies=[
        Hodler(strategy_id=hodler_strategy_id, instruments=instruments),
        IndexRebalance(strategy_id=rebalance_strategy_id, instruments=instruments, rebalance_period_days=30),
    ],
)

# Instantiate and run the backtest node
node = BacktestNode(config)
results = node.run()

print("\nBacktest finished!")

## 6. Analyze and Compare Results

With the backtest complete, we can now analyze the performance of each strategy.

We'll extract the portfolio equity logs for both strategies and plot them on a chart to visualize their performance over time. We will also print the final equity for each.

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

# Set plotting style
plt.style.use('seaborn-v0_8-darkgrid')

# Extract portfolio logs for each strategy
hodler_log = results.trader.portfolio_log(strategy_id=hodler_strategy_id)
df_hodler = pd.DataFrame(hodler_log)
df_hodler['timestamp'] = pd.to_datetime(df_hodler['timestamp'], unit='ns')
df_hodler = df_hodler.set_index('timestamp')

rebalance_log = results.trader.portfolio_log(strategy_id=rebalance_strategy_id)
df_rebalance = pd.DataFrame(rebalance_log)
df_rebalance['timestamp'] = pd.to_datetime(df_rebalance['timestamp'], unit='ns')
df_rebalance = df_rebalance.set_index('timestamp')

# Plot the equity curves
fig, ax = plt.subplots(figsize=(14, 7))

ax.plot(df_hodler.index, df_hodler['equity'], label='Hodler Strategy', color='cyan')
ax.plot(df_rebalance.index, df_rebalance['equity'], label='Index Rebalance Strategy (30 days)', color='magenta')

ax.set_title('Strategy Performance Comparison (BTC/ETH)', fontsize=16)
ax.set_ylabel('Portfolio Equity (USDT)', fontsize=12)
ax.set_xlabel('Date', fontsize=12)
ax.legend(fontsize=12)

# Format y-axis to show dollar values
formatter = mticker.FormatStrFormatter('$%.0f')
ax.yaxis.set_major_formatter(formatter)

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Print final equity
final_equity_hodler = df_hodler['equity'].iloc[-1]
final_equity_rebalance = df_rebalance['equity'].iloc[-1]

print(f"Starting Equity: ${df_hodler['equity'].iloc[0]:,.2f}")
print(f"Final Equity (Hodler):      ${final_equity_hodler:,.2f}")
print(f"Final Equity (Rebalance):   ${final_equity_rebalance:,.2f}")

### Performance Statistics

Finally, let's generate and display detailed performance statistics for each strategy using the built-in reporting functionality.
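
Two of the most common summary statistics, total return and maximum drawdown, can also be computed directly from an equity curve. This is a minimal sketch on a made-up curve; the function names are hypothetical and do not correspond to the framework's reporting API.

```python
def total_return(equity: list[float]) -> float:
    """Fractional gain from the first to the last equity value."""
    return equity[-1] / equity[0] - 1


def max_drawdown(equity: list[float]) -> float:
    """Largest peak-to-trough decline, returned as a negative fraction."""
    peak = equity[0]
    worst = 0.0
    for value in equity:
        peak = max(peak, value)                 # Highest equity seen so far
        worst = min(worst, value / peak - 1)    # Decline relative to that peak
    return worst


# Made-up equity curve in USDT
curve = [10000.0, 12000.0, 9000.0, 11000.0]
print(f"Total return: {total_return(curve):.2%}")
print(f"Max drawdown: {max_drawdown(curve):.2%}")
```

Comparing drawdowns alongside final equity shows whether a strategy's higher return came with deeper interim losses.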

In [None]:
print("--- HODLER STRATEGY STATS ---")
results.trader.strategies[hodler_strategy_id].report.print()

print("\n--- INDEX REBALANCE STRATEGY STATS ---")
results.trader.strategies[rebalance_strategy_id].report.print()