In [None]:
# Imports and setup
import os
import glob
import pandas as pd
import numpy as np
import ccxt
import vectorbt as vbt
from gluonts.dataset.common import ListDataset
from gluonts.torch.model.deepar import DeepAREstimator
import concurrent.futures
import torch

# Allow TF32 matmuls (Tensor Cores on recent NVIDIA GPUs) for faster float32 ops.
torch.set_float32_matmul_precision('high')
print("Starting DeepAR backtest pipeline with enhanced logging...")

# --- Config ---
DATA_DIR = "data/binance_backtesting"  # parquet cache + output artifacts
os.makedirs(DATA_DIR, exist_ok=True)

# Forecast horizon and model look-back, both measured in 1-minute bars.
prediction_length, context_length = 10, 120

# Portfolio / risk settings (fractions, not percentages).
init_cash = 100_000.0
max_position_size = 0.10   # cap per trade: 10% of available cash
stop_loss = 0.03           # 3% stop-loss
take_profit = 0.07         # 7% take-profit

# Thread-pool sizes.
max_workers = 32           # signal generation (not referenced elsewhere in this cell)
fetch_workers = 8          # OHLCV downloads

# Indicator windows.
bb_window, bb_std_factor = 20, 2   # Bollinger Bands: window length, std multiplier
vma_window = 120                   # volume moving average length

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# --- Step 1: Parallel fetch & cache data ---
print("[Step 1] Fetching OHLCV in parallel...")
# Throttle requests to the exchange's published rate limit (set explicitly; recent
# ccxt versions default to True). NOTE(review): this single instance is shared by
# the fetch threads below; ccxt's sync throttling is presumably not thread-safe —
# confirm if rate-limit errors appear.
exchange = ccxt.binanceus({'enableRateLimit': True})
exchange.load_markets()
max_symbols = 10  # demo cap on how many symbols are downloaded
# USDT-quoted markets, skipping ones ccxt explicitly marks inactive (delisted pairs
# presumably return no candles). Markets whose status is unknown (None) are kept.
usdt_symbols = [
    s for s in exchange.symbols
    if s.endswith('/USDT') and exchange.markets[s].get('active') is not False
]
if not usdt_symbols:
    raise RuntimeError("No USDT symbols found")
if len(usdt_symbols) > max_symbols:
    print(f"    Found {len(usdt_symbols)} symbols; fetching first {max_symbols} for demo.")
    usdt_symbols = usdt_symbols[:max_symbols]

# Fetch window: the last 42 days (six weeks), in exchange milliseconds.
six_weeks_ms = 42 * 24 * 60 * 60 * 1000
end_time = exchange.milliseconds()
start_time = end_time - six_weeks_ms

def fetch_and_cache(symbol: str):
    """Download 1-minute OHLCV for *symbol* since ``start_time`` and cache it as Parquet.

    Pages through ``exchange.fetch_ohlcv`` in chunks of up to 1000 candles,
    starting at the module-level ``start_time``. Paging stops on an empty or short
    page, when the exchange stops advancing, or once ``end_time`` is passed. The
    result is written to ``DATA_DIR/<BASE>_<QUOTE>.parquet`` with columns
    timestamp (datetime), open, high, low, close, volume and symbol.

    An existing cache file is reused as-is. Nothing is written when the exchange
    returns no candles, so an empty result is retried on the next run instead of
    being cached permanently as an empty file.

    Args:
        symbol: Unified ccxt market symbol, e.g. ``"BTC/USDT"``.
    """
    fname = symbol.replace('/', '_') + '.parquet'
    path = os.path.join(DATA_DIR, fname)
    # Messages from concurrent fetches interleave, so each one names its symbol.
    if os.path.exists(path):
        print(f"    [Fetch] {symbol}: cached, skipping")
        return
    print(f"    [Fetch] {symbol}")
    all_ohlcv = []
    since_ms = start_time
    while since_ms <= end_time:
        chunk = exchange.fetch_ohlcv(symbol, '1m', since=since_ms, limit=1000)
        if not chunk:
            break
        all_ohlcv.extend(chunk)
        next_since = chunk[-1][0] + 60_000  # one minute after the last candle
        # A short page means no more data; a non-advancing cursor would loop forever.
        if len(chunk) < 1000 or next_since <= since_ms:
            break
        since_ms = next_since
    if not all_ohlcv:
        print(f"      No candles returned for {symbol}; nothing cached")
        return
    df = pd.DataFrame(all_ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    # Guard against candles repeated across page boundaries and keep time order.
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp', ignore_index=True)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['symbol'] = symbol
    # Write to a temporary file and rename it, so an interrupted write never leaves
    # a truncated file that later runs would treat as a valid cache entry.
    tmp_path = path + '.tmp'
    df.to_parquet(tmp_path)
    os.replace(tmp_path, path)
    print(f"      Saved {len(df)} rows for {symbol}")

# Check every future: an exception raised inside ex.map(...) only surfaces when its
# result is retrieved, and the results were never iterated, so failed downloads
# vanished silently. Failures are reported here and the other symbols continue.
failed_symbols = []
with concurrent.futures.ThreadPoolExecutor(max_workers=fetch_workers) as ex:
    future_to_symbol = {ex.submit(fetch_and_cache, s): s for s in usdt_symbols}
    for fut in concurrent.futures.as_completed(future_to_symbol):
        err = fut.exception()
        if err is not None:
            failed_symbols.append(future_to_symbol[fut])
            print(f"    [Fetch] {future_to_symbol[fut]} failed: {err!r}")
if failed_symbols:
    print(f"    {len(failed_symbols)} symbol(s) failed to fetch: {sorted(failed_symbols)}")
print("[Step 1] Fetch complete\n")

# --- Step 1b: Load data + compute indicators ---
print("[Step 1b] Loading data and computing RSI, Bollinger Bands, VMA...")


def _add_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add RSI, Bollinger Band and volume-MA columns to *df* in place and return it.

    Columns added:
    - ``rsi``: 14-period RSI with EMA-smoothed gains and losses.
    - ``bb_mid`` / ``bb_upper`` / ``bb_lower``: ``bb_window``-bar rolling mean of
      ``close``, plus or minus ``bb_std_factor`` rolling standard deviations.
    - ``vma``: ``vma_window``-bar rolling mean of ``volume``.

    Rolling and EWM windows are trailing. Remaining NaNs are back-filled, then
    zero-filled (see the note below).
    """
    delta = df['close'].diff()
    up = delta.clip(lower=0)
    down = -delta.clip(upper=0)
    # NOTE(review): ewm(span=14) means alpha=2/15. Wilder's RSI uses alpha=1/14,
    # i.e. ewm(alpha=1/14). Kept as-is to preserve the existing signals.
    ma_up = up.ewm(span=14, adjust=False).mean()
    ma_down = down.ewm(span=14, adjust=False).mean()
    df['rsi'] = 100 - (100 / (1 + ma_up / (ma_down + 1e-9)))  # 1e-9 avoids division by zero
    df['bb_mid'] = df['close'].rolling(bb_window).mean()
    std = df['close'].rolling(bb_window).std()
    df['bb_upper'] = df['bb_mid'] + bb_std_factor * std
    df['bb_lower'] = df['bb_mid'] - bb_std_factor * std
    df['vma'] = df['volume'].rolling(vma_window).mean()
    # NOTE(review): bfill fills every NaN from the next valid value. For the
    # indicator warm-up rows (up to vma_window-1 at the start) this is slight
    # look-ahead; anything still NaN afterwards becomes 0.
    df.bfill(inplace=True)
    df.fillna(0, inplace=True)
    return df


market_data = {}
# NOTE(review): this loads every cached *.parquet in DATA_DIR, not only the
# symbols selected in Step 1, so files from earlier runs are included too.
# sorted() makes the symbol order, and hence later column order, deterministic.
for path in sorted(glob.glob(os.path.join(DATA_DIR, '*.parquet'))):
    sym = os.path.basename(path).replace('.parquet', '').replace('_', '/')
    df = pd.read_parquet(path)
    if df.empty:
        # A zero-candle file cannot yield indicators, training or test windows.
        print(f"    Skipping {sym}: cached file has no rows")
        continue

    # Handle the case where timestamp might already be an index or have a different name
    if 'timestamp' in df.columns:
        df = df.set_index('timestamp')
    elif df.index.name != 'timestamp' and df.index.dtype != 'datetime64[ns]':
        # Try to find a datetime column to use as index
        datetime_cols = [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]
        if datetime_cols:
            df = df.set_index(datetime_cols[0])
        else:
            print(f"Warning: No timestamp column found for {sym}, using default index")

    market_data[sym] = _add_indicators(df)
print(f"    Prepared indicators for {len(market_data)} symbols\n")

# --- Step 2: Train DeepAR with sliding windows ---
print("[Step 2] Preparing sliding-window training data...")
test_bars = 14 * 24 * 60  # the last 14 days of 1-minute bars are held out for the backtest
window_len = context_length + prediction_length
window_stride = prediction_length
train_items = []
for symbol, df in market_data.items():
    n = len(df)
    cutoff = n - test_bars  # training windows end before the held-out period
    if cutoff <= window_len:
        print(f"    Skipping {symbol}: insufficient data (n={n})")
        continue
    for end_i in range(window_len, cutoff, window_stride):
        start_i = end_i - window_len
        train_items.append({
            'target': df['close'].iloc[start_i:end_i].values,
            'feat_dynamic_real': [
                df['volume'].iloc[start_i:end_i].values,
                df['rsi'].iloc[start_i:end_i].values,
                df['bb_upper'].iloc[start_i:end_i].values,
                df['bb_lower'].iloc[start_i:end_i].values,
                df['vma'].iloc[start_i:end_i].values
            ],
            'start': df.index[start_i],
            'item_id': symbol
        })
print(f"    Training on {len(train_items)} windows")
if not train_items:
    raise RuntimeError("No training windows: every symbol has too little history before the test period")
training_ds = ListDataset(train_items, freq='T')

# NOTE(review): num_feat_dynamic_real is not passed (GluonTS default 0). In that
# case GluonTS's DeepAR transformation appears to drop feat_dynamic_real, so the
# RSI/BB/VMA features above may never reach the model. Enabling them would also
# require feature values for the prediction_length future steps at inference,
# which these past-only indicators cannot provide. TODO confirm for the installed gluonts.
use_cuda = device.type == 'cuda'
estimator = DeepAREstimator(
    freq='T',
    prediction_length=prediction_length,
    context_length=context_length,
    num_layers=3,
    hidden_size=100,
    batch_size=1024,
    trainer_kwargs={
        'max_epochs': 6,
        'accelerator': 'gpu' if use_cuda else 'cpu',
        'devices': 1,
        # '16-mixed' is Lightning 2.x's name for fp16 AMP (a bare 16 is deprecated and
        # triggers a warning). Lightning supports fp16 AMP only on GPU, so use full
        # precision on CPU.
        'precision': '16-mixed' if use_cuda else 32,
        'logger': False,
        'limit_val_batches': 0,
    },
)
predictor = estimator.train(training_data=training_ds)
# gluonts' PyTorchPredictor keeps its module in `prediction_net`; there is no
# `network` attribute, so the former hasattr(...) branch never ran. Moving the
# predictor via .to() (when available) keeps the module and the device it feeds
# batches to consistent. Weights stay fp32: mixed precision at inference comes
# from autocast rather than a permanent .half() cast.
# TODO confirm .to() semantics for the installed gluonts.
if hasattr(predictor, 'to'):
    predictor.to(device)
print("    Training complete\n")

# --- Step 3: Precompute and inference separation ---
print("[Step 3] Precomputing sliding windows on CPU...")
from numpy.lib.stride_tricks import sliding_window_view

# For every test-period bar `idx` of a symbol, collect the `context_length` values
# immediately before it (rows idx-context_length .. idx-1) of each series below,
# plus the array of those bar positions. All windows are zero-copy views.
_window_columns = ('close', 'volume', 'rsi', 'bb_upper', 'bb_lower', 'vma')
windows_data = {}
for symbol, df in market_data.items():
    n = len(df)
    test_win = 14 * 24 * 60  # last 14 days of 1-minute bars
    start_i = max(context_length, n - test_win)
    end_i = n - prediction_length  # leave room for the forecast horizon
    if end_i <= start_i:
        continue
    # Window k of sliding_window_view spans rows k .. k+context_length-1, so the
    # window ending just before row idx is k = idx - context_length.
    lo, hi = start_i - context_length, end_i - context_length
    per_column = [
        sliding_window_view(df[col].values, context_length)[lo:hi]
        for col in _window_columns
    ]
    windows_data[symbol] = (*per_column, np.arange(start_i, end_i))
print("[Step 3] CPU aggregation complete")

print("[Step 3] Preparing global dataset for GPU inference...")
# One dataset entry per test-period bar per symbol. Windows are passed as the
# NumPy views from the previous step instead of .tolist(): converting every window
# into Python lists costs about 32 bytes per value (float object plus list slot)
# and, across all symbols, can exhaust memory. ListDataset converts each entry's
# fields to float arrays itself.
all_ds_items, index_map = [], []
for symbol, (closes_np, vols_np, rsi_np, bb_up_np, bb_low_np, vma_np, idxs) in windows_data.items():
    sym_index = market_data[symbol].index
    for k, idx in enumerate(idxs):
        all_ds_items.append({
            'target': closes_np[k],
            'feat_dynamic_real': [vols_np[k], rsi_np[k], bb_up_np[k], bb_low_np[k], vma_np[k]],
            # The window starts context_length bars before the bar being forecast.
            'start': sym_index[idx - context_length],
        })
        index_map.append((symbol, idx))  # forecast i belongs to (symbol, bar position)
print(f"    Aggregated {len(all_ds_items)} windows into one dataset")

dataset = ListDataset(all_ds_items, freq='T')
print("[Step 3] Running inference on GPU…")
# Sample paths drawn per window. With a single sample, fc.mean and fc.quantile(...)
# are just that one random draw, so Step 4's +5% entry rule would mostly react to
# sampling noise. Inference cost presumably grows about linearly with this value;
# lower it if inference is too slow.
inference_num_samples = 100
# fp16 autocast is only enabled on CUDA; CPU autocast has historically supported
# only bfloat16.
with torch.amp.autocast(device.type, dtype=torch.float16, enabled=device.type == 'cuda'):
    forecasts = list(predictor.predict(dataset, num_samples=inference_num_samples))

# --- Step 4: Map signals and backtest ---
# Close prices of all symbols, aligned on the union of their timestamps. Any
# symbol with a missing bar on that grid (gaps, later listing, empty history) is
# dropped by dropna(axis=1), so every remaining column is complete.
price_df = pd.DataFrame({s: market_data[s]['close'] for s in market_data}).dropna(axis=1)
if price_df.empty:
    raise RuntimeError("No symbol has complete close prices on the shared timestamp grid; nothing to backtest")
symbols = price_df.columns.tolist()
entries = {sym: np.zeros(len(price_df), dtype=bool) for sym in symbols}
exits   = {sym: np.zeros(len(price_df), dtype=bool) for sym in symbols}

# Store prediction confidence (normalized to [0, 1]) for dynamic position sizing
confidence_scores = {sym: np.zeros(len(price_df)) for sym in symbols}

entry_return_threshold = 0.05  # enter when the forecast is more than 5% above the current close
confidence_cap = 0.5           # predicted returns of +50% or more count as full confidence

# For each symbol: its close prices, and the price_df row of every row of its own
# frame (-1 where absent). Forecasts are then mapped by timestamp instead of
# assuming that every frame ends on the same bar as price_df.
price_arrays = {sym: price_df[sym].to_numpy() for sym in symbols}
price_rows = {sym: price_df.index.get_indexer(market_data[sym].index) for sym in symbols}

for (symbol, idx), fc in zip(index_map, forecasts):
    rows = price_rows.get(symbol)
    if rows is None:  # symbol was dropped from price_df
        continue
    rel_idx = rows[idx]
    if rel_idx < 0:
        continue
    price = price_arrays[symbol][rel_idx]
    if not price > 0:  # skip zero/NaN prices (NaNs are zero-filled upstream)
        continue
    # The forecast window starts at bar idx, so the last step targets bar
    # idx + prediction_length - 1.
    mean_pred = float(fc.mean[-1])
    predicted_return = (mean_pred / price) - 1.0

    # Only consider sufficiently large positive expected returns
    if predicted_return > entry_return_threshold:
        entries[symbol][rel_idx] = True
        # Cap the expected return for safety, then scale into [0, 1] so the sizing
        # step can span its full 1%..max_position_size range.
        confidence_scores[symbol][rel_idx] = min(predicted_return, confidence_cap) / confidence_cap

        exit_point = rel_idx + prediction_length
        if exit_point < len(price_df):
            exits[symbol][exit_point] = True

print("[Step 4] Calculating dynamic position sizes based on prediction confidence...")
# Per-bar position size as a fraction of available cash. Bars without an entry
# signal get 0. Entry bars get 1% plus the confidence score scaled into the
# remaining (max_position_size - 1%) range, clipped at max_position_size.
min_position_size = 0.01
position_sizes = {
    symbol: np.where(
        entries[symbol],
        np.minimum(
            min_position_size + confidence_scores[symbol] * (max_position_size - min_position_size),
            max_position_size,
        ),
        0.0,
    )
    for symbol in symbols
}

# One column per symbol, aligned with the price grid
position_size_df = pd.DataFrame(position_sizes, index=price_df.index)

print("[Step 4] Backtesting with vectorbt with dynamic sizing, SL/TP...")
entries_df = pd.DataFrame(entries, index=price_df.index)
exits_df   = pd.DataFrame(exits,   index=price_df.index)

# Stop-loss and take-profit are passed directly to from_signals as fractions of
# the entry price (sl_stop / tp_stop). vectorbt's Portfolio has no
# apply_stop_loss / apply_take_profit methods, so the former post-hoc overlay always
# fell into its AttributeError branch and the configured stops were never applied.
pf = vbt.Portfolio.from_signals(
    close=price_df,
    entries=entries_df,
    exits=exits_df,
    init_cash=init_cash,
    fees=0.001,
    slippage=0.001,
    size=position_size_df,  # fraction of available cash per entry
    size_type='Percent',
    sl_stop=stop_loss,
    tp_stop=take_profit,
    cash_sharing=True,      # all symbols draw from one shared cash pool
)
print(f"    Dynamic position sizing backtest complete (SL {stop_loss:.0%}, TP {take_profit:.0%})")

print("    Final backtest ready    Backtest complete")
# Summary statistics of the backtest: show them and persist them next to the data.
stats = pf.stats()
print(stats)
stats_csv_path = os.path.join(DATA_DIR, "backtest_stats.csv")
stats.to_csv(stats_csv_path)

# Save position sizing statistics per symbol. position_size_df holds 0 on every bar
# without an entry; those bars are masked out so mean/median/min/std describe the
# sizes actually taken rather than being dominated by zeros (the median used to be
# 0 for every symbol). Symbols with no entries get NaN statistics and count 0.
taken_sizes = position_size_df.where(position_size_df > 0)
position_size_stats = {
    'mean': taken_sizes.mean(),
    'median': taken_sizes.median(),
    'min': taken_sizes.min(),
    'max': taken_sizes.max(),
    'std': taken_sizes.std(),
    'count': taken_sizes.count(),  # number of entry bars
}
pd.DataFrame(position_size_stats).to_csv(os.path.join(DATA_DIR, "position_size_stats.csv"))

# Create plots
import matplotlib.pyplot as plt
from matplotlib.ticker import PercentFormatter

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Portfolio value over time. vectorbt's pf.plot() builds a Plotly figure and has no
# Matplotlib `ax` parameter, so the value series is plotted with pandas instead.
pf.value().plot(ax=ax1)
ax1.set_title('Portfolio Performance')
ax1.set_ylabel('Portfolio value')

# Position sizes at entry bars only (0 means no entry on that bar).
plotted_any = False
for symbol in symbols:
    sizes = position_size_df[symbol][position_size_df[symbol] > 0]
    if len(sizes) > 0:
        ax2.scatter(sizes.index, sizes.values, label=symbol, alpha=0.7)
        plotted_any = True
ax2.set_ylim(0, max_position_size * 1.1)
ax2.set_title('Dynamic Position Sizes Over Time')
# Sizes are stored as fractions of cash (0.01 = 1%); show the axis as percentages.
ax2.yaxis.set_major_formatter(PercentFormatter(xmax=1.0))
ax2.set_ylabel('Position Size (% of cash)')
if plotted_any:  # avoid Matplotlib's "no artists with labels" warning
    ax2.legend(fontsize='small', ncol=4)

plt.tight_layout()
plt.savefig(os.path.join(DATA_DIR, "backtest_results.png"))
plt.show()

print("Pipeline finished")

Starting DeepAR backtest pipeline with enhanced logging...
Using device: cuda
[Step 1] Fetching OHLCV in parallel...
    Found 192 symbols; fetching first 10 for demo.
    [Fetch] 1INCH/USDT
      Cached, skipping
    [Fetch] AAVE/USDT
      Cached, skipping
    [Fetch] ACH/USDT
    [Fetch] ADA/USDT
      Cached, skipping
    [Fetch] ADX/USDT
      Cached, skipping
      Cached, skipping
    [Fetch] AIXBT/USDT
    [Fetch] ALGO/USDT
    [Fetch] ALICE/USDT
      Cached, skipping
    [Fetch] ALPINE/USDT
      Cached, skipping
      Cached, skipping
      Cached, skipping
    [Fetch] ANKR/USDT
      Cached, skipping
[Step 1] Fetch complete

[Step 1b] Loading data and computing RSI, Bollinger Bands, VMA...
    Prepared indicators for 192 symbols

[Step 2] Preparing sliding-window training data...
    Skipping AIXBT/USDT: insufficient data (n=18373)
    Skipping ANT/USDT: insufficient data (n=0)
    Skipping BOND/USDT: insufficient data (n=0)
    Skipping BUSD/USDT: insufficient data (n=0)
 

C:\Users\saber\anaconda3\Lib\site-packages\lightning\fabric\connector.py:571: `precision=16` is supported for historical reasons but its usage is discouraged. Please set your precision to 16-mixed instead!
Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
C:\Users\saber\anaconda3\Lib\site-packages\lightning\pytorch\trainer\configuration_validator.py:70: You defined a `validation_step` but have no `val_dataloader`. Skipping val loop.
C:\Users\saber\anaconda3\Lib\site-packages\lightning\pytorch\callbacks\model_checkpoint.py:654: Checkpoint directory C:\Users\saber\OneDrive\Documents\GitHub\CryptoBot4Dummies\checkpoints exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type        | Params | Mode  | In sizes                                                      | Out sizes   
-----------------------------------------------------------------------------

Training: |          | 0/? [00:00<?, ?it/s]

Epoch 0, global step 50: 'train_loss' reached 0.68300 (best 0.68300), saving model to 'C:\\Users\\saber\\OneDrive\\Documents\\GitHub\\CryptoBot4Dummies\\checkpoints\\epoch=0-step=50-v33.ckpt' as top 1
Epoch 1, global step 100: 'train_loss' reached -2.25782 (best -2.25782), saving model to 'C:\\Users\\saber\\OneDrive\\Documents\\GitHub\\CryptoBot4Dummies\\checkpoints\\epoch=1-step=100.ckpt' as top 1
Epoch 2, global step 150: 'train_loss' was not in top 1
Epoch 3, global step 200: 'train_loss' was not in top 1
Epoch 4, global step 250: 'train_loss' reached -2.33908 (best -2.33908), saving model to 'C:\\Users\\saber\\OneDrive\\Documents\\GitHub\\CryptoBot4Dummies\\checkpoints\\epoch=4-step=250-v2.ckpt' as top 1
Epoch 5, global step 300: 'train_loss' was not in top 1
`Trainer.fit` stopped: `max_epochs=6` reached.


    Training complete

[Step 3] Precomputing sliding windows on CPU...
[Step 3] CPU aggregation complete
[Step 3] Preparing global dataset for GPU inference...
