# Cardboard Quality Control - GPU-Optimized LoRA Inference

## Complete Guide to Full GPU Offloading with Fine-tuned LoRA Adapter

This notebook demonstrates how to use your fine-tuned LoRA adapter with maximum GPU utilization, similar to Ollama's `num_gpu=999` functionality. It includes memory optimization, batch processing, and production-ready error handling.

### Features:
- **Full GPU Offloading**: Maximize GPU usage with various precision options
- **Memory Monitoring**: Real-time VRAM and RAM tracking
- **LoRA Integration**: Direct LoRA loading without merging for efficiency
- **Batch Processing**: Handle multiple images efficiently
- **Cardboard QC Specific**: Specialized prompts and quality assessment
- **Production Ready**: Error handling, logging, and fallback strategies

### Hardware Requirements:
- RTX 3060 (6GB VRAM) - optimized for your setup
- Additional system RAM for model loading
- CUDA-compatible PyTorch installation

## 1. Environment Setup and Dependencies

First, let's set up the environment and install required packages.

In [4]:
# Install required packages if not already installed
import subprocess
import sys
import os

def install_package(package):
    """Install package if not already installed."""
    try:
        __import__(package.split('==')[0].replace('-', '_'))
        print(f"✅ {package} already installed")
    except ImportError:
        print(f"📦 Installing {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"✅ {package} installed successfully")

# Required packages
required_packages = [
    "torch>=2.0.0",
    "torchvision",
    "transformers>=4.37.0",
    "peft>=0.8.0",
    "accelerate>=0.26.0",
    "pillow",
    "numpy",
    "psutil",
    "tqdm",
    "matplotlib",
    "seaborn",
    "pandas",
    "requests",
    "einops",
    "safetensors"
]

print("🚀 Checking and installing required packages...")
for package in required_packages:
    try:
        install_package(package)
    except Exception as e:
        print(f"⚠️ Warning: Could not install {package}: {e}")

print("\n✨ Package installation complete!")

🚀 Checking and installing required packages...
📦 Installing torch>=2.0.0...
✅ torch>=2.0.0 installed successfully
✅ torchvision already installed
📦 Installing transformers>=4.37.0...
✅ transformers>=4.37.0 installed successfully
📦 Installing peft>=0.8.0...
✅ peft>=0.8.0 installed successfully
📦 Installing accelerate>=0.26.0...
✅ accelerate>=0.26.0 installed successfully
📦 Installing pillow...
✅ pillow installed successfully
✅ numpy already installed
✅ psutil already installed
✅ tqdm already installed
✅ matplotlib already installed
✅ seaborn already installed
✅ pandas already installed
✅ requests already installed
✅ einops already installed
✅ safetensors already installed

✨ Package installation complete!


In [None]:
# Import all required libraries
import torch
import torch.nn as nn
from torch.nn.utils import prune
import torchvision.transforms as transforms
from transformers import (
    Qwen2VLForConditionalGeneration,
    Qwen2VLProcessor,
    AutoTokenizer,
    AutoConfig,
    BitsAndBytesConfig
)
from peft import PeftModel, PeftConfig, get_peft_model, LoraConfig
import accelerate
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import psutil
import gc
import time
import json
import warnings
import logging
from pathlib import Path
from typing import Dict, List, Optional, Union, Tuple, Any
from tqdm.auto import tqdm
from datetime import datetime
import requests
from io import BytesIO
import threading
import queue
from dataclasses import dataclass
from contextlib import contextmanager

# Configure warnings and logging
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('cardboard_qc_gpu.log')
    ]
)
logger = logging.getLogger(__name__)

# Set plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("✅ All imports successful!")
print(f"🔥 PyTorch version: {torch.__version__}")
print(f"🤗 Transformers version: {transformers.__version__}")
print(f"🎯 PEFT version: {peft.__version__}")
print(f"⚡ Accelerate version: {accelerate.__version__}")
print(f"🐍 Python version: {sys.version.split()[0]}")

RuntimeError: Failed to import transformers.models.qwen2_vl.modeling_qwen2_vl because of the following error (look up to see its traceback):
cannot import name 'get_num_sms' from 'torch._inductor.utils' (c:\Users\76135\AppData\Local\Programs\Python\Python312\Lib\site-packages\torch\_inductor\utils.py)

: 

## 2. GPU Memory Management and Monitoring

This section provides comprehensive GPU memory monitoring and optimization utilities.

In [None]:
@dataclass
class MemoryStats:
    """Data class for memory statistics."""
    ram_total_gb: float
    ram_available_gb: float
    ram_used_percent: float
    gpu_total_gb: float = 0.0
    gpu_allocated_gb: float = 0.0
    gpu_reserved_gb: float = 0.0
    gpu_free_gb: float = 0.0
    gpu_utilization_percent: float = 0.0

class GPUMemoryManager:
    """Advanced GPU memory management and monitoring."""
    
    def __init__(self):
        self.device_count = torch.cuda.device_count() if torch.cuda.is_available() else 0
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.memory_history = []
        
        if torch.cuda.is_available():
            self.gpu_properties = torch.cuda.get_device_properties(0)
            print(f"🎮 GPU: {self.gpu_properties.name}")
            print(f"💾 GPU Memory: {self.gpu_properties.total_memory / 1024**3:.1f} GB")
            print(f"🔢 GPU Compute Capability: {self.gpu_properties.major}.{self.gpu_properties.minor}")
        else:
            print("⚠️ No CUDA GPU available, using CPU")
    
    def get_memory_stats(self) -> MemoryStats:
        """Get comprehensive memory statistics."""
        # RAM statistics
        ram = psutil.virtual_memory()
        ram_total_gb = ram.total / (1024**3)
        ram_available_gb = ram.available / (1024**3)
        ram_used_percent = ram.percent
        
        # GPU statistics
        gpu_stats = MemoryStats(
            ram_total_gb=ram_total_gb,
            ram_available_gb=ram_available_gb,
            ram_used_percent=ram_used_percent
        )
        
        if torch.cuda.is_available():
            gpu_stats.gpu_total_gb = self.gpu_properties.total_memory / (1024**3)
            gpu_stats.gpu_allocated_gb = torch.cuda.memory_allocated(0) / (1024**3)
            gpu_stats.gpu_reserved_gb = torch.cuda.memory_reserved(0) / (1024**3)
            gpu_stats.gpu_free_gb = gpu_stats.gpu_total_gb - gpu_stats.gpu_allocated_gb
            gpu_stats.gpu_utilization_percent = (gpu_stats.gpu_allocated_gb / gpu_stats.gpu_total_gb) * 100
        
        return gpu_stats
    
    def log_memory_usage(self, context: str = "", save_to_history: bool = True):
        """Log current memory usage with optional context."""
        stats = self.get_memory_stats()
        
        print(f"\n📊 Memory Usage {context}:")
        print(f"  🖥️  RAM: {stats.ram_used_percent:.1f}% ({stats.ram_available_gb:.1f}GB available / {stats.ram_total_gb:.1f}GB total)")
        
        if torch.cuda.is_available():
            print(f"  🎮 GPU: {stats.gpu_utilization_percent:.1f}% ({stats.gpu_allocated_gb:.2f}GB allocated / {stats.gpu_total_gb:.1f}GB total)")
            print(f"  💨 GPU Free: {stats.gpu_free_gb:.2f}GB")
            print(f"  📦 GPU Reserved: {stats.gpu_reserved_gb:.2f}GB")
        
        if save_to_history:
            self.memory_history.append({
                'timestamp': datetime.now().isoformat(),
                'context': context,
                'stats': stats
            })
    
    def optimize_gpu_memory(self, aggressive: bool = False):
        """Optimize GPU memory usage."""
        if not torch.cuda.is_available():
            return
        
        print("🧹 Optimizing GPU memory...")
        
        # Standard cleanup
        gc.collect()
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        
        if aggressive:
            # Aggressive cleanup
            with torch.cuda.device(0):
                torch.cuda.empty_cache()
                torch.cuda.reset_peak_memory_stats()
        
        print("✅ GPU memory optimization complete")
    
    def get_optimal_batch_size(self, model_size_gb: float, safety_factor: float = 0.8) -> int:
        """Calculate optimal batch size based on available GPU memory."""
        if not torch.cuda.is_available():
            return 1
        
        stats = self.get_memory_stats()
        available_memory = stats.gpu_free_gb * safety_factor
        
        # Estimate memory per sample (rough approximation)
        memory_per_sample = 0.5  # GB - adjust based on your model and input size
        
        batch_size = max(1, int(available_memory / memory_per_sample))
        print(f"🎯 Recommended batch size: {batch_size} (Available: {available_memory:.1f}GB)")
        
        return min(batch_size, 8)  # Cap at 8 for stability
    
    def plot_memory_history(self, save_path: Optional[str] = None):
        """Plot memory usage history."""
        if not self.memory_history:
            print("No memory history to plot")
            return
        
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
        
        # Extract data
        timestamps = [datetime.fromisoformat(h['timestamp']) for h in self.memory_history]
        ram_usage = [h['stats'].ram_used_percent for h in self.memory_history]
        gpu_usage = [h['stats'].gpu_utilization_percent for h in self.memory_history if torch.cuda.is_available()]
        
        # Plot RAM usage
        ax1.plot(timestamps, ram_usage, 'b-', linewidth=2, marker='o')
        ax1.set_title('RAM Usage Over Time')
        ax1.set_ylabel('RAM Usage (%)')
        ax1.grid(True, alpha=0.3)
        ax1.set_ylim(0, 100)
        
        # Plot GPU usage
        if torch.cuda.is_available() and gpu_usage:
            ax2.plot(timestamps, gpu_usage, 'r-', linewidth=2, marker='s')
            ax2.set_title('GPU Memory Usage Over Time')
            ax2.set_ylabel('GPU Memory Usage (%)')
            ax2.grid(True, alpha=0.3)
            ax2.set_ylim(0, 100)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"📊 Memory usage plot saved to {save_path}")
        
        plt.show()
    
    @contextmanager
    def memory_tracking(self, context: str):
        """Context manager for automatic memory tracking."""
        self.log_memory_usage(f"before {context}")
        try:
            yield
        finally:
            self.log_memory_usage(f"after {context}")

# Initialize memory manager
memory_manager = GPUMemoryManager()
memory_manager.log_memory_usage("initialization")

print("\n✅ GPU Memory Management System initialized!")

NameError: name 'dataclass' is not defined

## 3. Model Configuration and Precision Options

Configure different precision levels and GPU offloading strategies.

In [None]:
class ModelPrecisionConfig:
    """Configuration class for different model precision levels."""
    
    @staticmethod
    def get_fp16_config():
        """Float16 - Good balance of speed and quality."""
        return {
            'torch_dtype': torch.float16,
            'device_map': 'auto',
            'low_cpu_mem_usage': True,
            'trust_remote_code': True,
            'attn_implementation': 'flash_attention_2' if hasattr(torch.nn, 'MultiheadAttention') else 'eager'
        }
    
    @staticmethod
    def get_bf16_config():
        """BFloat16 - Better numerical stability than FP16."""
        return {
            'torch_dtype': torch.bfloat16,
            'device_map': 'auto',
            'low_cpu_mem_usage': True,
            'trust_remote_code': True,
            'attn_implementation': 'flash_attention_2' if hasattr(torch.nn, 'MultiheadAttention') else 'eager'
        }
    
    @staticmethod
    def get_int8_config():
        """8-bit quantization - Significant memory savings."""
        return {
            'quantization_config': BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_enable_fp32_cpu_offload=True,
                llm_int8_has_fp16_weight=True
            ),
            'device_map': 'auto',
            'low_cpu_mem_usage': True,
            'trust_remote_code': True
        }
    
    @staticmethod
    def get_int4_config():
        """4-bit quantization - Maximum memory savings."""
        return {
            'quantization_config': BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            ),
            'device_map': 'auto',
            'low_cpu_mem_usage': True,
            'trust_remote_code': True
        }
    
    @staticmethod
    def get_cpu_config():
        """CPU-only configuration."""
        return {
            'torch_dtype': torch.float32,
            'device_map': 'cpu',
            'low_cpu_mem_usage': True,
            'trust_remote_code': True
        }
    
    @staticmethod
    def get_config_by_name(name: str):
        """Get configuration by name."""
        configs = {
            'fp16': ModelPrecisionConfig.get_fp16_config(),
            'bf16': ModelPrecisionConfig.get_bf16_config(),
            'int8': ModelPrecisionConfig.get_int8_config(),
            'int4': ModelPrecisionConfig.get_int4_config(),
            'cpu': ModelPrecisionConfig.get_cpu_config()
        }
        
        if name not in configs:
            available = list(configs.keys())
            raise ValueError(f"Unknown config '{name}'. Available: {available}")
        
        return configs[name]
    
    @staticmethod
    def recommend_config(target_memory_gb: float = 6.0) -> str:
        """Recommend best configuration for target memory."""
        if not torch.cuda.is_available():
            return 'cpu'
        
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
        
        if gpu_memory >= 12:
            return 'fp16'  # High-end GPU
        elif gpu_memory >= 8:
            return 'bf16'  # Mid-range GPU
        elif gpu_memory >= 6:
            return 'int8'  # Your RTX 3060
        else:
            return 'int4'  # Low VRAM GPU

# Test precision configurations
print("🔧 Available Precision Configurations:")
for name in ['fp16', 'bf16', 'int8', 'int4', 'cpu']:
    try:
        config = ModelPrecisionConfig.get_config_by_name(name)
        print(f"  ✅ {name.upper()}: {config.get('torch_dtype', 'N/A')}")
    except Exception as e:
        print(f"  ❌ {name.upper()}: {e}")

# Get recommendation for your RTX 3060
recommended = ModelPrecisionConfig.recommend_config(6.0)
print(f"\n🎯 Recommended configuration for RTX 3060: {recommended.upper()}")

## 4. Advanced LoRA Inference Engine

Production-ready LoRA inference engine with full GPU offloading and optimization.

In [None]:
class AdvancedLoRAInferenceEngine:
    """Advanced LoRA inference engine with full GPU optimization."""
    
    def __init__(
        self,
        base_model_path: str,
        lora_path: str,
        precision: str = 'int8',
        enable_compilation: bool = True,
        memory_manager: Optional[GPUMemoryManager] = None
    ):
        self.base_model_path = Path(base_model_path)
        self.lora_path = Path(lora_path)
        self.precision = precision
        self.enable_compilation = enable_compilation
        self.memory_manager = memory_manager or GPUMemoryManager()
        
        # Model components
        self.model = None
        self.tokenizer = None
        self.processor = None
        self.generation_config = None
        
        # State tracking
        self.is_loaded = False
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.compiled_model = None
        
        # Performance tracking
        self.inference_times = []
        self.memory_snapshots = []
        
        logger.info(f"🚀 Advanced LoRA Engine initialized")
        logger.info(f"📁 Base model: {self.base_model_path}")
        logger.info(f"🎯 LoRA adapter: {self.lora_path}")
        logger.info(f"⚙️ Precision: {precision}")
        logger.info(f"🔥 Compilation: {enable_compilation}")
    
    def validate_paths(self) -> bool:
        """Validate that all required paths exist."""
        logger.info("🔍 Validating model paths...")
        
        # Check base model
        if not self.base_model_path.exists():
            logger.error(f"❌ Base model path not found: {self.base_model_path}")
            return False
        
        base_config = self.base_model_path / "config.json"
        if not base_config.exists():
            logger.error(f"❌ Base model config not found: {base_config}")
            return False
        
        # Check LoRA adapter
        if not self.lora_path.exists():
            logger.error(f"❌ LoRA path not found: {self.lora_path}")
            return False
        
        lora_config = self.lora_path / "adapter_config.json"
        lora_weights = self.lora_path / "adapter_model.safetensors"
        
        if not lora_config.exists():
            logger.error(f"❌ LoRA config not found: {lora_config}")
            return False
        
        if not lora_weights.exists():
            logger.error(f"❌ LoRA weights not found: {lora_weights}")
            return False
        
        logger.info("✅ All paths validated successfully")
        return True
    
    def load_model(self, force_reload: bool = False):
        """Load model with optimal GPU configuration."""
        if self.is_loaded and not force_reload:
            logger.info("Model already loaded")
            return
        
        if not self.validate_paths():
            raise RuntimeError("Path validation failed")
        
        logger.info(f"🔄 Loading model with {self.precision} precision...")
        
        with self.memory_manager.memory_tracking("model loading"):
            try:
                # Get precision configuration
                model_config = ModelPrecisionConfig.get_config_by_name(self.precision)
                logger.info(f"📋 Model config: {model_config}")
                
                # Load LoRA configuration
                logger.info("📖 Loading LoRA configuration...")
                peft_config = PeftConfig.from_pretrained(str(self.lora_path))
                logger.info(f"🎯 LoRA target modules: {peft_config.target_modules}")
                logger.info(f"🔢 LoRA rank: {peft_config.r}")
                logger.info(f"📊 LoRA alpha: {peft_config.lora_alpha}")
                
                # Load base model
                logger.info("🏗️ Loading base model (this may take a few minutes)...")
                
                # Special handling for quantized models
                if 'quantization_config' in model_config:
                    # For quantized models, load without device_map first
                    base_model = Qwen2VLForConditionalGeneration.from_pretrained(
                        str(self.base_model_path),
                        quantization_config=model_config['quantization_config'],
                        low_cpu_mem_usage=True,
                        trust_remote_code=True
                    )
                else:
                    # For non-quantized models
                    base_model = Qwen2VLForConditionalGeneration.from_pretrained(
                        str(self.base_model_path),
                        **model_config
                    )
                
                logger.info("✅ Base model loaded successfully!")
                
                # Load LoRA adapter
                logger.info("🎯 Loading LoRA adapter...")
                self.model = PeftModel.from_pretrained(
                    base_model,
                    str(self.lora_path),
                    is_trainable=False
                )
                
                logger.info("✅ LoRA adapter loaded successfully!")
                
                # Load tokenizer and processor
                logger.info("📝 Loading tokenizer and processor...")
                
                try:
                    self.tokenizer = AutoTokenizer.from_pretrained(
                        str(self.lora_path),
                        trust_remote_code=True
                    )
                    self.processor = Qwen2VLProcessor.from_pretrained(
                        str(self.lora_path)
                    )
                except:
                    # Fallback to base model
                    logger.warning("Using base model tokenizer/processor as fallback")
                    self.tokenizer = AutoTokenizer.from_pretrained(
                        str(self.base_model_path),
                        trust_remote_code=True
                    )
                    self.processor = Qwen2VLProcessor.from_pretrained(
                        str(self.base_model_path)
                    )
                
                # Set evaluation mode
                self.model.eval()
                
                # Compile model for optimization (if enabled)
                if self.enable_compilation and torch.cuda.is_available():
                    try:
                        logger.info("🔥 Compiling model for optimization...")
                        self.compiled_model = torch.compile(
                            self.model,
                            mode="reduce-overhead",
                            fullgraph=False
                        )
                        logger.info("✅ Model compilation successful!")
                    except Exception as e:
                        logger.warning(f"⚠️ Model compilation failed: {e}")
                        self.compiled_model = None
                
                # Configure generation parameters
                self.generation_config = {
                    'max_new_tokens': 512,
                    'do_sample': True,
                    'temperature': 0.7,
                    'top_p': 0.9,
                    'top_k': 50,
                    'repetition_penalty': 1.1,
                    'pad_token_id': self.tokenizer.eos_token_id,
                    'use_cache': True
                }
                
                self.is_loaded = True
                logger.info("🎉 Model loading complete!")
                
            except Exception as e:
                logger.error(f"❌ Error loading model: {e}")
                import traceback
                logger.error(traceback.format_exc())
                raise
    
    def unload_model(self):
        """Unload model and free memory."""
        logger.info("🧹 Unloading model...")
        
        if self.model:
            del self.model
        if self.compiled_model:
            del self.compiled_model
        if self.tokenizer:
            del self.tokenizer
        if self.processor:
            del self.processor
        
        self.model = None
        self.compiled_model = None
        self.tokenizer = None
        self.processor = None
        self.is_loaded = False
        
        # Cleanup memory
        self.memory_manager.optimize_gpu_memory(aggressive=True)
        
        logger.info("✅ Model unloaded successfully")
    
    def load_image(self, image_source: Union[str, Path, Image.Image]) -> Image.Image:
        """Load and preprocess image."""
        if isinstance(image_source, Image.Image):
            image = image_source
        elif isinstance(image_source, (str, Path)):
            image_path = Path(image_source)
            if not image_path.exists():
                raise FileNotFoundError(f"Image not found: {image_path}")
            image = Image.open(image_path)
        else:
            raise ValueError("Invalid image source type")
        
        # Convert to RGB if needed
        if image.mode != 'RGB':
            image = image.convert('RGB')
        
        return image
    
    def generate_response(
        self,
        prompt: str,
        image: Optional[Union[str, Path, Image.Image]] = None,
        **generation_kwargs
    ) -> Dict[str, Any]:
        """Generate response with performance tracking."""
        if not self.is_loaded:
            raise RuntimeError("Model not loaded. Call load_model() first.")
        
        start_time = time.time()
        
        try:
            with self.memory_manager.memory_tracking("inference"):
                # Prepare input
                messages = []
                
                if image is not None:
                    image_obj = self.load_image(image)
                    messages.append({
                        "role": "user",
                        "content": [
                            {"type": "image", "image": image_obj},
                            {"type": "text", "text": prompt}
                        ]
                    })
                else:
                    messages.append({
                        "role": "user",
                        "content": prompt
                    })
                
                # Apply chat template
                text = self.processor.apply_chat_template(
                    messages,
                    tokenize=False,
                    add_generation_prompt=True
                )
                
                # Process inputs
                if image is not None:
                    image_obj = self.load_image(image)
                    inputs = self.processor(
                        text=[text],
                        images=[image_obj],
                        return_tensors="pt",
                        padding=True
                    )
                else:
                    inputs = self.processor(
                        text=[text],
                        return_tensors="pt",
                        padding=True
                    )
                
                # Move to device
                if torch.cuda.is_available():
                    inputs = inputs.to(self.device)
                
                # Merge generation config
                final_config = {**self.generation_config, **generation_kwargs}
                
                # Generate
                model_to_use = self.compiled_model if self.compiled_model else self.model
                
                with torch.no_grad():
                    generated_ids = model_to_use.generate(
                        **inputs,
                        **final_config
                    )
                
                # Decode response
                input_token_len = inputs.input_ids.shape[1]
                new_tokens = generated_ids[:, input_token_len:]
                
                response = self.tokenizer.decode(
                    new_tokens[0],
                    skip_special_tokens=True
                ).strip()
                
                # Performance tracking
                end_time = time.time()
                inference_time = end_time - start_time
                self.inference_times.append(inference_time)
                
                # Token statistics
                input_tokens = inputs.input_ids.shape[1]
                output_tokens = new_tokens.shape[1]
                total_tokens = input_tokens + output_tokens
                tokens_per_second = output_tokens / inference_time
                
                result = {
                    'response': response,
                    'inference_time': inference_time,
                    'input_tokens': input_tokens,
                    'output_tokens': output_tokens,
                    'total_tokens': total_tokens,
                    'tokens_per_second': tokens_per_second,
                    'prompt': prompt,
                    'has_image': image is not None
                }
                
                logger.info(f"🎯 Generated {output_tokens} tokens in {inference_time:.2f}s ({tokens_per_second:.1f} tok/s)")
                
                return result
                
        except Exception as e:
            logger.error(f"❌ Generation error: {e}")
            raise
    
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get inference performance statistics."""
        if not self.inference_times:
            return {"message": "No inference data available"}
        
        times = np.array(self.inference_times)
        
        return {
            'total_inferences': len(times),
            'avg_time': float(np.mean(times)),
            'median_time': float(np.median(times)),
            'min_time': float(np.min(times)),
            'max_time': float(np.max(times)),
            'std_time': float(np.std(times))
        }

print("✅ Advanced LoRA Inference Engine ready!")

## 5. Cardboard Quality Control Specialization

Specialized prompts and processing for cardboard quality control tasks.

In [None]:
class CardboardQCProcessor:
    """Specialized processor for cardboard quality control tasks."""
    
    # Quality control prompts based on your training data
    QC_PROMPTS = {
        'detailed_analysis': """
Analyze this cardboard image for quality control purposes. Please provide a detailed assessment including:

1. **Overall Quality**: Pass/Fail determination
2. **Visual Defects**: Any visible damage, warping, creases, or deformations
3. **Surface Condition**: Scratches, stains, discoloration, or surface irregularities
4. **Structural Issues**: Bent edges, torn sections, or compromised integrity
5. **Confidence Score**: Your confidence in the assessment (0-100%)

Provide your response in a clear, structured format suitable for quality control documentation.
""".strip(),
        
        'pass_fail_only': """
Examine this cardboard for quality control. Determine if this cardboard should PASS or FAIL quality inspection.

Respond with:
- PASS or FAIL
- Brief reason (1-2 sentences)
- Confidence percentage
""".strip(),
        
        'defect_detection': """
Look at this cardboard image and identify any defects or quality issues. Focus on:
- Warping or bending
- Surface damage
- Structural problems
- Manufacturing defects

List each defect found and its severity (Minor/Major/Critical).
""".strip(),
        
        'comparative_analysis': """
Analyze this cardboard sample and compare it to standard quality expectations for packaging materials.
Rate the following aspects (1-10 scale):
- Structural integrity
- Surface quality
- Overall condition
- Usability for packaging
""".strip()
    }
    
    def __init__(self, inference_engine: AdvancedLoRAInferenceEngine):
        self.engine = inference_engine
        self.qc_history = []
        
    def analyze_cardboard(
        self,
        image_path: Union[str, Path],
        analysis_type: str = 'detailed_analysis',
        custom_prompt: Optional[str] = None
    ) -> Dict[str, Any]:
        """Analyze cardboard quality with specialized prompts."""
        
        # Select prompt
        if custom_prompt:
            prompt = custom_prompt
        elif analysis_type in self.QC_PROMPTS:
            prompt = self.QC_PROMPTS[analysis_type]
        else:
            raise ValueError(f"Unknown analysis type: {analysis_type}")
        
        # Generate analysis
        result = self.engine.generate_response(
            prompt=prompt,
            image=image_path,
            temperature=0.3,  # Lower temperature for more consistent QC results
            max_new_tokens=384
        )
        
        # Parse QC-specific information
        qc_result = self._parse_qc_response(result['response'])
        
        # Combine with generation stats
        full_result = {
            **result,
            'qc_analysis': qc_result,
            'image_path': str(image_path),
            'analysis_type': analysis_type,
            'timestamp': datetime.now().isoformat()
        }
        
        # Save to history
        self.qc_history.append(full_result)
        
        return full_result
    
    def _parse_qc_response(self, response: str) -> Dict[str, Any]:
        """Parse QC response to extract structured information."""
        qc_info = {
            'raw_response': response,
            'pass_fail': None,
            'confidence': None,
            'defects': [],
            'severity': 'unknown'
        }
        
        response_lower = response.lower()
        
        # Extract pass/fail decision
        if 'pass' in response_lower and 'fail' not in response_lower:
            qc_info['pass_fail'] = 'PASS'
        elif 'fail' in response_lower:
            qc_info['pass_fail'] = 'FAIL'
        
        # Extract confidence percentage
        import re
        confidence_match = re.search(r'confidence[:\s]*([0-9]+)%?', response_lower)
        if confidence_match:
            qc_info['confidence'] = int(confidence_match.group(1))
        
        # Extract defects (simple keyword matching)
        defect_keywords = ['warp', 'bend', 'crease', 'tear', 'damage', 'stain', 'scratch', 'dent']
        found_defects = [kw for kw in defect_keywords if kw in response_lower]
        qc_info['defects'] = found_defects
        
        # Determine severity
        if 'critical' in response_lower:
            qc_info['severity'] = 'critical'
        elif 'major' in response_lower:
            qc_info['severity'] = 'major'
        elif 'minor' in response_lower:
            qc_info['severity'] = 'minor'
        elif qc_info['pass_fail'] == 'FAIL':
            qc_info['severity'] = 'major'
        elif found_defects:
            qc_info['severity'] = 'minor'
        else:
            qc_info['severity'] = 'none'
        
        return qc_info
    
    def batch_analyze(
        self,
        image_paths: List[Union[str, Path]],
        analysis_type: str = 'detailed_analysis',
        show_progress: bool = True
    ) -> List[Dict[str, Any]]:
        """Analyze multiple cardboard images in batch."""
        results = []
        
        iterator = tqdm(image_paths, desc="Analyzing cardboard samples") if show_progress else image_paths
        
        for image_path in iterator:
            try:
                result = self.analyze_cardboard(image_path, analysis_type)
                results.append(result)
                
                if show_progress:
                    qc = result['qc_analysis']
                    status = qc.get('pass_fail', 'UNKNOWN')
                    confidence = qc.get('confidence', 'N/A')
                    iterator.set_postfix(status=status, confidence=f"{confidence}%" if confidence else "N/A")
                    
            except Exception as e:
                logger.error(f"❌ Error analyzing {image_path}: {e}")
                results.append({
                    'image_path': str(image_path),
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })
        
        return results
    
    def generate_qc_report(self, save_path: Optional[str] = None) -> Dict[str, Any]:
        """Generate comprehensive QC report from analysis history."""
        if not self.qc_history:
            return {"message": "No QC analysis data available"}
        
        # Analyze results
        total_samples = len(self.qc_history)
        passed = sum(1 for h in self.qc_history if h.get('qc_analysis', {}).get('pass_fail') == 'PASS')
        failed = sum(1 for h in self.qc_history if h.get('qc_analysis', {}).get('pass_fail') == 'FAIL')
        
        # Calculate averages
        avg_inference_time = np.mean([h.get('inference_time', 0) for h in self.qc_history])
        avg_confidence = np.mean([
            h.get('qc_analysis', {}).get('confidence', 0) 
            for h in self.qc_history 
            if h.get('qc_analysis', {}).get('confidence') is not None
        ]) if any(h.get('qc_analysis', {}).get('confidence') for h in self.qc_history) else 0
        
        # Common defects
        all_defects = []
        for h in self.qc_history:
            defects = h.get('qc_analysis', {}).get('defects', [])
            all_defects.extend(defects)
        
        defect_counts = {}
        for defect in all_defects:
            defect_counts[defect] = defect_counts.get(defect, 0) + 1
        
        report = {
            'summary': {
                'total_samples': total_samples,
                'passed': passed,
                'failed': failed,
                'pass_rate': (passed / total_samples) * 100 if total_samples > 0 else 0,
                'avg_inference_time': avg_inference_time,
                'avg_confidence': avg_confidence
            },
            'defect_analysis': {
                'common_defects': sorted(defect_counts.items(), key=lambda x: x[1], reverse=True),
                'total_defects_found': len(all_defects)
            },
            'performance': self.engine.get_performance_stats(),
            'generated_at': datetime.now().isoformat()
        }
        
        # Save report if path provided
        if save_path:
            with open(save_path, 'w') as f:
                json.dump(report, f, indent=2)
            logger.info(f"📄 QC report saved to {save_path}")
        
        return report
    
    def plot_qc_statistics(self, save_path: Optional[str] = None):
        """Plot QC analysis statistics."""
        if not self.qc_history:
            print("No QC data to plot")
            return
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # Pass/Fail distribution
        pass_fail_data = [h.get('qc_analysis', {}).get('pass_fail') for h in self.qc_history]
        pass_fail_counts = pd.Series(pass_fail_data).value_counts()
        
        axes[0, 0].pie(pass_fail_counts.values, labels=pass_fail_counts.index, autopct='%1.1f%%')
        axes[0, 0].set_title('Pass/Fail Distribution')
        
        # Confidence scores
        confidences = [
            h.get('qc_analysis', {}).get('confidence', 0) 
            for h in self.qc_history 
            if h.get('qc_analysis', {}).get('confidence') is not None
        ]
        
        if confidences:
            axes[0, 1].hist(confidences, bins=20, alpha=0.7, color='skyblue')
            axes[0, 1].set_title('Confidence Score Distribution')
            axes[0, 1].set_xlabel('Confidence (%)')
            axes[0, 1].set_ylabel('Frequency')
        
        # Inference times
        inference_times = [h.get('inference_time', 0) for h in self.qc_history]
        axes[1, 0].plot(inference_times, marker='o', alpha=0.7)
        axes[1, 0].set_title('Inference Time Over Samples')
        axes[1, 0].set_xlabel('Sample Number')
        axes[1, 0].set_ylabel('Inference Time (s)')
        
        # Common defects
        all_defects = []
        for h in self.qc_history:
            defects = h.get('qc_analysis', {}).get('defects', [])
            all_defects.extend(defects)
        
        if all_defects:
            defect_counts = pd.Series(all_defects).value_counts().head(10)
            defect_counts.plot(kind='bar', ax=axes[1, 1], alpha=0.7)
            axes[1, 1].set_title('Most Common Defects')
            axes[1, 1].set_xlabel('Defect Type')
            axes[1, 1].set_ylabel('Count')
            axes[1, 1].tick_params(axis='x', rotation=45)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"📊 QC statistics plot saved to {save_path}")
        
        plt.show()

print("✅ Cardboard QC Processor ready!")

## 6. Model Initialization and Configuration

Set up your specific model paths and initialize the system.

In [None]:
# Configure your specific model paths
BASE_MODEL_PATH = r"C:\Users\76135\Desktop\ADSYS-Cardboard-AI-Detection\Yolo DS To Qwen DS\Finetuned LoRA model\V1 26-08-2025 Qwen 2.5vl_7b\base_model"
LORA_MODEL_PATH = r"C:\Users\76135\Desktop\ADSYS-Cardboard-AI-Detection\Yolo DS To Qwen DS\Finetuned LoRA model\V1 26-08-2025 Qwen 2.5vl_7b\lora_model"
TEST_IMAGES_PATH = r"C:\Users\76135\Desktop\ADSYS-Cardboard-AI-Detection\test_img"

# Verify paths exist
print("🔍 Verifying model paths...")
base_path = Path(BASE_MODEL_PATH)
lora_path = Path(LORA_MODEL_PATH)
test_path = Path(TEST_IMAGES_PATH)

if not base_path.exists():
    print(f"❌ Base model path not found: {base_path}")
    print("Please update BASE_MODEL_PATH to the correct location")
else:
    print(f"✅ Base model path verified: {base_path}")

if not lora_path.exists():
    print(f"❌ LoRA model path not found: {lora_path}")
    print("Please update LORA_MODEL_PATH to the correct location")
else:
    print(f"✅ LoRA model path verified: {lora_path}")

if not test_path.exists():
    print(f"❌ Test images path not found: {test_path}")
    print("Please update TEST_IMAGES_PATH to the correct location")
else:
    test_images = list(test_path.glob("*.jpg")) + list(test_path.glob("*.jpeg")) + list(test_path.glob("*.png")) + list(test_path.glob("*.JPG"))
    print(f"✅ Test images path verified: {test_path} ({len(test_images)} images found)")

# Choose precision based on your RTX 3060
PRECISION = ModelPrecisionConfig.recommend_config(6.0)  # 6GB VRAM
print(f"\n🎯 Selected precision: {PRECISION.upper()}")

# Option to override precision (uncomment to use different precision)
# PRECISION = 'fp16'  # Use this for maximum quality if memory allows
# PRECISION = 'int4'  # Use this for maximum memory savings

print(f"\n⚙️ Configuration Summary:")
print(f"   Base Model: {BASE_MODEL_PATH}")
print(f"   LoRA Adapter: {LORA_MODEL_PATH}")
print(f"   Test Images: {TEST_IMAGES_PATH}")
print(f"   Precision: {PRECISION}")
print(f"   GPU Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"   GPU Device: {torch.cuda.get_device_name(0)}")
    print(f"   GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")

## 7. Load Model with Full GPU Optimization

Initialize and load the model with maximum GPU utilization.

In [None]:
# Initialize the inference engine
print("🚀 Initializing Advanced LoRA Inference Engine...")

# Create the engine with full GPU optimization
engine = AdvancedLoRAInferenceEngine(
    base_model_path=BASE_MODEL_PATH,
    lora_path=LORA_MODEL_PATH,
    precision=PRECISION,
    enable_compilation=True,  # Enable torch.compile for extra speed
    memory_manager=memory_manager
)

print("\n📥 Loading model - this may take several minutes...")
print("💡 This is equivalent to Ollama's num_gpu=999 (full GPU offloading)")

try:
    # Load the model with memory tracking
    start_time = time.time()
    engine.load_model()
    load_time = time.time() - start_time
    
    print(f"\n🎉 Model loaded successfully in {load_time:.1f} seconds!")
    
    # Display final memory usage
    memory_manager.log_memory_usage("model loaded")
    
    # Initialize QC processor
    qc_processor = CardboardQCProcessor(engine)
    print("\n✅ Cardboard QC Processor initialized!")
    print("\n🎯 System ready for cardboard quality control inference!")
    
except Exception as e:
    print(f"❌ Error loading model: {e}")
    print("\n🔧 Troubleshooting tips:")
    print("   1. Check if model paths are correct")
    print("   2. Ensure you have enough GPU/RAM memory")
    print("   3. Try a lower precision (int4 instead of int8)")
    print("   4. Check CUDA installation and compatibility")
    raise

## 8. Test Single Image Analysis

Test the system with a single cardboard image.

In [None]:
# Select a test image
test_images = list(Path(TEST_IMAGES_PATH).glob("*.jpg")) + list(Path(TEST_IMAGES_PATH).glob("*.JPG")) + list(Path(TEST_IMAGES_PATH).glob("*.png"))

if not test_images:
    print(f"❌ No test images found in {TEST_IMAGES_PATH}")
    print("Please add some cardboard images to test with")
else:
    # Use the first available image
    test_image = test_images[0]
    print(f"🖼️ Testing with image: {test_image.name}")
    
    # Display the test image
    img = Image.open(test_image)
    plt.figure(figsize=(10, 6))
    plt.imshow(img)
    plt.title(f"Test Image: {test_image.name}")
    plt.axis('off')
    plt.show()
    
    # Perform detailed QC analysis
    print("\n🔍 Performing detailed quality control analysis...")
    
    try:
        result = qc_processor.analyze_cardboard(
            test_image,
            analysis_type='detailed_analysis'
        )
        
        # Display results
        print("\n📊 Analysis Results:")
        print("=" * 50)
        
        qc_analysis = result['qc_analysis']
        print(f"🎯 Decision: {qc_analysis.get('pass_fail', 'UNKNOWN')}")
        print(f"📈 Confidence: {qc_analysis.get('confidence', 'N/A')}%")
        print(f"⚠️ Defects Found: {', '.join(qc_analysis.get('defects', [])) or 'None'}")
        print(f"🚨 Severity: {qc_analysis.get('severity', 'unknown').upper()}")
        
        print(f"\n⏱️ Performance:")
        print(f"   Inference Time: {result['inference_time']:.2f}s")
        print(f"   Tokens/Second: {result['tokens_per_second']:.1f}")
        print(f"   Input Tokens: {result['input_tokens']}")
        print(f"   Output Tokens: {result['output_tokens']}")
        
        print(f"\n📝 Detailed Analysis:")
        print("-" * 50)
        print(result['response'])
        
    except Exception as e:
        print(f"❌ Analysis failed: {e}")
        import traceback
        print(traceback.format_exc())

## 9. Batch Processing Demo

Process multiple cardboard images efficiently.

In [None]:
# Batch process multiple images
print("📦 Batch Processing Demo")
print("=" * 30)

if len(test_images) > 1:
    # Select first few images for demo (limit to 5 to avoid long processing)
    batch_images = test_images[:min(5, len(test_images))]
    print(f"Processing {len(batch_images)} images...")
    
    # Optimize memory before batch processing
    memory_manager.optimize_gpu_memory()
    
    # Process batch with pass/fail analysis (faster)
    try:
        batch_results = qc_processor.batch_analyze(
            batch_images,
            analysis_type='pass_fail_only',
            show_progress=True
        )
        
        # Summary of results
        print("\n📈 Batch Processing Summary:")
        print("=" * 40)
        
        passed = sum(1 for r in batch_results if r.get('qc_analysis', {}).get('pass_fail') == 'PASS')
        failed = sum(1 for r in batch_results if r.get('qc_analysis', {}).get('pass_fail') == 'FAIL')
        errors = sum(1 for r in batch_results if 'error' in r)
        
        total_time = sum(r.get('inference_time', 0) for r in batch_results if 'inference_time' in r)
        avg_time = total_time / len(batch_results) if batch_results else 0
        
        print(f"✅ Passed: {passed}")
        print(f"❌ Failed: {failed}")
        print(f"⚠️ Errors: {errors}")
        print(f"⏱️ Total Time: {total_time:.1f}s")
        print(f"📊 Average Time: {avg_time:.2f}s per image")
        
        # Detailed results table
        print("\n📋 Detailed Results:")
        print("-" * 80)
        
        for i, result in enumerate(batch_results, 1):
            if 'error' in result:
                print(f"{i:2d}. {Path(result['image_path']).name:25s} ERROR: {result['error']}")
            else:
                qc = result.get('qc_analysis', {})
                status = qc.get('pass_fail', 'UNKNOWN')
                confidence = qc.get('confidence', 'N/A')
                time_taken = result.get('inference_time', 0)
                
                status_icon = "✅" if status == 'PASS' else "❌" if status == 'FAIL' else "❓"
                print(f"{i:2d}. {Path(result['image_path']).name:25s} {status_icon} {status:6s} ({confidence}%) - {time_taken:.2f}s")
        
    except Exception as e:
        print(f"❌ Batch processing failed: {e}")
        import traceback
        print(traceback.format_exc())
        
else:
    print("Only one test image available - add more images for batch processing demo")

## 10. Performance Analysis and Visualization

Analyze system performance and create visualizations.

In [None]:
# Generate comprehensive QC report
print("📊 Generating Performance Analysis...")

# Get QC report
qc_report = qc_processor.generate_qc_report()

if "message" not in qc_report:
    # Display summary
    summary = qc_report['summary']
    print("\n📈 Quality Control Summary:")
    print("=" * 35)
    print(f"Total Samples Analyzed: {summary['total_samples']}")
    print(f"Passed: {summary['passed']} ({summary['pass_rate']:.1f}%)")
    print(f"Failed: {summary['failed']}")
    print(f"Average Inference Time: {summary['avg_inference_time']:.2f}s")
    print(f"Average Confidence: {summary['avg_confidence']:.1f}%")
    
    # Performance statistics
    perf = qc_report['performance']
    print("\n⚡ Performance Statistics:")
    print("=" * 30)
    print(f"Total Inferences: {perf.get('total_inferences', 0)}")
    print(f"Fastest Inference: {perf.get('min_time', 0):.2f}s")
    print(f"Slowest Inference: {perf.get('max_time', 0):.2f}s")
    print(f"Median Time: {perf.get('median_time', 0):.2f}s")
    print(f"Time Std Dev: {perf.get('std_time', 0):.2f}s")
    
    # Defect analysis
    defects = qc_report['defect_analysis']
    if defects['common_defects']:
        print("\n🔍 Common Defects Found:")
        print("=" * 25)
        for defect, count in defects['common_defects'][:5]:  # Top 5
            print(f"  {defect.title()}: {count} occurrences")
    
    # Create visualizations
    print("\n📊 Creating visualizations...")
    qc_processor.plot_qc_statistics()
    
    # Memory usage over time
    print("\n💾 Memory Usage Analysis:")
    memory_manager.plot_memory_history()
    
else:
    print(qc_report['message'])

## 11. Advanced Features Demo

Demonstrate advanced features like custom prompts and comparative analysis.

In [None]:
# Demo advanced features
print("🚀 Advanced Features Demo")
print("=" * 30)

if test_images:
    test_image = test_images[0]
    
    print("\n1️⃣ Custom Prompt Analysis")
    print("-" * 25)
    
    custom_prompt = """
As a quality control expert, examine this cardboard sample and provide:
1. A quality grade (A, B, C, D, or F)
2. Three specific observations about the material condition
3. Recommendation for use (Approved/Conditional/Rejected)
4. Any safety concerns

Format your response clearly with numbered points.
    """.strip()
    
    try:
        custom_result = qc_processor.analyze_cardboard(
            test_image,
            custom_prompt=custom_prompt
        )
        
        print("Custom Analysis Result:")
        print(custom_result['response'])
        print(f"⏱️ Time: {custom_result['inference_time']:.2f}s")
        
    except Exception as e:
        print(f"Custom analysis failed: {e}")
    
    print("\n2️⃣ Defect Detection Focus")
    print("-" * 25)
    
    try:
        defect_result = qc_processor.analyze_cardboard(
            test_image,
            analysis_type='defect_detection'
        )
        
        print("Defect Detection Result:")
        print(defect_result['response'])
        print(f"⏱️ Time: {defect_result['inference_time']:.2f}s")
        
    except Exception as e:
        print(f"Defect detection failed: {e}")
    
    print("\n3️⃣ Comparative Rating")
    print("-" * 20)
    
    try:
        rating_result = qc_processor.analyze_cardboard(
            test_image,
            analysis_type='comparative_analysis'
        )
        
        print("Comparative Rating Result:")
        print(rating_result['response'])
        print(f"⏱️ Time: {rating_result['inference_time']:.2f}s")
        
    except Exception as e:
        print(f"Comparative analysis failed: {e}")
else:
    print("No test images available for advanced demo")

## 12. Memory Optimization and Cleanup

Demonstrate memory management and cleanup procedures.

In [None]:
print("🧹 Memory Management Demo")
print("=" * 30)

# Show current memory usage
print("\n📊 Current Memory Usage:")
memory_manager.log_memory_usage("before cleanup")

# Demonstrate memory optimization
print("\n🔧 Running memory optimization...")
memory_manager.optimize_gpu_memory(aggressive=False)

# Show memory after optimization
print("\n📊 Memory After Optimization:")
memory_manager.log_memory_usage("after optimization")

# Calculate optimal batch size
print("\n🎯 Batch Size Recommendation:")
optimal_batch = memory_manager.get_optimal_batch_size(
    model_size_gb=3.0,  # Approximate model size
    safety_factor=0.7   # Conservative safety factor
)
print(f"Recommended batch size: {optimal_batch}")

# Show memory statistics over time
stats = memory_manager.get_memory_stats()
print(f"\n📈 Current GPU Utilization: {stats.gpu_utilization_percent:.1f}%")
print(f"🆓 Free GPU Memory: {stats.gpu_free_gb:.2f}GB")
print(f"🔄 RAM Usage: {stats.ram_used_percent:.1f}%")

# Save performance report
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_path = f"cardboard_qc_report_{timestamp}.json"
final_report = qc_processor.generate_qc_report(save_path=report_path)

print(f"\n💾 Performance report saved to: {report_path}")

# Cleanup option (uncomment to unload model)
# print("\n🔄 Unloading model to free memory...")
# engine.unload_model()
# print("✅ Model unloaded - memory freed")

print("\n✨ Memory management demo complete!")

## 13. Production Usage Examples

Examples of how to use this system in production.

In [None]:
print("🏭 Production Usage Examples")
print("=" * 35)

print("""
💡 How to Use This System in Production:

1️⃣ **API Integration**:
   - Wrap the qc_processor in a Flask/FastAPI endpoint
   - Accept image uploads via HTTP POST
   - Return JSON responses with QC results

2️⃣ **Batch Processing Pipeline**:
   - Monitor a directory for new cardboard images
   - Process batches automatically using batch_analyze()
   - Save results to database or CSV files

3️⃣ **Real-time Quality Control**:
   - Integrate with camera systems
   - Process images as they're captured
   - Trigger alerts for failed samples

4️⃣ **Memory Management**:
   - Use memory_manager.memory_tracking() for monitoring
   - Run optimize_gpu_memory() periodically
   - Implement automatic model reloading if memory issues occur

5️⃣ **Performance Optimization**:
   - Use torch.compile() for 10-30% speed improvement
   - Batch similar images together for efficiency
   - Cache frequently used prompts and configurations
""")

# Example production configuration
production_config = {
    'model_settings': {
        'precision': PRECISION,
        'enable_compilation': True,
        'batch_size': memory_manager.get_optimal_batch_size(3.0, 0.7)
    },
    'qc_settings': {
        'default_analysis_type': 'pass_fail_only',  # Fastest for production
        'confidence_threshold': 85,  # Minimum confidence for auto-approval
        'temperature': 0.3,  # Lower temperature for consistent results
        'max_tokens': 256   # Sufficient for QC responses
    },
    'performance_monitoring': {
        'log_memory_every_n_samples': 50,
        'cleanup_memory_every_n_samples': 100,
        'generate_report_every_n_samples': 500
    }
}

print("\n⚙️ Recommended Production Configuration:")
print(json.dumps(production_config, indent=2))

# Performance comparison with Ollama
print("\n📊 Performance Comparison with Ollama:")
print("=" * 45)

if hasattr(qc_processor, 'qc_history') and qc_processor.qc_history:
    avg_time = np.mean([h.get('inference_time', 0) for h in qc_processor.qc_history])
    avg_tokens_per_sec = np.mean([
        h.get('tokens_per_second', 0) for h in qc_processor.qc_history 
        if h.get('tokens_per_second', 0) > 0
    ])
    
    print(f"🚀 Your System Performance:")
    print(f"   Average inference time: {avg_time:.2f}s")
    print(f"   Average tokens/second: {avg_tokens_per_sec:.1f}")
    print(f"   Precision: {PRECISION.upper()}")
    print(f"   GPU utilization: Full offloading (equivalent to num_gpu=999)")
    print(f"\n✅ Benefits over Ollama:")
    print(f"   - Direct LoRA access (no GGUF conversion needed)")
    print(f"   - Fine-grained memory control")
    print(f"   - Custom precision options")
    print(f"   - Specialized QC prompts and parsing")
    print(f"   - Advanced monitoring and analytics")
else:
    print("Run some inference examples above to see performance metrics")

print("\n🎯 Next Steps for Production:")
print("1. Test with your specific cardboard images")
print("2. Tune prompts for your quality standards")
print("3. Set up automated monitoring and alerts")
print("4. Create API endpoints for integration")
print("5. Implement data logging and analytics")

## 14. Troubleshooting and Advanced Configuration

Common issues and their solutions.

In [None]:
print("🔧 Troubleshooting Guide")
print("=" * 25)

print("""
❌ **Out of Memory Errors**:
   Solutions:
   - Switch to lower precision: int4 instead of int8
   - Reduce batch size in generation config
   - Use CPU offloading: device_map='auto' with CPU fallback
   - Run memory_manager.optimize_gpu_memory(aggressive=True)

⏱️ **Slow Inference Speed**:
   Solutions:
   - Ensure torch.compile() is enabled
   - Use fp16 precision if memory allows
   - Reduce max_new_tokens in generation config
   - Enable flash_attention_2 if available
   - Check GPU utilization with nvidia-smi

🔄 **Model Loading Issues**:
   Solutions:
   - Verify all model files are present
   - Check CUDA version compatibility
   - Try loading without quantization first
   - Ensure transformers and peft versions are compatible

📝 **Poor Quality Results**:
   Solutions:
   - Adjust temperature (lower for more consistent results)
   - Modify prompts to be more specific
   - Check if LoRA adapter is properly loaded
   - Verify training data quality and relevance

🔗 **Integration Issues**:
   Solutions:
   - Wrap in try-catch blocks for production
   - Implement health checks and auto-restart
   - Use memory monitoring to prevent crashes
   - Set up logging for debugging
""")

# System diagnostics
print("\n🔍 System Diagnostics")
print("=" * 20)

# Check CUDA status
print(f"CUDA Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA Version: {torch.version.cuda}")
    print(f"GPU Count: {torch.cuda.device_count()}")
    print(f"Current GPU: {torch.cuda.current_device()}")
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")

# Check memory status
current_stats = memory_manager.get_memory_stats()
print(f"\nGPU Memory Allocated: {current_stats.gpu_allocated_gb:.2f} GB")
print(f"GPU Memory Free: {current_stats.gpu_free_gb:.2f} GB")
print(f"RAM Usage: {current_stats.ram_used_percent:.1f}%")

# Check package versions
print(f"\nPackage Versions:")
print(f"PyTorch: {torch.__version__}")
print(f"Transformers: {transformers.__version__}")
print(f"PEFT: {peft.__version__}")
print(f"Accelerate: {accelerate.__version__}")

# Check model status
if hasattr(engine, 'is_loaded'):
    print(f"\nModel Status: {'Loaded' if engine.is_loaded else 'Not Loaded'}")
    if engine.is_loaded:
        print(f"Precision: {engine.precision}")
        print(f"Compilation: {'Enabled' if engine.compiled_model else 'Disabled'}")
        print(f"Device: {engine.device}")

print("\n✅ Diagnostics complete")

# Performance tuning suggestions
print("\n🚀 Performance Tuning Suggestions:")
print("=" * 40)

gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3 if torch.cuda.is_available() else 0

if gpu_memory_gb >= 12:
    print("🎯 Your GPU has plenty of memory - consider:")
    print("   - Using fp16 precision for best quality")
    print("   - Increasing batch size for efficiency")
    print("   - Running multiple models simultaneously")
elif gpu_memory_gb >= 8:
    print("⚖️ Your GPU has good memory - current settings are optimal")
    print("   - int8 or bf16 precision recommended")
    print("   - Moderate batch sizes work well")
elif gpu_memory_gb >= 6:
    print("💡 Your RTX 3060 setup - recommendations:")
    print("   - int8 precision (current) is optimal")
    print("   - Keep batch size small (1-2)")
    print("   - Enable aggressive memory cleanup")
    print("   - Consider int4 for maximum memory savings")
else:
    print("⚠️ Limited GPU memory - consider:")
    print("   - int4 precision for maximum savings")
    print("   - CPU offloading for large models")
    print("   - Frequent memory cleanup")

print("\n📚 For more help, check the documentation or create an issue on GitHub")

## 15. Save Configuration and Cleanup

Save your configuration and optionally clean up resources.

In [None]:
print("💾 Saving Configuration and Cleanup")
print("=" * 40)

# Save current configuration
config_to_save = {
    'model_paths': {
        'base_model': BASE_MODEL_PATH,
        'lora_model': LORA_MODEL_PATH,
        'test_images': TEST_IMAGES_PATH
    },
    'model_config': {
        'precision': PRECISION,
        'enable_compilation': True,
        'device': str(engine.device) if hasattr(engine, 'device') else 'auto'
    },
    'system_info': {
        'gpu_name': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU',
        'gpu_memory_gb': torch.cuda.get_device_properties(0).total_memory / 1024**3 if torch.cuda.is_available() else 0,
        'pytorch_version': torch.__version__,
        'transformers_version': transformers.__version__,
        'peft_version': peft.__version__
    },
    'performance_summary': engine.get_performance_stats() if hasattr(engine, 'get_performance_stats') else {},
    'generated_at': datetime.now().isoformat()
}

# Save configuration
config_filename = f"cardboard_qc_config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(config_filename, 'w') as f:
    json.dump(config_to_save, f, indent=2)

print(f"✅ Configuration saved to: {config_filename}")

# Final memory report
print("\n📊 Final Memory Report:")
memory_manager.log_memory_usage("session end")

# Option to unload model (uncomment to free memory)
print("\n🔄 Model Cleanup Options:")
print("To free GPU memory, run: engine.unload_model()")
print("To optimize memory: memory_manager.optimize_gpu_memory(aggressive=True)")

# Cleanup option
cleanup_choice = input("\n🗑️ Unload model to free memory? (y/n): ").lower().strip()
if cleanup_choice in ['y', 'yes']:
    print("🧹 Unloading model...")
    engine.unload_model()
    print("✅ Model unloaded successfully")
    memory_manager.log_memory_usage("after cleanup")
else:
    print("ℹ️ Model remains loaded for continued use")

print("\n🎉 Cardboard QC GPU Inference Setup Complete!")
print("\n📋 Summary of what you achieved:")
print("   ✅ Full GPU offloading (equivalent to Ollama num_gpu=999)")
print("   ✅ Direct LoRA adapter usage without merging")
print("   ✅ Advanced memory management and monitoring")
print("   ✅ Specialized cardboard quality control prompts")
print("   ✅ Batch processing capabilities")
print("   ✅ Performance tracking and optimization")
print("   ✅ Production-ready error handling")
print("\n🚀 Your system is ready for production cardboard quality control!")

# Final tips
print("\n💡 Next Steps:")
print("1. Test with your production cardboard images")
print("2. Fine-tune prompts for your specific quality standards")
print("3. Set up automated processing pipelines")
print("4. Monitor performance and adjust settings as needed")
print(f"5. Refer to saved configuration: {config_filename}")

print("\n📖 For questions or improvements, check the troubleshooting section above.")