# 大規模CANデータGPU処理ベンチマーク（RMM最適化版）

1000万、1億、10億メッセージの大規模データでGPU処理性能を評価します（10億メッセージのケースはストリーミング処理が未実装のため、現時点ではスキップされます）。
RMM managed memoryとメモリプール設定を最適化しています。

## 1. 環境設定とRMM初期化

In [None]:
import os
import rmm
import cupy as cp
import numpy as np
import pandas as pd
import cudf
import matplotlib.pyplot as plt
import seaborn as sns
import time
import gc
import psutil
from gpu_can_decoder import GPUCANDecoder
from cpu_can_decoder import CPUCANDecoder

# Switch RMM to managed memory backed by a pool allocator. The intent is that
# allocations exceeding free GPU memory spill to host memory instead of
# failing with an out-of-memory error.
# NOTE(review): this configures RMM itself; whether CuPy allocations made by
# GPUCANDecoder also go through RMM depends on CuPy's allocator being pointed
# at RMM, which is not done here -- confirm.
rmm.reinitialize(
    pool_allocator=True,
    managed_memory=True,
    initial_pool_size=2 * 1024**3,   # start with a 2 GiB pool
    maximum_pool_size=20 * 1024**3,  # cap at 20 GiB (sized for a 24 GB GPU)
)

# Default figure size / font size for every plot, plus seaborn's grid theme.
plt.rcParams.update({'figure.figsize': (12, 6), 'font.size': 12})
sns.set_style("whitegrid")

# Host memory summary.
print("Environment setup complete with RMM managed memory")
for ram_label, ram_field in (("Available", "available"), ("Total", "total")):
    ram_bytes = getattr(psutil.virtual_memory(), ram_field)
    print(f"{ram_label} RAM: {ram_bytes / (1024**3):.1f} GB")

# GPU memory summary; mem_info is read as a (free, total) pair of byte counts.
device = cp.cuda.Device()
mem_info = device.mem_info
free_bytes, total_bytes = mem_info
print("\nGPU Memory:")
print(f"  Free: {free_bytes / (1024**3):.2f} GB")
print(f"  Total: {total_bytes / (1024**3):.2f} GB")

## 2. 大規模データ生成関数（メモリ効率化版）

In [None]:
def generate_synthetic_can_data_chunked(n_messages, chunk_size=5_000_000):
    """Generate synthetic CAN traffic as three parallel NumPy arrays.

    Data is produced chunk by chunk into pre-allocated output arrays. Within
    each chunk a fixed share of messages is assigned to a handful of known
    addresses, the rest is drawn uniformly from a small pool of other
    addresses, and the result is shuffled. Payloads are filled with
    vectorised NumPy operations rather than a per-message Python loop.

    Payload rules:
        * address 170 (four wheel speeds): four big-endian 16-bit values,
          each ``int((speed_kmh + 67.67) / 0.01)`` with ``speed_kmh`` drawn
          uniformly from [55, 65).
        * address 37 (steering): a fixed 8-byte pattern.
        * any other address: 8 uniformly random bytes.

    Every chunk is given its own 60-second timestamp window, evenly spaced,
    starting at 46408.0 for the first chunk.

    Args:
        n_messages: Total number of messages to generate. ``0`` yields
            empty arrays.
        chunk_size: Maximum number of messages generated per chunk; must be
            positive. It is clamped to ``n_messages``.

    Returns:
        A tuple ``(timestamps, addresses, data_bytes)`` of a float64 array
        of shape ``(n_messages,)``, an int64 array of shape
        ``(n_messages,)`` and a uint8 array of shape ``(n_messages, 8)``.

    Raises:
        ValueError: If ``chunk_size`` is not positive (or, from NumPy, if
            ``n_messages`` is negative).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # Share of each chunk assigned to the well-known addresses.
    address_distribution = {
        170: 0.037,  # four wheel speeds
        37: 0.037,   # steering
        36: 0.037,
        740: 0.044,
        608: 0.022,
        180: 0.018,
    }
    other_address_pool = np.array([452, 466, 467, 705, 321, 562], dtype=np.int64)
    steering_payload = np.array(
        [0x00, 0x00, 0x10, 0x00, 0xC0, 0x00, 0x00, 0xFD], dtype=np.uint8
    )

    # Pre-allocate the full outputs so chunks are written in place.
    timestamps = np.empty(n_messages, dtype=np.float64)
    addresses = np.empty(n_messages, dtype=np.int64)
    data_bytes = np.empty((n_messages, 8), dtype=np.uint8)

    # Nothing to generate; also avoids a zero chunk size below.
    if n_messages == 0:
        return timestamps, addresses, data_bytes

    chunk_size = min(chunk_size, n_messages)
    n_chunks = (n_messages + chunk_size - 1) // chunk_size

    for chunk_idx in range(n_chunks):
        start_idx = chunk_idx * chunk_size
        end_idx = min(start_idx + chunk_size, n_messages)
        chunk_messages = end_idx - start_idx

        # Addresses: fixed-share block per known address, random remainder.
        chunk_addresses = np.empty(chunk_messages, dtype=np.int64)
        filled = 0
        for addr, prob in address_distribution.items():
            count = int(chunk_messages * prob)
            chunk_addresses[filled:filled + count] = addr
            filled += count
        chunk_addresses[filled:] = np.random.choice(
            other_address_pool, chunk_messages - filled
        )
        np.random.shuffle(chunk_addresses)
        addresses[start_idx:end_idx] = chunk_addresses

        # Timestamps: one 60-second window per chunk.
        timestamps[start_idx:end_idx] = np.linspace(
            46408.0 + (chunk_idx * 60),
            46408.0 + ((chunk_idx + 1) * 60),
            chunk_messages
        )

        # Payloads: start from random bytes, then overwrite the special rows.
        chunk_data = data_bytes[start_idx:end_idx]  # view into the output
        chunk_data[...] = np.random.randint(
            0, 256, size=(chunk_messages, 8), dtype=np.uint8
        )

        chunk_data[chunk_addresses == 37] = steering_payload

        wheel_mask = chunk_addresses == 170
        n_wheel = int(np.count_nonzero(wheel_mask))
        speeds_kmh = np.random.uniform(55, 65, size=(n_wheel, 4))
        # astype truncates toward zero, matching int() for these positive values.
        raw_values = ((speeds_kmh + 67.67) / 0.01).astype(np.int64)
        wheel_payload = np.empty((n_wheel, 8), dtype=np.uint8)
        wheel_payload[:, 0::2] = ((raw_values >> 8) & 0xFF).astype(np.uint8)
        wheel_payload[:, 1::2] = (raw_values & 0xFF).astype(np.uint8)
        chunk_data[wheel_mask] = wheel_payload

        # Report progress roughly ten times over the whole run.
        if chunk_idx % max(1, n_chunks // 10) == 0:
            print(f"  Generated chunk {chunk_idx + 1}/{n_chunks}")
            gc.collect()

    return timestamps, addresses, data_bytes

# Smoke test: generate a small dataset and report how much memory it takes.
print("Testing data generation...")
test_t, test_a, test_d = generate_synthetic_can_data_chunked(100_000)
test_total_bytes = sum(arr.nbytes for arr in (test_t, test_a, test_d))
print(f"Generated {len(test_t):,} messages")
print(f"Memory usage: {test_total_bytes / (1024**2):.1f} MB")

## 3. GPUデコーダーの拡張（チャンク処理対応）

In [None]:
class ChunkedGPUCANDecoder(GPUCANDecoder):
    """GPU CAN decoder that works through a large input one slice at a time."""

    def decode_large_batch(self, timestamps, addresses, data_bytes, chunk_size=50_000_000):
        """Decode a large message set in consecutive chunks and merge the results.

        Each chunk is passed to ``decode_batch``; the non-empty per-signal
        frames it returns are collected, then concatenated per signal and
        sorted by their ``timestamp`` column.

        Args:
            timestamps: Per-message timestamps; its length sets the message count.
            addresses: Per-message addresses, sliced in step with ``timestamps``.
            data_bytes: Per-message payloads, sliced in step with ``timestamps``.
            chunk_size: Number of messages handed to ``decode_batch`` per call.

        Returns:
            A dict keyed by the names in ``self.signal_configs``; each value
            is the merged, timestamp-sorted frame, or ``None`` when no chunk
            produced rows for that signal.
        """
        total = len(timestamps)
        chunk_count = (total + chunk_size - 1) // chunk_size

        # One list of partial frames per configured signal.
        collected = {name: [] for name in self.signal_configs.keys()}

        for chunk_no in range(chunk_count):
            lo = chunk_no * chunk_size
            hi = min(lo + chunk_size, total)
            window = slice(lo, hi)

            print(f"  Processing chunk {chunk_no + 1}/{chunk_count} ({hi - lo:,} messages)")

            # Slice out this chunk's inputs.
            part_timestamps = timestamps[window]
            part_addresses = addresses[window]
            part_data = data_bytes[window]

            part_results = self.decode_batch(part_timestamps, part_addresses, part_data)

            # Keep only frames that actually contain rows.
            for name, frame in part_results.items():
                if frame is None or len(frame) == 0:
                    continue
                collected[name].append(frame)

            # Drop chunk-local references and hand cached blocks back
            # before the next chunk is processed.
            del part_timestamps, part_addresses, part_data, part_results
            gc.collect()
            cp.get_default_memory_pool().free_all_blocks()

        # Merge the partial frames of every signal and order them by time.
        merged = {}
        for name, frames in collected.items():
            if not frames:
                merged[name] = None
                continue
            combined = cudf.concat(frames, ignore_index=True)
            merged[name] = combined.sort_values('timestamp').reset_index(drop=True)

        return merged

# Instantiate the chunk-capable decoder; batch_size is passed to the inherited
# GPUCANDecoder constructor (its exact meaning is defined there).
gpu_decoder = ChunkedGPUCANDecoder(batch_size=10_000_000)

## 4. 大規模ベンチマーク実行（修正版）

In [None]:
# Message counts to benchmark (edit as needed).
test_sizes = [10_000_000, 100_000_000, 1_000_000_000]
benchmark_results = []

for n_messages in test_sizes:
    print(f"\n{'='*60}")
    print(f"Testing with {n_messages:,} messages")
    print(f"{'='*60}")

    # Rough host-memory estimate: 24 bytes per message
    # (8-byte timestamp + 8-byte address + 8 payload bytes).
    required_memory_gb = (n_messages * 24) / (1024**3)
    available_memory_gb = psutil.virtual_memory().available / (1024**3)

    print(f"Required memory: ~{required_memory_gb:.1f} GB")
    print(f"Available memory: {available_memory_gb:.1f} GB")

    # The 1B+ case needs a streaming implementation that does not exist yet.
    if n_messages >= 1_000_000_000:
        print("\nUsing streaming approach for 1B+ messages...")
        print("Skipping 1B test for now (requires streaming implementation)")
        continue

    # Pre-bind the large objects so the cleanup in `finally` is valid even
    # when a step fails before they are assigned.
    timestamps = addresses = data_bytes = gpu_results = None
    try:
        # Data generation. perf_counter is used for all intervals because it
        # is monotonic and high-resolution, unlike the wall clock.
        print("\nGenerating synthetic data...")
        gen_start = time.perf_counter()
        timestamps, addresses, data_bytes = generate_synthetic_can_data_chunked(n_messages)
        gen_time = time.perf_counter() - gen_start

        data_size_mb = (timestamps.nbytes + addresses.nbytes + data_bytes.nbytes) / (1024**2)
        print(f"Data generation time: {gen_time:.2f} seconds")
        print(f"Data size: {data_size_mb:.1f} MB ({data_size_mb/1024:.2f} GB)")

        # GPU processing.
        print("\nRunning GPU processing...")
        gpu_start = time.perf_counter()

        # From 100M messages upward the input is decoded chunk by chunk.
        if n_messages >= 100_000_000:
            gpu_results = gpu_decoder.decode_large_batch(timestamps, addresses, data_bytes)
        else:
            gpu_results = gpu_decoder.decode_batch(timestamps, addresses, data_bytes)

        # Wait for the default stream so the timing covers the GPU work.
        cp.cuda.Stream.null.synchronize()
        gpu_time = time.perf_counter() - gpu_start

        # Total decoded rows across all signals.
        n_decoded = sum(len(df) for df in gpu_results.values() if df is not None)

        # CPU time is not measured; it is extrapolated at 0.45 s per million messages.
        estimated_cpu_time = 0.45 * (n_messages / 1_000_000)

        # Record this run.
        result = {
            'n_messages': n_messages,
            'data_size_gb': data_size_mb / 1024,
            'generation_time': gen_time,
            'gpu_time': gpu_time,
            'estimated_cpu_time': estimated_cpu_time,
            'speedup': estimated_cpu_time / gpu_time,
            'gpu_throughput_mmsg': n_messages / gpu_time / 1e6,
            'gpu_throughput_gb': (data_size_mb / 1024) / gpu_time,
            'n_decoded_messages': n_decoded
        }
        benchmark_results.append(result)

        print(f"\n=== Results ===")
        print(f"GPU processing time: {gpu_time:.3f} seconds")
        print(f"Throughput: {result['gpu_throughput_mmsg']:.1f} Mmessages/sec")
        print(f"Throughput: {result['gpu_throughput_gb']:.2f} GB/sec")
        print(f"Estimated speedup vs CPU: {result['speedup']:.1f}x")
        print(f"Decoded messages: {n_decoded:,}")

    except Exception as e:
        # Best-effort benchmark: report the failure and move on to the next size.
        print(f"\nERROR: {str(e)}")
        import traceback
        traceback.print_exc()
        print("Skipping this test size...")
    finally:
        # Release this size's data on success AND failure, so a failed run
        # does not keep its arrays alive while the next, larger dataset is
        # being generated.
        del timestamps, addresses, data_bytes, gpu_results
        gc.collect()
        cp.get_default_memory_pool().free_all_blocks()

# Collect the per-size records into a DataFrame and print a summary table.
if benchmark_results:
    benchmark_df = pd.DataFrame(benchmark_results)
    print("\n" + "="*60)
    print("BENCHMARK SUMMARY")
    print("="*60)
    print(benchmark_df.to_string(index=False))

## 5. 結果の可視化

In [None]:
if benchmark_results:
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
    msg_counts = benchmark_df['n_messages']
    line_style = dict(linewidth=2, markersize=8)

    # Top-left: measured GPU time against the extrapolated CPU time.
    ax1.plot(msg_counts, benchmark_df['gpu_time'], 'b-o',
             label='GPU Actual', **line_style)
    ax1.plot(msg_counts, benchmark_df['estimated_cpu_time'], 'r--o',
             label='CPU Estimated', **line_style)
    ax1.set_ylabel('Processing Time (seconds)')
    ax1.set_title('Processing Time Scaling')
    ax1.set_yscale('log')
    ax1.legend()

    # Remaining panels each show a single series:
    # (axis, column, line format, y label, title).
    single_series_panels = (
        (ax2, 'gpu_throughput_mmsg', 'g-o',
         'Throughput (Mmessages/sec)', 'GPU Throughput Scaling'),
        (ax3, 'gpu_throughput_gb', 'm-o',
         'Throughput (GB/sec)', 'GPU Data Throughput'),
        (ax4, 'speedup', 'c-o',
         'Speedup (times)', 'Estimated GPU Speedup vs CPU'),
    )
    for axis, column, fmt, y_label, title in single_series_panels:
        axis.plot(msg_counts, benchmark_df[column], fmt, **line_style)
        axis.set_ylabel(y_label)
        axis.set_title(title)

    # Settings shared by all four panels.
    for axis in (ax1, ax2, ax3, ax4):
        axis.set_xlabel('Number of Messages')
        axis.set_xscale('log')
        axis.grid(True, alpha=0.3)

    # Reference line at 1x on the speedup panel.
    ax4.axhline(y=1, color='k', linestyle='--', alpha=0.5)

    plt.tight_layout()
    plt.show()

    # Best value seen for each headline metric.
    peak_mmsg = benchmark_df['gpu_throughput_mmsg'].max()
    peak_gb = benchmark_df['gpu_throughput_gb'].max()
    peak_speedup = benchmark_df['speedup'].max()
    print("\n=== Performance Summary ===")
    print(f"Maximum throughput: {peak_mmsg:.1f} Mmessages/sec")
    print(f"Maximum data rate: {peak_gb:.2f} GB/sec")
    print(f"Maximum speedup: {peak_speedup:.1f}x")

## 6. メモリ使用量分析

In [None]:
# Theoretical per-message footprint of the generated arrays.
for report_line in (
    "=== Memory Usage Analysis ===",
    "\nPer-message memory requirements:",
    "  - Timestamp: 8 bytes (float64)",
    "  - Address: 8 bytes (int64)",
    "  - Data: 8 bytes (8 x uint8)",
    "  - Total: 24 bytes/message",
    "\nDataset sizes:",
):
    print(report_line)

# Projected dataset size for every benchmarked message count.
for n_messages in test_sizes:
    total_gb = n_messages * 24 / 1024**3
    print(f"  - {n_messages:,} messages: {total_gb:.2f} GB")

# Current state of CuPy's default memory pool.
mempool = cp.get_default_memory_pool()
pool_used_gb = mempool.used_bytes() / (1024**3)
pool_total_gb = mempool.total_bytes() / (1024**3)
print("\nGPU Memory Status:")
print(f"  - Used: {pool_used_gb:.2f} GB")
print(f"  - Total allocated: {pool_total_gb:.2f} GB")

# Device-level memory; mem_info is read as a (free, total) pair of byte counts.
device = cp.cuda.Device()
mem_info = device.mem_info
device_free, device_total = mem_info
print("\nGPU Device Info:")
print(f"  - Total Memory: {device_total / (1024**3):.1f} GB")
print(f"  - Free Memory: {device_free / (1024**3):.1f} GB")
print(f"  - Used Memory: {(device_total - device_free) / (1024**3):.1f} GB")

# RMM settings. These are fixed text and are not read back from RMM.
for report_line in (
    "\nRMM Configuration:",
    "  - Managed Memory: Enabled",
    "  - Pool Allocator: Enabled",
    "  - Initial Pool Size: 2 GB",
    "  - Maximum Pool Size: 20 GB",
):
    print(report_line)

## 7. パフォーマンス改善の推奨事項

In [None]:
# Static recommendations, stored as (heading, bullet lines) pairs and printed
# in order.
print("=== Performance Optimization Recommendations ===")

recommendations = (
    ("\n1. RMM Managed Memory:", (
        "   - 現在有効化されており、GPUメモリ不足時に自動的にホストメモリを使用",
        "   - パフォーマンスは低下するが、OOMエラーを回避可能",
    )),
    ("\n2. Chunk Processing:", (
        "   - 100M以上のメッセージでは自動的にチャンク処理を使用",
        "   - チャンクサイズは50Mメッセージ（約1.2GB）に設定",
    )),
    ("\n3. Memory Pool Settings:", (
        "   - Initial: 2GB (GPUの高速処理用)",
        "   - Maximum: 20GB (24GB GPUの場合の推奨値)",
    )),
    ("\n4. Further Optimizations:", (
        "   - ストリーミング処理の実装（10億メッセージ以上）",
        "   - マルチGPU対応",
        "   - より効率的なメモリ再利用",
    )),
)

for heading, bullets in recommendations:
    print(heading)
    for bullet in bullets:
        print(bullet)