In [1]:
#!/usr/bin/env python3
"""
Experiment 1: Prefill + Concurrent Prefill (PYTORCH PROFILING)
Uses PyTorch profiler to verify actual parallel execution

Works in JupyterLab - no external tools needed.
"""

import os
os.environ['HF_HOME'] = '/workspace/huggingface_cache'

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from torch.profiler import profile, ProfilerActivity
import time
import threading
import json

# ============================================================================
# CONFIGURATION
# ============================================================================

MODEL_NAME = "Qwen/Qwen2.5-7B-Instruct"

# Very long prompts for high compute utilization
LONG_PROMPT_TEMPLATE = """You are tasked with providing a comprehensive technical analysis of distributed database systems for a Fortune 500 company evaluating data infrastructure modernization. The company operates a legacy Oracle RAC database cluster with 500TB of structured data, 2PB of unstructured data in S3, serving 100,000 concurrent users across 50 global offices in 25 countries. They need to evaluate migration paths to modern distributed architectures supporting both OLTP and OLAP workloads with minimal downtime while maintaining ACID guarantees and supporting real-time analytics.

Current infrastructure: 12-node Oracle RAC cluster on bare metal with SAN storage, 50ms average query latency, 99.95% uptime SLA, backup RPO of 1 hour. Daily batch processing takes 8 hours using legacy ETL tools. The system processes 2 million transactions per day with peak loads of 50,000 TPS during business hours. Critical business applications include order management, inventory tracking, customer relationship management, financial reporting, and real-time fraud detection.

Technical constraints: Must maintain sub-100ms p99 latency for OLTP queries, support complex JOIN operations across 200+ tables, enable real-time materialized views for analytics, provide point-in-time recovery for compliance, support multi-region disaster recovery with RPO < 5 minutes, enable blue-green deployments for zero-downtime upgrades, maintain referential integrity across distributed transactions, support both SQL and NoSQL query patterns, enable real-time data replication to analytics warehouse.

Evaluation criteria: Total cost of ownership over 5 years including licensing, infrastructure, operations, migration costs. Performance benchmarks for OLTP (TPS, latency), OLAP (query response time, concurrency), mixed workloads. Operational complexity including monitoring, troubleshooting, capacity planning, disaster recovery procedures. Vendor ecosystem including tooling support, cloud integration, community resources. Migration strategy including data migration approach, application refactoring requirements, downtime windows, rollback procedures.

Modern architecture options to evaluate: (1) Amazon Aurora PostgreSQL with read replicas and Aurora Serverless v2 for variable workloads. (2) Google Cloud Spanner for globally distributed ACID transactions with TrueTime. (3) CockroachDB for distributed SQL with automatic sharding and rebalancing. (4) MongoDB Atlas with multi-region clusters and change streams. (5) Cassandra with Spark for analytics workloads. (6) Hybrid approach using PostgreSQL for OLTP with Snowflake for OLAP. (7) YugabyteDB for PostgreSQL compatibility with distributed architecture."""

CONCURRENT_PROMPT_TEMPLATE = """You are providing expert analysis on cloud-native application architecture patterns for a global e-commerce platform processing $5B in annual revenue. The platform serves 50 million active users across web, mobile, and API channels with 99.99% availability requirements.

Current architecture: Monolithic Java application (2.5M lines of code) on 100 EC2 instances behind ELB, MySQL cluster with read replicas, Redis for caching, RabbitMQ for async processing. System handles 10M requests per day with average response time of 500ms. Critical services include product catalog, shopping cart, order processing, payment gateway integration, inventory management, recommendation engine, fraud detection, customer service portal.

Modernization drivers: Deployment velocity (currently 2-week release cycles, targeting daily deployments), operational costs (EC2 + RDS spending $2M annually), scaling challenges (Black Friday requires 10x capacity, manual scaling takes hours), developer productivity (build times 45 minutes, integration testing 4 hours), service reliability (cascading failures affect entire platform).

Architecture evaluation criteria: Operational complexity (service orchestration, observability, debugging distributed transactions), development velocity (microservice boundaries, API contracts, testing strategies), infrastructure costs (compute, storage, networking, managed services), reliability patterns (circuit breakers, bulkheads, rate limiting, chaos engineering), data consistency (eventual vs strong consistency, saga patterns, event sourcing), security posture (service mesh, mTLS, API gateways, secret management).

Proposed patterns: (1) Strangler fig migration with API gateway routing legacy vs new services. (2) Event-driven architecture with Kafka for service communication. (3) CQRS with separate read/write models and materialized views. (4) Service mesh with Istio for traffic management and observability. (5) Serverless for variable workloads using Lambda/Fargate. (6) GraphQL federation for unified API layer. (7) Feature flags and canary deployments for gradual rollout.

Technical considerations: Service decomposition strategy (bounded contexts, team ownership, API versioning), data management (database per service, shared database, event sourcing), inter-service communication (synchronous REST/gRPC vs asynchronous messaging), distributed transactions (saga patterns, compensating transactions, idempotency), observability (distributed tracing, metrics aggregation, log correlation), testing strategy (contract testing, chaos engineering, synthetic monitoring), deployment architecture (Kubernetes, service mesh, API gateway, monitoring stack)."""

DEFAULT_PROMPT = (LONG_PROMPT_TEMPLATE + "\n\n" + "Please analyze each option in detail. " * 50).strip()
CONCURRENT_PROMPT = (CONCURRENT_PROMPT_TEMPLATE + "\n\n" + "Provide detailed recommendations. " * 50).strip()

# ============================================================================
# CONCURRENT STREAM
# ============================================================================

class ConcurrentStream:
    """Runs prefill in a loop on a dedicated CUDA stream from a background thread"""
    def __init__(self, model, tokenizer, prompt, device="cuda"):
        self.model = model
        self.tokenizer = tokenizer
        self.prompt = prompt
        self.device = device
        self.stream = torch.cuda.Stream()
        self.should_stop = threading.Event()
        self.thread = None
        self.iteration_count = 0
    
    def _run_stream(self):
        with torch.cuda.stream(self.stream):
            while not self.should_stop.is_set():
                try:
                    inputs = self.tokenizer([self.prompt], return_tensors="pt")
                    inputs = {k: v.to(self.device) for k, v in inputs.items()}
                    
                    with torch.no_grad():
                        _ = self.model(**inputs, use_cache=True)
                    
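                    # Wait for this stream only. torch.cuda.synchronize() takes a
                    # device argument, not a stream, and performs a device-wide sync.
                    self.stream.synchronize()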
                    self.iteration_count += 1
                    
                except Exception as e:
                    print(f"  [WARNING] Concurrent stream error: {e}")
                    break
    
    def start(self):
        self.iteration_count = 0
        self.should_stop.clear()
        self.thread = threading.Thread(target=self._run_stream, daemon=True)
        self.thread.start()
        # Give the background thread time to start issuing kernels
        time.sleep(0.5)
    
    def stop(self):
        self.should_stop.set()
        if self.thread:
            self.thread.join(timeout=10)
        torch.cuda.synchronize()
        return self.iteration_count

# ============================================================================
# PROFILING ANALYSIS
# ============================================================================

def analyze_trace(trace_file):
    """
    Analyze Chrome trace to detect concurrent kernel execution.
    
    Returns:
        dict with concurrent execution statistics
    """
    with open(trace_file, 'r') as f:
        trace_data = json.load(f)
    
    # Extract CUDA kernel events
    kernel_events = []
    for event in trace_data.get('traceEvents', []):
        if event.get('cat') == 'kernel' and event.get('ph') == 'X':
            kernel_events.append({
                'name': event.get('name', ''),
                'start': event.get('ts', 0),  # microseconds
                'dur': event.get('dur', 0),   # microseconds
                'end': event.get('ts', 0) + event.get('dur', 0),
                'stream': event.get('args', {}).get('stream', -1)
            })
    
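    if not kernel_events:
        # Return the same keys as the normal path so callers can index safely
        return {
            'total_kernels': 0,
            'concurrent_pairs': 0,
            'unique_streams': 0,
            'streams': set(),
            'total_duration_ms': 0.0,
            'total_kernel_time_ms': 0.0,
            'parallelism_factor': 0.0,
            'sample_concurrent_pairs': [],
            'analysis': 'No kernel events found in trace'
        }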
    
    # Sort by start time
    kernel_events.sort(key=lambda x: x['start'])
    
    # Find concurrent kernel pairs (from different streams)
    concurrent_pairs = []
    streams = set(k['stream'] for k in kernel_events)
    
    for i, k1 in enumerate(kernel_events):
        for k2 in kernel_events[i+1:]:
            # Check if they overlap in time
            if k1['end'] <= k2['start']:
                break  # No more possible overlaps for k1
            
            # Check if from different streams
            if k1['stream'] != k2['stream'] and k1['stream'] != -1 and k2['stream'] != -1:
                overlap_start = max(k1['start'], k2['start'])
                overlap_end = min(k1['end'], k2['end'])
                overlap_duration = overlap_end - overlap_start
                
                if overlap_duration > 0:
                    concurrent_pairs.append({
                        'kernel1': k1['name'],
                        'kernel2': k2['name'],
                        'stream1': k1['stream'],
                        'stream2': k2['stream'],
                        'overlap_us': overlap_duration
                    })
    
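    # Calculate statistics. The parallelism factor is summed kernel time divided by
    # the wall-clock span of the trace: idle gaps lower it, and a value above 1.0
    # implies that some kernels overlapped in time.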
    total_duration = max(k['end'] for k in kernel_events) - min(k['start'] for k in kernel_events)
    total_kernel_time = sum(k['dur'] for k in kernel_events)
    
    return {
        'total_kernels': len(kernel_events),
        'concurrent_pairs': len(concurrent_pairs),
        'unique_streams': len(streams),
        'streams': streams,
        'total_duration_ms': total_duration / 1000,
        'total_kernel_time_ms': total_kernel_time / 1000,
        'parallelism_factor': total_kernel_time / total_duration if total_duration > 0 else 0,
        'sample_concurrent_pairs': concurrent_pairs[:5]  # First 5 examples
    }

# ============================================================================
# MAIN EXPERIMENT
# ============================================================================

def run_with_profiling(model, tokenizer, prompt, 
                       concurrent_stream=None, label=""):
    """
    Run one profiled prefill pass on the default stream.
    
    Returns:
        (elapsed_ms, concurrent_iterations, trace_analysis, trace_file)
    """
    
    device = "cuda"
    inputs = tokenizer([prompt], return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}
    
    if concurrent_stream:
        concurrent_stream.start()
    
    # Warmup
    with torch.no_grad():
        _ = model(**inputs, use_cache=True)
    torch.cuda.synchronize()
    
    # Profile the actual run
    trace_file = f"/workspace/trace_prefill_{label}.json"
    
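    torch.cuda.synchronize()
    start = time.perf_counter()
    
    with profile(
        activities=[ProfilerActivity.CUDA],
        record_shapes=False,
        with_stack=False
    ) as prof:
        with torch.no_grad():
            _ = model(**inputs, use_cache=True)
        # Synchronize inside the profiler context so every launched kernel
        # has finished before the trace is closed
        torch.cuda.synchronize()
    
    # Elapsed time includes profiler start/stop overhead in both runs
    elapsed = (time.perf_counter() - start) * 1000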
    
    if concurrent_stream:
        iterations = concurrent_stream.stop()
    else:
        iterations = 0
    
    # Export trace
    prof.export_chrome_trace(trace_file)
    
    # Analyze trace
    analysis = analyze_trace(trace_file)
    
    return elapsed, iterations, analysis, trace_file

def main():
    print("=" * 70)
    print("PYTORCH PROFILING: PREFILL + CONCURRENT PREFILL")
    print("=" * 70)
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"PyTorch: {torch.__version__}")
    print(f"CUDA: {torch.version.cuda}")
    print()
    
    print("Loading model...")
    device = "cuda"
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        torch_dtype=torch.bfloat16,
        device_map=device
    )
    model.eval()
    print("✓ Model loaded\n")
    
    default_tokens = len(tokenizer.encode(DEFAULT_PROMPT))
    concurrent_tokens = len(tokenizer.encode(CONCURRENT_PROMPT))
    
    print(f"Configuration:")
    print(f"  Default stream: {default_tokens} tokens (prefill)")
    print(f"  Concurrent stream: {concurrent_tokens} tokens (prefill)")
    print()
    
    # Baseline
    print("\n[BASELINE] Single stream prefill")
    baseline_time, _, baseline_analysis, baseline_trace = run_with_profiling(
        model, tokenizer, DEFAULT_PROMPT,
        concurrent_stream=None, label="baseline"
    )
    print(f"  Duration: {baseline_time:.2f}ms")
    print(f"  Kernels: {baseline_analysis['total_kernels']}")
    print(f"  Streams: {baseline_analysis['unique_streams']}")
    print(f"  Trace: {baseline_trace}")
    
    # Concurrent
    print("\n[CONCURRENT] Dual stream prefill")
    concurrent = ConcurrentStream(model, tokenizer, CONCURRENT_PROMPT, device)
    concurrent_time, iterations, concurrent_analysis, concurrent_trace = run_with_profiling(
        model, tokenizer, DEFAULT_PROMPT,
        concurrent_stream=concurrent, label="concurrent"
    )
    print(f"  Default stream duration: {concurrent_time:.2f}ms")
    print(f"  Concurrent iterations: {iterations}")
    print(f"  Kernels: {concurrent_analysis['total_kernels']}")
    print(f"  Streams: {concurrent_analysis['unique_streams']}")
    print(f"  Trace: {concurrent_trace}")
    
    # Analysis
    print("\n" + "=" * 70)
    print("PARALLEL EXECUTION ANALYSIS")
    print("=" * 70)
    
    print(f"\nBaseline:")
    print(f"  Total kernels: {baseline_analysis['total_kernels']}")
    print(f"  Unique streams: {baseline_analysis['unique_streams']}")
    print(f"  Concurrent kernel pairs: {baseline_analysis['concurrent_pairs']}")
    
    print(f"\nConcurrent:")
    print(f"  Total kernels: {concurrent_analysis['total_kernels']}")
    print(f"  Unique streams: {concurrent_analysis['unique_streams']}")
    print(f"  Concurrent kernel pairs: {concurrent_analysis['concurrent_pairs']}")
    print(f"  Parallelism factor: {concurrent_analysis['parallelism_factor']:.2f}x")
    
    if concurrent_analysis['sample_concurrent_pairs']:
        print(f"\nExample concurrent kernel pairs:")
        for pair in concurrent_analysis['sample_concurrent_pairs'][:3]:
            print(f"  • Stream {pair['stream1']} vs Stream {pair['stream2']}: {pair['overlap_us']/1000:.2f}ms overlap")
            print(f"    {pair['kernel1'][:50]}... || {pair['kernel2'][:50]}...")
    
    # Verdict
    print("\n" + "=" * 70)
    print("VERDICT")
    print("=" * 70)
    
    print(f"\nTiming:")
    print(f"  Baseline:   {baseline_time:.2f}ms")
    print(f"  Concurrent: {concurrent_time:.2f}ms")
    print(f"  Slowdown:   {concurrent_time - baseline_time:.2f}ms ({(concurrent_time/baseline_time - 1)*100:.1f}%)")
    
    print(f"\nParallel Execution:")
    if concurrent_analysis['concurrent_pairs'] > 0:
        print(f"  ✓ VERIFIED - {concurrent_analysis['concurrent_pairs']} concurrent kernel pairs detected")
        print(f"  ✓ Multiple streams active: {concurrent_analysis['unique_streams']}")
        print(f"  ✓ Parallelism factor: {concurrent_analysis['parallelism_factor']:.2f}x")
        print(f"\n  → GPU executed kernels from both prefill streams simultaneously")
        print(f"  → Any slowdown is consistent with compute/memory resource contention")
        
        if concurrent_analysis['parallelism_factor'] > 1.5:
            print(f"  → High parallelism - significant concurrent execution")
        elif concurrent_analysis['parallelism_factor'] > 1.0:
            print(f"  → Moderate parallelism - partial concurrent execution")
        else:
            print(f"  → Low parallelism - mostly serialized despite concurrent kernels")
    else:
        print(f"  ✗ NO CONCURRENCY - Kernels executed sequentially")
        print(f"  → GPU may have serialized the prefill streams")
    
    print(f"\nVisualize timelines:")
    print(f"  1. Open Chrome browser")
    print(f"  2. Navigate to: chrome://tracing")
    print(f"  3. Load trace files:")
    print(f"     - Baseline: {baseline_trace}")
    print(f"     - Concurrent: {concurrent_trace}")
    print(f"  4. Look for overlapping kernel execution bars from different streams")
    print("=" * 70)

if __name__ == "__main__":
    main()
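
The pair count and parallelism factor computed by analyze_trace do not say how much wall-clock time was actually shared between streams. As a complementary check, the following is a minimal sketch (not part of the script above) that sweeps over kernel start and end points and sums the time during which at least two distinct streams have a kernel running. The event list is hypothetical and the helper names `overlapped_time_us` and `parallelism_factor` are illustrative only.

```python
# Hypothetical kernel events in microseconds; not taken from a real trace.
events = [
    {"stream": 7,  "start": 0,  "dur": 10},
    {"stream": 13, "start": 5,  "dur": 10},
    {"stream": 7,  "start": 20, "dur": 5},
    {"stream": 13, "start": 30, "dur": 5},
]

def overlapped_time_us(events):
    """Time during which kernels from at least two distinct streams are running."""
    points = []
    for e in events:
        points.append((e["start"], 1, e["stream"]))              # kernel starts
        points.append((e["start"] + e["dur"], -1, e["stream"]))  # kernel ends
    # At equal timestamps, process starts before ends so counts never go negative
    points.sort(key=lambda p: (p[0], -p[1]))

    active = {}      # stream id -> number of kernels currently running
    overlapped = 0
    prev_t = None
    for t, delta, stream in points:
        # Credit the interval since the previous point if 2+ streams were busy
        if prev_t is not None and len(active) >= 2:
            overlapped += t - prev_t
        active[stream] = active.get(stream, 0) + delta
        if active[stream] == 0:
            del active[stream]
        prev_t = t
    return overlapped

def parallelism_factor(events):
    """Summed kernel time divided by the wall-clock span, as in analyze_trace."""
    span = max(e["start"] + e["dur"] for e in events) - min(e["start"] for e in events)
    busy = sum(e["dur"] for e in events)
    return busy / span if span > 0 else 0.0

overlap = overlapped_time_us(events)
factor = parallelism_factor(events)
print(f"Overlapped time: {overlap} us, parallelism factor: {factor:.2f}")
```

In this toy input the two streams share only the interval from 5 to 10 us, while idle gaps keep the parallelism factor below 1.0, which shows why the two metrics are worth reading together.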

PYTORCH PROFILING: PREFILL + CONCURRENT PREFILL
GPU: NVIDIA A100 80GB PCIe
PyTorch: 2.8.0+cu128
CUDA: 12.8

Loading model...


`torch_dtype` is deprecated! Use `dtype` instead!


✓ Model loaded

Configuration:
  Default stream: 875 tokens (prefill)
  Concurrent stream: 711 tokens (prefill)


[BASELINE] Single stream prefill
  Duration: 119.24ms
  Kernels: 1142
  Streams: 1
  Trace: /workspace/trace_prefill_baseline.json

[CONCURRENT] Dual stream prefill
  Default stream duration: 160.01ms
  Concurrent iterations: 11
  Kernels: 2284
  Streams: 2
  Trace: /workspace/trace_prefill_concurrent.json

PARALLEL EXECUTION ANALYSIS

Baseline:
  Total kernels: 1142
  Unique streams: 1
  Concurrent kernel pairs: 0

Concurrent:
  Total kernels: 2284
  Unique streams: 2
  Concurrent kernel pairs: 2026
  Parallelism factor: 1.41x
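
The figures printed so far can be cross-checked by hand. The following sketch uses only numbers that appear in this log and applies the same slowdown formula as the script's verdict section. The variable names are illustrative.

```python
# Values copied from the log above
baseline_ms = 119.24
concurrent_ms = 160.01
baseline_kernels = 1142
concurrent_kernels = 2284

# Same arithmetic as the "Slowdown" line in main()
slowdown_ms = concurrent_ms - baseline_ms
slowdown_pct = (concurrent_ms / baseline_ms - 1) * 100
kernel_ratio = concurrent_kernels / baseline_kernels

print(f"Slowdown: {slowdown_ms:.2f}ms ({slowdown_pct:.1f}%)")
print(f"Kernel count ratio: {kernel_ratio:.1f}x")
```

The default-stream prefill is about 34% slower while the second stream is active. The concurrent trace holds exactly twice the baseline kernel count, which may mean the profiled window captured roughly one forward pass worth of kernels on the second stream, although the log does not state this directly.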

Example concurrent kernel pairs:
  • Stream 7 vs Stream 13: 0.00ms overlap
    void at::native::unrolled_elementwise_kernel<at::n... || void at::native::(anonymous namespace)::CatArrayBa...
  • Stream 7 vs Stream 13: 0.00ms overlap
    void at::native::vectorized_elementwise_kernel<4, ... || void at::native::vectorized_elementwise_kernel<4, ...
  • 