In [1]:
import sys
import os
from pathlib import Path
import asyncio
import time

# Find project root from current notebook location
current_dir = Path.cwd()
if 'examples' in str(current_dir):
    # When running from examples folder
    project_root = current_dir.parent.parent
else:
    # When running from project root
    project_root = current_dir

# Add project root to Python path
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import required modules
from typing import List, Dict, Any, Optional
from datetime import datetime
import ray

from quantbt import (
    TradingStrategy,   
    BacktestConfig,    
    # Order related
    Order, OrderSide, OrderType,
)

# Ray-based optimization system
from quantbt.ray import (
    RayClusterManager,
    RayDataManager
)

# Simplified Actor structure
from quantbt.ray.backtest_actor import BacktestActor

# Newly developed monitoring system
from quantbt.ray.monitoring import ProgressTracker, SimpleMonitor

In [2]:
class SimpleSMAStrategy(TradingStrategy):
    """SMA Strategy (for Ray optimization)
    
    High-performance streaming approach:
    - Indicator calculation: Polars vector operations
    - Signal generation: Row-by-row streaming processing
    
    Buy: Price above buy_sma
    Sell: Price below sell_sma
    """
    
    def __init__(self, buy_sma: int = 15, sell_sma: int = 30):
        super().__init__(
            name="SimpleSMAStrategy",
            config={
                "buy_sma": buy_sma,
                "sell_sma": sell_sma
            },
            position_size_pct=0.8,  # 80% position size
            max_positions=1
        )
        self.buy_sma = buy_sma
        self.sell_sma = sell_sma
        
    def _compute_indicators_for_symbol(self, symbol_data):
        """Calculate moving average indicators by symbol (Polars vector operations)"""
        
        # Check chronological sorting
        data = symbol_data.sort("timestamp")
        
        # Calculate simple moving averages
        buy_sma = self.calculate_sma(data["close"], self.buy_sma)
        sell_sma = self.calculate_sma(data["close"], self.sell_sma)
        
        # Add indicator columns (prevent duplicates)
        columns_to_add = []
        
        # Add buy_sma column
        buy_sma_name = f"sma_{self.buy_sma}"
        columns_to_add.append(buy_sma.alias(buy_sma_name))
        
        # Add sell_sma column (check for duplicates)
        sell_sma_name = f"sma_{self.sell_sma}"
        if sell_sma_name != buy_sma_name:  # Add only if not duplicate
            columns_to_add.append(sell_sma.alias(sell_sma_name))
        
        return data.with_columns(columns_to_add)
    
    def generate_signals(self, current_data: Dict[str, Any]) -> List[Order]:
        """Generate signals based on row data"""
        orders = []
        
        if not self.broker:
            return orders
        
        symbol = current_data['symbol']
        current_price = current_data['close']
        
        # Get SMA values (only one column exists if same value)
        buy_sma_name = f'sma_{self.buy_sma}'
        sell_sma_name = f'sma_{self.sell_sma}'
        
        buy_sma = current_data.get(buy_sma_name)
        if buy_sma_name == sell_sma_name:
            sell_sma = buy_sma  # Same SMA value case
        else:
            sell_sma = current_data.get(sell_sma_name)
        
        # Skip if indicators are not calculated
        if buy_sma is None or sell_sma is None:
            return orders
        
        current_positions = self.get_current_positions()
        
        # Buy signal: price above buy_sma + no position
        if current_price > buy_sma and symbol not in current_positions:
            portfolio_value = self.get_portfolio_value()
            quantity = self.calculate_position_size(symbol, current_price, portfolio_value)
            
            if quantity > 0:
                order = Order(
                    symbol=symbol,
                    side=OrderSide.BUY,
                    quantity=quantity,
                    order_type=OrderType.MARKET
                )
                orders.append(order)
        
        # Sell signal: price below sell_sma + position exists
        elif current_price < sell_sma and symbol in current_positions and current_positions[symbol] > 0:
            order = Order(
                symbol=symbol,
                side=OrderSide.SELL,
                quantity=current_positions[symbol],
                order_type=OrderType.MARKET
            )
            orders.append(order)
        
        return orders


In [None]:
# 1. Initialize monitoring system
print("🔧 Initializing monitoring system...")
progress_tracker = None  # Initialize later
simple_monitor = SimpleMonitor()
print("✅ SimpleMonitor initialization completed")

# 2. RayClusterManager setup and initialization
ray_cluster_config = {
    "num_cpus": 32,
    "object_store_memory": 1000 * 1024 * 1024 * 8,  # 8GB
    "ignore_reinit_error": True,
    "logging_level": "INFO"  # Changed to INFO for debugging
}

cluster_manager = RayClusterManager(ray_cluster_config)

print("🔧 Initializing Ray cluster...")
if not cluster_manager.initialize_cluster():
    print("❌ Ray cluster initialization failed")
else:
    print("✅ Ray cluster initialization complete")

# 3. Output cluster status and resource information
cluster_resources = cluster_manager.get_cluster_resources()
available_resources = cluster_manager.get_available_resources()

print(f"📊 Cluster resources:")
print(f"   - Total CPU: {cluster_resources['cpu']}")
print(f"   - Object Store: {cluster_resources['object_store']:,} bytes")
print(f"   - Number of nodes: {cluster_resources['nodes']}")
print(f"   - Available CPU: {available_resources['cpu']}")

# 4. Basic backtest configuration
config = BacktestConfig(
    symbols=["KRW-BTC"],
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 31),
    timeframe="1d",
    initial_cash=10_000_000,
    commission_rate=0.0,
    slippage_rate=0.0,
    save_portfolio_history=False
)
print("✅ Backtest configuration completed")

🔧 Initializing monitoring system...
✅ SimpleMonitor initialization completed
🔧 Initializing Ray cluster...


2025-06-27 09:56:56,732	INFO worker.py:1908 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


✅ Ray cluster initialization complete
📊 Cluster resources:
   - Total CPU: 32.0
   - Object Store: 8,388,608,000.0 bytes
   - Number of nodes: 1
   - Available CPU: 32.0
✅ Backtest configuration completed


처리중... 365/365: 100%|██████████| 365/365 [00:00<00:00]


In [None]:
# 5. Parameter grid definition and combination generation
param_grid = {
    'buy_sma': [10, 15, 20, 25],      # Buy SMA: 10, 15, 20, 25
    'sell_sma': [25, 30, 35, 40]      # Sell SMA: 25, 30, 35, 40
}
total_combinations = len(param_grid['buy_sma']) * len(param_grid['sell_sma'])
print(f"\n✅ Parameter grid definition completed: {total_combinations} combinations")
print(f"   - Buy SMA: {param_grid['buy_sma']}")
print(f"   - Sell SMA: {param_grid['sell_sma']}")

# 6. RayDataManager creation and data loading
print("\n🔧 Creating RayDataManager and loading data")
data_manager = RayDataManager.remote()
print("✅ RayDataManager creation completed")

# Pre-load data (zero-copy method)
print("📊 Loading actual data... (zero-copy method)")

# load_real_data already returns ray.ObjectRef so await is not needed
data_ref = data_manager.load_real_data.remote(
    symbols=config.symbols,
    start_date=config.start_date,
    end_date=config.end_date,
    timeframe=config.timeframe
)

# 7. Worker environment preparation (based on actual task count)
worker_env = cluster_manager.prepare_worker_environment(
    expected_tasks=total_combinations,  # Pass actual combination count
    memory_per_task_mb=200  # Memory per task
)

print(f"🎯 Worker environment preparation:")
print(f"   - Optimal number of workers: {worker_env['optimal_workers']}")
print(f"   - Memory per task: {worker_env['memory_per_task_mb']}MB")


✅ Parameter grid definition completed: 16 combinations
   - Buy SMA: [10, 15, 20, 25]
   - Sell SMA: [25, 30, 35, 40]

🔧 Creating RayDataManager and loading data
✅ RayDataManager creation completed
📊 Loading actual data... (zero-copy method)
🎯 Worker environment preparation:
   - Optimal number of workers: 32
   - Memory per task: 200MB


In [5]:
# 8. BacktestActor creation (simplified structure)
num_actors = worker_env['optimal_workers']
print(f"\n🎯 Creating {num_actors} BacktestActors... (shared_data_ref only)")

actors = []
for i in range(num_actors):
    # Pass only shared_data_ref (best performance)
    actor = BacktestActor.remote(f"actor_{i}", shared_data_ref=data_ref)
    actors.append(actor)

# Actor initialization
config_dict = {
    'symbols': config.symbols,
    'start_date': config.start_date,
    'end_date': config.end_date,
    'timeframe': config.timeframe,
    'initial_cash': config.initial_cash,
    'commission_rate': config.commission_rate,
    'slippage_rate': config.slippage_rate,
    'save_portfolio_history': config.save_portfolio_history
    # shared_data_ref is already passed during Actor creation
}

init_results = await asyncio.gather(*[
    actor.initialize_engine.remote(config_dict) for actor in actors
])

successful_actors = sum(init_results)
print(f"✅ BacktestActor initialization: {successful_actors}/{num_actors} successful")


🎯 Creating 32 BacktestActors... (shared_data_ref only)
✅ BacktestActor initialization: 32/32 successful


In [6]:
# 9. Generate parameter combinations
from itertools import product
param_combinations = []
for buy_sma, sell_sma in product(param_grid['buy_sma'], param_grid['sell_sma']):
    param_combinations.append({
        'buy_sma': buy_sma,
        'sell_sma': sell_sma
    })

# 10. Initialize and start progress tracker
print("\n⚡ Starting distributed backtest execution with monitoring system")
print("=" * 70)

# Now that we know total_combinations, initialize ProgressTracker
progress_tracker = ProgressTracker(total_tasks=total_combinations)
progress_tracker.start()
print(f"✅ ProgressTracker initialization completed (total {total_combinations} tasks)")

optimization_start = time.time()

# Distribute tasks by Actor
tasks = []
for i, params in enumerate(param_combinations):
    actor_idx = i % len(actors)
    actor = actors[actor_idx]
    
    task = actor.execute_backtest.remote(params, SimpleSMAStrategy)
    tasks.append((i, params, task))


⚡ Starting distributed backtest execution with monitoring system
✅ ProgressTracker initialization completed (total 16 tasks)


In [7]:
# 11. Wait for task completion with real-time monitoring
print(f"📊 Executing {total_combinations} backtests in parallel with real-time monitoring... ")
print("-" * 70)

completed_tasks = 0
results = []

# Set update interval for real-time progress display
update_interval = max(1, total_combinations // 10)  # Update in 10% increments

for i, (task_id, params, task) in enumerate(tasks):
    try:
        # Wait for backtest results
        result = await task
        completed_tasks += 1
        
        # Save results
        backtest_result = {
            'params': params,
            'result': result,
            'success': True,
            'task_id': task_id
        }
        results.append(backtest_result)
        
        # Record results in SimpleMonitor
        monitor_result = {
            'success': True,
            'sharpe_ratio': result.get('sharpe_ratio', 0.0),
            'total_return': result.get('total_return', 0.0),
            'params': params,
            'execution_time': 0.0  # Individual task time not measured
        }
        simple_monitor.record_result(monitor_result)
        
        # Update progress (increment by 1)
        progress_tracker.update(1)
        
        # Periodically output progress
        if completed_tasks % update_interval == 0 or completed_tasks == total_combinations:
            progress_info = progress_tracker.get_progress()
            eta_info = progress_tracker.get_eta()
            progress_text = progress_tracker.format_progress(show_bar=True)
            
            print(f"📈 {progress_text}, ETA: {eta_info}")
            
            # Output intermediate statistics (when 5 or more tasks completed)
            if completed_tasks >= 5:
                current_stats = simple_monitor.get_statistics()
                print(f"   💡 Current statistics: Success rate {current_stats['success_rate']:.1f}%, "
                        f"Average Sharpe ratio {current_stats['avg_sharpe_ratio']:.4f}")
                
            print("-" * 50)
        
    except Exception as e:
        completed_tasks += 1
        print(f"❌ Task {task_id} failed: {e}")
        
        failed_result = {
            'params': params,
            'result': None,
            'success': False,
            'error': str(e),
            'task_id': task_id
        }
        results.append(failed_result)
        
        # Record failed tasks in SimpleMonitor as well
        monitor_result = {
            'success': False,
            'sharpe_ratio': 0.0,
            'total_return': 0.0,
            'params': params,
            'execution_time': 0.0,
            'error': str(e)
        }
        simple_monitor.record_result(monitor_result)
        
        # Reflect failed tasks in progress as well (increment by 1)
        progress_tracker.update(1)

📊 Executing 16 backtests in parallel with real-time monitoring... 
----------------------------------------------------------------------
📈 진행률: 1/16 (6.2%), ETA: 22초
██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░, ETA: 22초
--------------------------------------------------
📈 진행률: 2/16 (12.5%), ETA: 10초
█████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░, ETA: 10초
--------------------------------------------------
📈 진행률: 3/16 (18.8%), ETA: 6초
███████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░, ETA: 6초
--------------------------------------------------
📈 진행률: 4/16 (25.0%), ETA: 4초
██████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░, ETA: 4초
--------------------------------------------------
📈 진행률: 5/16 (31.2%), ETA: 3초
████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░, ETA: 3초
   💡 Current statistics: Success rate 1.0%, Average Sharpe ratio 3.6642
--------------------------------------------------
📈 진행률: 6/16 (37.5%), ETA: 2초
███████████████░░░░░░░░░░░░░░░░░░░░░░░░░, ETA: 2초
   💡 Current statistics: Success rate 1.0%, Average Sharpe

In [8]:
# 12. Final result analysis and output
print("\n" + "=" * 70)
print("📊 Ray Backtesting Monitoring Integrated System Final Results")
print("=" * 70)

# Progress tracker final status
final_progress = progress_tracker.format_progress(show_bar=True)
print(f"🎯 Final progress status: {final_progress}")

# SimpleMonitor final statistics
final_statistics = simple_monitor.get_statistics()
print(f"\n📈 Final backtest statistics:")
print(f"   - Success rate: {final_statistics['success_rate']:.1f}%")
print(f"   - Average Sharpe ratio: {final_statistics['avg_sharpe_ratio']:.4f}")
print(f"   - Average total return: {final_statistics['avg_return']:.4f}")

# Optimal parameter information
best_params = simple_monitor.get_best_performance()
if best_params:
    print(f"\n🏆 Optimal parameters:")
    print(f"   - Buy SMA: {best_params['params']['buy_sma']}")
    print(f"   - Sell SMA: {best_params['params']['sell_sma']}")
    print(f"   - Sharpe ratio: {best_params.get('sharpe_ratio', 0):.4f}")
    print(f"   - Total return: {best_params.get('total_return', 0):.4f}")

# Detailed result summary output
summary_text = simple_monitor.format_summary()
print(f"\n📊 Detailed performance summary:")
for line in summary_text.split('\n'):
    if line.strip():
        print(f"   {line}")


📊 Ray Backtesting Monitoring Integrated System Final Results
🎯 Final progress status: 진행률: 16/16 (100.0%), ETA: 완료 - 완료
████████████████████████████████████████

📈 Final backtest statistics:
   - Success rate: 1.0%
   - Average Sharpe ratio: 3.7741
   - Average total return: 1.1634

🏆 Optimal parameters:
   - Buy SMA: 15
   - Sell SMA: 35
   - Sharpe ratio: 4.4160
   - Total return: 1.3737

📊 Detailed performance summary:
   📊 현재 성과:
      총 결과: 16개
      성공률: 100.0%
      평균 샤프비율: 3.7741
      평균 수익률: 1.1634
      평균 실행시간: 0.00초
      최고 샤프비율: 4.4160 (파라메터: {'buy_sma': 15, 'sell_sma': 35})


In [9]:
# 13. System efficiency analysis
print(f"   - Update frequency: 1 per {update_interval} tasks")

# Check cluster status
final_cluster_health = cluster_manager.monitor_cluster_health()
print(f"   - Final cluster status: {final_cluster_health['status']}")

# 14. Cluster cleanup
cluster_manager.shutdown_cluster()
print("\n✅ Cluster manager shutdown completed")

results = {}
results['monitoring_results'] = {
    'progress_tracker': progress_tracker.get_progress(),
    'statistics': final_statistics,
    'best_result': best_params
}


results['execution_metrics'] = {
    'total_combinations': total_combinations,
    'successful_combinations': final_statistics['total_results'],
    'success_rate': final_statistics['success_rate']
}

results['performance_analysis'] = {
    'best_sharpe_ratio': best_params.get('sharpe_ratio', 0) if best_params else 0,
    'best_total_return': best_params.get('total_return', 0) if best_params else 0,
    'avg_sharpe_ratio': final_statistics['avg_sharpe_ratio'],
    'cluster_status': final_cluster_health['status']
}

   - Update frequency: 1 per 1 tasks
   - Final cluster status: healthy

✅ Cluster manager shutdown completed


In [10]:
# Core performance indicator summary
metrics = results['execution_metrics']
performance = results['performance_analysis']

print(f"\n📊 Core performance indicators:")
print(f"   🎯 Success rate: {metrics['success_rate']*100:.1f}%")
print(f"   🏆 Best Sharpe ratio: {performance['best_sharpe_ratio']:.4f}")
print(f"   📊 Average Sharpe ratio: {performance['avg_sharpe_ratio']:.4f}")
print(f"   💰 Best return rate: {performance['best_total_return']:.4f}")


📊 Core performance indicators:
   🎯 Success rate: 100.0%
   🏆 Best Sharpe ratio: 4.4160
   📊 Average Sharpe ratio: 3.7741
   💰 Best return rate: 1.3737
