In [None]:
# ============================================
# CELL 1: Install Dependencies
# ============================================

import subprocess
import sys

def install_dependencies():
    """Install every package the training pipeline needs.

    Packages are installed one at a time, quietly (``-q``), through the
    interpreter that is running this notebook (``sys.executable -m pip``).
    ``subprocess.check_call`` raises ``CalledProcessError`` on the first
    failing install, which aborts the remaining ones.
    """
    required = (
        "ultralytics",
        "roboflow",
        "onnx",
        "onnxruntime",
        "onnx-graphsurgeon",
        "numpy",
        "torch",
        "torchvision",
    )
    # Common prefix of every pip invocation; the package name and -q follow.
    pip_install = [sys.executable, "-m", "pip", "install"]

    print("📦 Installing dependencies...")
    for requirement in required:
        print(f"Installing {requirement}...")
        subprocess.check_call([*pip_install, requirement, "-q"])
    print("✅ All dependencies installed!")

# Run installation immediately: the training script launched further down
# imports these packages from the same interpreter (sys.executable).
install_dependencies()

📦 Installing dependencies...
Installing ultralytics...
Installing roboflow...
Installing onnx...
Installing onnxruntime...
Installing onnx-graphsurgeon...
Installing numpy...
Installing torch...
Installing torchvision...
✅ All dependencies installed!


In [None]:
# ============================================
# CELL 2: Create the Complete Training Script
# ============================================

training_script = r'''#!/usr/bin/env python3
"""
Robust YOLO11 Training Script for Xilinx ZCU104 DPU Deployment
"""

import os
import sys
import yaml
import torch
import shutil
import onnx
import argparse
import logging
import traceback
from pathlib import Path
from collections import defaultdict
from typing import Dict, Set, Tuple, Optional, List
import onnx_graphsurgeon as gs
import numpy as np
from ultralytics import YOLO
from roboflow import Roboflow
import onnxruntime as ort

# Configure logging: every record goes both to a log file and to the console.
# The file handler is opened as UTF-8 explicitly because many log messages in
# this script contain emoji, which a non-UTF-8 locale default cannot encode.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('yolo_dpu_training.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Constants
# Reference input shape for the dummy tensor used when validating exported
# ONNX models; also echoed in the compatibility report.
INPUT_SHAPE = (1, 3, 640, 640)
# Opset version passed to the ONNX export and echoed in the report.
ONNX_OPSET = 17
# Defaults for the --epochs / --batch / --imgsz command-line options and for
# train_single_model(); DEFAULT_IMGSZ is also used for the ONNX export.
DEFAULT_EPOCHS = 50
DEFAULT_BATCH = 2
DEFAULT_IMGSZ = 640

# DPU compatibility definitions
# ONNX op types treated as supported: any other op type found in a graph is
# reported as unsupported by ONNXPatcher.check_onnx_compatibility().
DPU_COMPATIBLE_OPS = {
    "Conv", "Relu", "LeakyRelu", "MaxPool", "AveragePool", "GlobalAveragePool",
    "Add", "Mul", "BatchNormalization", "Concat", "Sigmoid", "Resize", "Transpose",
    "Reshape", "Pad", "Clip", "Upsample", "Softmax"
}

# Binary ops that ONNXPatcher.patch_onnx_ops() tries to rewrite
# (Div -> Mul, Sub -> Add) when their second input is a constant.
REWRITE_ONNX_OPS = {"Div", "Sub"}
# Ops that are left in the graph untouched (only a note is recorded) and that
# are excluded from the "hard unsupported" set when judging compatibility.
STRUCTURAL_ONNX_OPS = {"Split", "Slice"}

# Model configurations
# Maps each selectable model size to the YAML config name handed to YOLO().
MODEL_CONFIGS = {
    'yolo11n': 'yolo11n.yaml',
    'yolo11s': 'yolo11s.yaml',
    'yolo11m': 'yolo11m.yaml',
    'yolo11l': 'yolo11l.yaml',
    'yolo11x': 'yolo11x.yaml'
}

class DPUModelPatcher:
    """Handles model patching for DPU compatibility"""

    # Composite block types that replace_backbone_layers() swaps for a plain
    # Conv->BN->ReLU stack (or Identity when channel counts cannot be found).
    _COMPOSITE_BLOCKS = ("C3k2", "C3k", "C2PSA", "PSABlock", "Bottleneck", "SPPF")
    # Layer types that replace_backbone_layers() swaps for Identity.
    _IDENTITY_REPLACED = ("DFL", "Attention", "SiLU")

    @staticmethod
    def check_pt_model_compatibility(model) -> Tuple[Set[str], Dict[str, int]]:
        """Check PyTorch model for DPU-incompatible layers

        Walks every module of ``model`` (the root included) and classifies it
        by class name.

        Args:
            model: object exposing ``named_modules()`` (a ``torch.nn.Module``).

        Returns:
            ``(unsupported, counts)``: ``unsupported`` is the set of class
            names that are neither in the supported list nor among the types
            replace_backbone_layers() knows how to replace; ``counts`` maps
            every class name encountered to its number of occurrences.
        """
        unsupported = set()
        counts = defaultdict(int)

        supported_layers = {
            "Conv2d", "ReLU", "LeakyReLU", "MaxPool2d", "AvgPool2d",
            "AdaptiveAvgPool2d", "BatchNorm2d", "Sigmoid", "Upsample",
            "Flatten", "Softmax", "Sequential", "Identity", "Detect",
            "ModuleList", "Linear", "Dropout"
        }
        # Types the patcher replaces itself: counted, but never reported.
        patchable = set(DPUModelPatcher._COMPOSITE_BLOCKS) | set(DPUModelPatcher._IDENTITY_REPLACED)

        for name, module in model.named_modules():
            module_type = module.__class__.__name__
            counts[module_type] += 1

            if module_type in patchable:
                continue

            if module_type not in supported_layers:
                unsupported.add(module_type)
                logger.warning(f"Unsupported layer found: {module_type} at {name}")

        return unsupported, counts

    @staticmethod
    def replace_backbone_layers(model, preserve_detect=True):
        """Replace DPU-incompatible backbone layers while preserving Detect head

        Modules are matched by class name:

        * composite blocks (``_COMPOSITE_BLOCKS``) become a Conv->BN->ReLU
          stack, or ``Identity`` when their channel counts cannot be inferred;
        * ``_IDENTITY_REPLACED`` layers become ``Identity``;
        * ``ModuleList`` containers become ``Sequential``.

        Two subtrees are deliberately left alone:

        * everything inside a ``Detect`` head when ``preserve_detect`` is true
          (otherwise its internal ``DFL`` / ``ModuleList`` children would be
          patched even though the head itself is "preserved");
        * everything inside a block that is itself being replaced, because
          those children disappear together with their parent and can no
          longer be reached by name.

        Args:
            model: root ``torch.nn.Module``; it is modified in place.
            preserve_detect: keep ``Detect`` modules and their children intact.

        Returns:
            ``(model, replaced_count)`` - the same model object and the number
            of modules that were actually swapped.
        """

        def create_simple_conv_block(in_ch, out_ch):
            """Create a simple Conv->BN->ReLU block"""
            return torch.nn.Sequential(
                torch.nn.Conv2d(in_ch, out_ch, 3, 1, 1, bias=False),
                torch.nn.BatchNorm2d(out_ch),
                torch.nn.ReLU(inplace=True),
                torch.nn.Conv2d(out_ch, out_ch, 3, 1, 1, bias=False),
                torch.nn.BatchNorm2d(out_ch),
                torch.nn.ReLU(inplace=True),
            )

        def infer_channels(module):
            """Return (in_channels, out_channels) of a composite block, or Nones."""
            in_ch = None
            out_ch = None

            # A direct child that is itself a conv (exposes in_channels).
            for _, child in module.named_children():
                if hasattr(child, 'in_channels'):
                    in_ch = child.in_channels
                    out_ch = child.out_channels
                    break

            if in_ch is None:
                if hasattr(module, 'cv1') and hasattr(module.cv1, 'conv'):
                    in_ch = module.cv1.conv.in_channels
                    out_ch = module.cv1.conv.out_channels
                elif hasattr(module, 'conv') and hasattr(module.conv, 'in_channels'):
                    in_ch = module.conv.in_channels
                    out_ch = module.conv.out_channels

            # The first conv's out_channels is only an intermediate width in a
            # multi-conv block. Assumes the cv1..cvN naming convention where
            # the highest-numbered conv produces the block output - TODO
            # confirm for any custom block types.
            highest = -1
            for child_name, child in module.named_children():
                suffix = child_name[2:]
                if not (child_name.startswith('cv') and suffix.isdigit()):
                    continue
                conv = child if hasattr(child, 'out_channels') else getattr(child, 'conv', None)
                if conv is not None and hasattr(conv, 'out_channels') and int(suffix) > highest:
                    highest = int(suffix)
                    out_ch = conv.out_channels

            return in_ch, out_ch

        def patch_module(module, module_name):
            """Patch individual module based on type"""
            cname = module.__class__.__name__

            if cname == "Detect" and preserve_detect:
                logger.info(f"Preserving Detect head at {module_name}")
                return module

            if cname in DPUModelPatcher._COMPOSITE_BLOCKS:
                in_ch, out_ch = infer_channels(module)

                if in_ch is not None and out_ch is not None:
                    logger.info(f"Replacing {cname} at {module_name} with simple conv block ({in_ch}->{out_ch})")
                    return create_simple_conv_block(in_ch, out_ch)
                else:
                    logger.warning(f"Could not determine channels for {cname} at {module_name}, using Identity")
                    return torch.nn.Identity()

            elif cname in DPUModelPatcher._IDENTITY_REPLACED:
                # NOTE(review): swapping SiLU for Identity removes the
                # non-linearity altogether; confirm this is intended rather
                # than substituting a DPU-supported activation such as ReLU.
                logger.info(f"Replacing {cname} at {module_name} with Identity")
                return torch.nn.Identity()

            elif cname == "ModuleList":
                logger.info(f"Converting ModuleList at {module_name} to Sequential")
                return torch.nn.Sequential(*list(module.children()))

            return module

        replaced_count = 0        # number of replacements made
        replacement_log = []      # first few replacements (debug)

        modules_to_replace = []
        # Dotted-name prefixes of subtrees that must not be visited any more.
        # named_modules() yields a parent before its children, so a prefix is
        # always registered before the descendants it protects are reached.
        frozen_prefixes = []
        for name, module in model.named_modules():
            if name == '':
                continue
            if any(name.startswith(prefix) for prefix in frozen_prefixes):
                continue

            cname = module.__class__.__name__
            patched = patch_module(module, name)
            if patched is module:
                if cname == "Detect" and preserve_detect:
                    frozen_prefixes.append(name + '.')
                continue

            modules_to_replace.append((name, module, patched))
            # A converted ModuleList keeps its children (now inside the new
            # Sequential, reachable under the same index names), so they stay
            # eligible. Every other replacement discards its children.
            if cname != "ModuleList":
                frozen_prefixes.append(name + '.')

        for name, old_module, new_module in modules_to_replace:
            try:
                parts = name.split('.')
                parent = model
                for part in parts[:-1]:
                    parent = getattr(parent, part)

                # Carry over per-layer metadata stored as plain instance
                # attributes on the old module. Presumably the Ultralytics
                # forward pass reads `f`/`i` from each layer - verify. Only
                # instance attributes are copied: `type` is also the name of
                # an nn.Module method and must not be copied as a bound method.
                for attr in ("f", "i", "type"):
                    if attr in vars(old_module):
                        setattr(new_module, attr, vars(old_module)[attr])

                setattr(parent, parts[-1], new_module)
                replaced_count += 1
                replacement_log.append(f"{name}: {old_module.__class__.__name__} -> {new_module.__class__.__name__}")

            except Exception as e:
                logger.error(f"Failed to replace {name}: {e}")

        logger.info(f"✅ Layer patching complete. Total replaced: {replaced_count}")
        for log_entry in replacement_log[:10]:
            logger.debug(f"  - {log_entry}")
        if len(replacement_log) > 10:
            logger.debug(f"  ... and {len(replacement_log) - 10} more")

        return model, replaced_count

class ONNXPatcher:
    """Handles ONNX model patching for DPU compatibility"""

    @staticmethod
    def check_onnx_compatibility(onnx_path: Path) -> Tuple[Set[str], Dict[str, int]]:
        """Check ONNX model for DPU-incompatible operations

        Args:
            onnx_path: path of the ONNX file to inspect.

        Returns:
            ``(unsupported, op_counts)``: the op types present in the graph
            that are not in ``DPU_COMPATIBLE_OPS``, and a dict mapping every
            op type to its node count. On any failure the error is logged and
            ``(set(), {})`` is returned, so an empty result does not by itself
            prove that the model was inspected successfully.
        """
        try:
            model = onnx.load(str(onnx_path))
            op_counts = defaultdict(int)

            for node in model.graph.node:
                op_counts[node.op_type] += 1

            unsupported = {op for op in op_counts.keys() if op not in DPU_COMPATIBLE_OPS}

            return unsupported, dict(op_counts)
        except Exception as e:
            logger.error(f"Failed to check ONNX compatibility: {e}")
            return set(), {}

    @staticmethod
    def _rewrite_constant(op: str, values) -> Optional[Tuple[str, np.ndarray]]:
        """Compute the replacement op and constant for ``x <op> values``.

        The constant keeps its original dtype so the rewritten node stays
        type-consistent with its other input:

        * ``Div`` by a floating-point constant becomes ``Mul`` by its
          reciprocal. Integer division is not rewritten because it is not
          equivalent to a multiplication.
        * ``Sub`` of a floating-point or signed-integer constant becomes
          ``Add`` of its negation. Unsigned constants cannot be negated.

        Returns:
            ``(new_op, new_values)``, or ``None`` when no safe rewrite exists.
        """
        values = np.asarray(values)
        is_float = np.issubdtype(values.dtype, np.floating)

        if op == "Div":
            if not is_float:
                return None
            return "Mul", np.reciprocal(values)

        if op == "Sub":
            if not (is_float or np.issubdtype(values.dtype, np.signedinteger)):
                return None
            return "Add", np.negative(values)

        return None

    @staticmethod
    def patch_onnx_ops(onnx_path: Path, output_path: Path) -> Path:
        """Apply gentle ONNX patching (Div->Mul, Sub->Add)

        Only binary ``Div``/``Sub`` nodes whose second input is a constant are
        rewritten; everything else is left intact and a note is recorded.

        Args:
            onnx_path: ONNX file to read.
            output_path: where the patched model is written.

        Returns:
            ``output_path`` on success. If anything fails the error is logged
            and ``onnx_path`` is returned instead (nothing is written).
        """
        try:
            model = onnx.load(str(onnx_path))
            graph = gs.import_onnx(model)

            rewrites = 0
            warnings = []

            # Iterate over a copy: graph.nodes is mutated inside the loop.
            for node in list(graph.nodes):
                if node.op in REWRITE_ONNX_OPS:
                    if len(node.inputs) != 2:
                        warnings.append(f"Skip {node.op} {node.name}: not binary operation")
                        continue

                    a, b = node.inputs

                    if not (isinstance(b, gs.Constant) or (hasattr(b, 'is_const') and b.is_const())):
                        warnings.append(f"Skip {node.op} {node.name}: second input not constant")
                        continue

                    b_values = b.values if isinstance(b, gs.Constant) else np.array(b.values)

                    rewrite = ONNXPatcher._rewrite_constant(node.op, b_values)
                    if rewrite is None:
                        warnings.append(
                            f"Skip {node.op} {node.name}: constant dtype "
                            f"{np.asarray(b_values).dtype} has no safe rewrite"
                        )
                        continue

                    new_op, new_values = rewrite
                    const_suffix = "recip" if node.op == "Div" else "neg"
                    const = gs.Constant(name=f"{node.name}_{const_suffix}", values=new_values)

                    # Detach the old node from its tensors before wiring the
                    # replacement, so each output tensor ends up with exactly
                    # one producer.
                    outputs = list(node.outputs)
                    node.inputs.clear()
                    node.outputs.clear()

                    new_node = gs.Node(
                        op=new_op,
                        inputs=[a, const],
                        outputs=outputs,
                        name=f"{node.name}_as_{new_op}"
                    )
                    graph.nodes.append(new_node)
                    graph.nodes.remove(node)
                    rewrites += 1
                    logger.debug(f"Replaced {node.op} with {new_op} at {node.name}")

                elif node.op in STRUCTURAL_ONNX_OPS:
                    warnings.append(f"Structural op present: {node.op} ({node.name}) - left intact")

            graph.cleanup().toposort()
            patched_model = gs.export_onnx(graph)
            onnx.save(patched_model, str(output_path))

            logger.info(f"🩹 Patched ONNX saved to {output_path} (rewrites: {rewrites})")
            if warnings:
                # Logged at INFO like the header; at DEBUG the notes were
                # filtered out by the INFO log level and never shown.
                logger.info("Patch notes:")
                for warning in warnings[:5]:
                    logger.info(f"  - {warning}")
                if len(warnings) > 5:
                    logger.info(f"  ... plus {len(warnings) - 5} more")

            return output_path

        except Exception as e:
            logger.error(f"Failed to patch ONNX: {e}")
            logger.error(traceback.format_exc())
            return onnx_path

class YOLODPUTrainer:
    """Main trainer class for YOLO DPU optimization"""

    # SECURITY NOTE(review): the default below is a credential committed to
    # source. It is kept only so existing callers of YOLODPUTrainer() keep
    # working; the key should be rotated and supplied by the caller instead.
    def __init__(self, api_key: str = "fN2UlBariD5qcspFvWp9"):
        """Store the Roboflow API key and create the two patcher helpers.

        Args:
            api_key: key passed to ``Roboflow(api_key=...)`` on download.
        """
        self.api_key = api_key
        # Set by download_dataset(); may also be assigned by the caller with
        # any object exposing a `location` attribute.
        self.dataset = None
        self.patcher = DPUModelPatcher()
        self.onnx_patcher = ONNXPatcher()

    def download_dataset(self):
        """Download dataset from Roboflow

        Returns:
            True when the download succeeded (``self.dataset`` is then set),
            False otherwise; the error is logged, not raised.
        """
        try:
            rf = Roboflow(api_key=self.api_key)
            project = rf.workspace("basketballyolo").project("basketball-hoop-ball-and-player-exknu")
            version = project.version(1)
            self.dataset = version.download("yolov11")
            logger.info(f"✅ Dataset downloaded to {self.dataset.location}")
            return True
        except Exception as e:
            logger.error(f"Failed to download dataset: {e}")
            return False

    def validate_onnx_model(self, onnx_path: Path) -> bool:
        """Validate ONNX model can be loaded in ONNX Runtime

        Runs one inference on a random float32 tensor. The tensor takes the
        shape declared by the model's first input when that shape is fully
        static, and falls back to ``INPUT_SHAPE`` otherwise, so models
        exported at a non-default image size are validated correctly.

        Returns:
            True if the session could be created and run, False otherwise
            (the error is logged).
        """
        try:
            ort_session = ort.InferenceSession(str(onnx_path))
            model_input = ort_session.get_inputs()[0]

            declared = model_input.shape
            is_static = bool(declared) and all(isinstance(dim, int) and dim > 0 for dim in declared)
            input_shape = tuple(declared) if is_static else INPUT_SHAPE

            dummy_input = np.random.randn(*input_shape).astype(np.float32)
            outputs = ort_session.run(None, {model_input.name: dummy_input})

            logger.info(f"✅ ONNX model validated successfully: {onnx_path.name}")
            logger.debug(f"  Output shapes: {[out.shape for out in outputs]}")
            return True

        except Exception as e:
            logger.error(f"❌ ONNX validation failed for {onnx_path}: {e}")
            return False

    def train_single_model(
        self,
        model_size: str,
        epochs: int = DEFAULT_EPOCHS,
        batch: int = DEFAULT_BATCH,
        imgsz: int = DEFAULT_IMGSZ
    ) -> Optional[Path]:
        """Train a single YOLO model variant

        Pipeline: load the YAML config, patch the network, train, copy the
        best (or last) weights to ``outputs/<model_size>_dpu``, export the
        ONNX files and write the compatibility report.

        Args:
            model_size: key of ``MODEL_CONFIGS``.
            epochs, batch, imgsz: training settings forwarded to ``train()``;
                ``imgsz`` is also used for the ONNX export.

        Returns:
            Path of the copied ``best.pt`` on success, ``None`` on any failure
            (failures are logged, never raised).
        """

        logger.info(f"\n{'='*60}")
        logger.info(f"Starting training for {model_size.upper()}")
        logger.info(f"{'='*60}")

        output_dir = Path(f"outputs/{model_size}_dpu")
        output_dir.mkdir(parents=True, exist_ok=True)

        try:
            if model_size not in MODEL_CONFIGS:
                logger.error(f"Unknown model size: {model_size}")
                return None

            # Fail with a clear message instead of an AttributeError on
            # `None.location` further down.
            if self.dataset is None:
                logger.error("No dataset available: call download_dataset() or set `dataset` before training")
                return None

            model_config = MODEL_CONFIGS[model_size]
            logger.info(f"Loading model configuration: {model_config}")

            base_model = YOLO(model_config)

            unsupported_before, _ = self.patcher.check_pt_model_compatibility(base_model.model)
            logger.info(f"Initial unsupported layers: {unsupported_before}")

            patched_model, replace_count = self.patcher.replace_backbone_layers(
                base_model.model,
                preserve_detect=True
            )

            unsupported_after, counts_after = self.patcher.check_pt_model_compatibility(patched_model)

            # NOTE(review): confirm that train() really trains this patched
            # instance rather than rebuilding a fresh network from its config.
            yolo = YOLO()
            yolo.model = patched_model

            run_name = f"{model_size}_dpu_run"
            logger.info(f"🏋️ Training {model_size} for {epochs} epochs...")
            yolo.train(
                data=f"{self.dataset.location}/data.yaml",
                epochs=epochs,
                imgsz=imgsz,
                batch=batch,
                name=run_name,
                pretrained=False,
                verbose=True
            )

            # Prefer the directory the trainer reports: when a run with the
            # same name already exists the run directory may be renamed, and
            # the fixed path below would then point at stale weights from an
            # earlier run. The fixed path remains as a fallback.
            save_dir = getattr(getattr(yolo, "trainer", None), "save_dir", None)
            run_dir = Path(save_dir) if save_dir else Path(f"runs/detect/{run_name}")
            weights_dir = run_dir / "weights"

            trained_pt = weights_dir / "best.pt"
            if not trained_pt.exists():
                trained_pt = weights_dir / "last.pt"

            if not trained_pt.exists():
                logger.error(f"Could not find trained weights for {model_size}")
                return None

            shutil.copy(str(trained_pt), output_dir / "best.pt")
            logger.info(f"✅ Saved trained weights to {output_dir / 'best.pt'}")

            self.export_onnx_models(trained_pt, model_size, output_dir, imgsz=imgsz)

            self.generate_report(
                output_dir,
                model_size,
                unsupported_before,
                unsupported_after,
                counts_after,
                replace_count
            )

            return output_dir / "best.pt"

        except Exception as e:
            logger.error(f"Failed to train {model_size}: {e}")
            logger.error(traceback.format_exc())
            return None

    def export_onnx_models(self, pt_path: Path, model_size: str, output_dir: Path, imgsz: int = DEFAULT_IMGSZ):
        """Export both original and patched ONNX models

        Writes ``<model_size>_original.onnx`` and ``<model_size>_patched.onnx``
        into ``output_dir``, validating each with ONNX Runtime and logging the
        remaining unsupported ops. Errors are logged, never raised.

        Args:
            pt_path: trained weights to export.
            model_size: used for file names and log messages.
            output_dir: destination directory.
            imgsz: image size for the export (defaults to ``DEFAULT_IMGSZ``).
        """

        try:
            model = YOLO(str(pt_path))

            logger.info(f"📤 Exporting original ONNX for {model_size}...")
            model.export(
                format="onnx",
                opset=ONNX_OPSET,
                dynamic=False,
                simplify=False,
                imgsz=imgsz
            )

            # Swap only the file extension; a plain str.replace() would also
            # rewrite any ".pt" occurring elsewhere in the path.
            original_onnx = Path(pt_path).with_suffix(".onnx")
            if not original_onnx.exists():
                logger.error(f"ONNX export failed for {model_size}")
                return

            original_output = output_dir / f"{model_size}_original.onnx"
            shutil.move(str(original_onnx), str(original_output))

            if self.validate_onnx_model(original_output):
                logger.info(f"✅ Original ONNX exported: {original_output}")

            unsupported_ops, _ = self.onnx_patcher.check_onnx_compatibility(original_output)
            logger.info(f"Original ONNX unsupported ops: {unsupported_ops}")

            patched_output = output_dir / f"{model_size}_patched.onnx"
            patched_path = self.onnx_patcher.patch_onnx_ops(original_output, patched_output)

            if self.validate_onnx_model(patched_path):
                logger.info(f"✅ Patched ONNX exported: {patched_path}")

            unsupported_ops_patched, _ = self.onnx_patcher.check_onnx_compatibility(patched_path)
            # Structural ops are tolerated when judging compatibility.
            hard_unsupported = {op for op in unsupported_ops_patched if op not in STRUCTURAL_ONNX_OPS}

            if hard_unsupported:
                logger.warning(f"⚠️ Patched ONNX still has unsupported ops: {hard_unsupported}")
            else:
                logger.info(f"✅ {model_size} ONNX is DPU-compatible after patching")

        except Exception as e:
            logger.error(f"Failed to export ONNX for {model_size}: {e}")
            logger.error(traceback.format_exc())

    def generate_report(
        self,
        output_dir: Path,
        model_size: str,
        unsupported_before: Set[str],
        unsupported_after: Set[str],
        layer_counts: Dict[str, int],
        replace_count: int
    ):
        """Generate compatibility report

        Writes ``compatibility_report.txt`` into ``output_dir``. The ONNX
        sections are included only for the ONNX files that actually exist
        there. The literal line ``DPU Compatible: Yes`` / ``No`` is part of
        the report format and is what the notebook's summary cell searches
        for.
        """

        report_path = output_dir / "compatibility_report.txt"

        with open(report_path, 'w') as f:
            f.write(f"DPU Compatibility Report for {model_size.upper()}\n")
            f.write("=" * 60 + "\n\n")

            f.write("Model Information:\n")
            f.write(f"  Model Size: {model_size}\n")
            f.write(f"  Input Shape: {INPUT_SHAPE}\n")
            f.write(f"  ONNX Opset: {ONNX_OPSET}\n\n")

            f.write("Layer Replacement Summary:\n")
            f.write(f"  Total Layers Replaced: {replace_count}\n")
            f.write(f"  Unsupported Before: {unsupported_before or 'None'}\n")
            f.write(f"  Unsupported After: {unsupported_after or 'None'}\n\n")

            f.write("Layer Type Counts (After Patching):\n")
            # Most frequent layer types first.
            for layer_type, count in sorted(layer_counts.items(), key=lambda x: -x[1]):
                f.write(f"  {layer_type}: {count}\n")

            f.write("\nONNX Export Results:\n")

            original_onnx = output_dir / f"{model_size}_original.onnx"
            patched_onnx = output_dir / f"{model_size}_patched.onnx"

            if original_onnx.exists():
                unsupported_ops, _ = self.onnx_patcher.check_onnx_compatibility(original_onnx)
                f.write("  Original ONNX:\n")
                f.write(f"    File: {original_onnx.name}\n")
                f.write(f"    Size: {original_onnx.stat().st_size / 1024 / 1024:.2f} MB\n")
                f.write(f"    Unsupported Ops: {unsupported_ops or 'None'}\n")

            if patched_onnx.exists():
                unsupported_ops, _ = self.onnx_patcher.check_onnx_compatibility(patched_onnx)
                hard_unsupported = {op for op in unsupported_ops if op not in STRUCTURAL_ONNX_OPS}
                f.write("  Patched ONNX:\n")
                f.write(f"    File: {patched_onnx.name}\n")
                f.write(f"    Size: {patched_onnx.stat().st_size / 1024 / 1024:.2f} MB\n")
                f.write(f"    Unsupported Ops: {unsupported_ops or 'None'}\n")
                f.write(f"    Hard Unsupported (non-structural): {hard_unsupported or 'None'}\n")
                f.write(f"    DPU Compatible: {'Yes' if not hard_unsupported else 'No'}\n")

            f.write("\nPerformance Considerations:\n")
            f.write(f"  - Model size {model_size} is {'optimal' if model_size in ['yolo11n', 'yolo11s'] else 'larger'} for DPU\n")
            f.write("  - Smaller models (n, s) typically achieve better DPU utilization\n")
            f.write("  - Larger models may require more CPU fallback operations\n")

        logger.info(f"📄 Compatibility report saved to {report_path}")

def main():
    """Command-line entry point: download the dataset, train each model, report.

    Exits with status 1 when the dataset download fails or when at least one
    requested model fails, so a caller checking the return code is not told
    that everything succeeded. The summary is always logged first.
    """
    parser = argparse.ArgumentParser(
        description="Train YOLO11 models optimized for Xilinx ZCU104 DPU deployment"
    )
    parser.add_argument(
        "--models",
        nargs="+",
        choices=list(MODEL_CONFIGS.keys()),
        default=["yolo11n"],
        help="List of YOLO11 model sizes to train (default: yolo11n)"
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=DEFAULT_EPOCHS,
        help=f"Number of training epochs (default: {DEFAULT_EPOCHS})"
    )
    parser.add_argument(
        "--batch",
        type=int,
        default=DEFAULT_BATCH,
        help=f"Batch size for training (default: {DEFAULT_BATCH})"
    )
    parser.add_argument(
        "--imgsz",
        type=int,
        default=DEFAULT_IMGSZ,
        help=f"Image size for training (default: {DEFAULT_IMGSZ})"
    )
    # SECURITY NOTE(review): the literal fallback is a credential committed to
    # source. It is kept only for backward compatibility; prefer exporting
    # ROBOFLOW_API_KEY (or passing --api-key) and rotate the embedded key.
    parser.add_argument(
        "--api-key",
        type=str,
        default=os.environ.get("ROBOFLOW_API_KEY", "fN2UlBariD5qcspFvWp9"),
        help="Roboflow API key (defaults to $ROBOFLOW_API_KEY when set)"
    )
    parser.add_argument(
        "--skip-download",
        action="store_true",
        help="Skip dataset download if already present"
    )

    args = parser.parse_args()

    trainer = YOLODPUTrainer(api_key=args.api_key)

    if not args.skip_download:
        if not trainer.download_dataset():
            logger.error("Failed to download dataset. Exiting.")
            sys.exit(1)
    else:
        from types import SimpleNamespace

        # Stand-in for the downloaded dataset object: the trainer only reads
        # its `location` attribute.
        dataset_dir = 'basketball-hoop-ball-and-player-exknu-1'
        trainer.dataset = SimpleNamespace(location=dataset_dir)
        logger.info("Skipping dataset download, using existing dataset")
        if not (Path(dataset_dir) / "data.yaml").exists():
            logger.warning(f"Expected dataset config not found: {dataset_dir}/data.yaml")

    successful_models = []
    failed_models = []

    for model_size in args.models:
        logger.info(f"\n{'#'*60}")
        logger.info(f"Processing model: {model_size}")
        logger.info(f"{'#'*60}")

        result = trainer.train_single_model(
            model_size=model_size,
            epochs=args.epochs,
            batch=args.batch,
            imgsz=args.imgsz
        )

        if result:
            successful_models.append(model_size)
        else:
            failed_models.append(model_size)

    logger.info("\n" + "="*60)
    logger.info("TRAINING SUMMARY")
    logger.info("="*60)
    logger.info(f"✅ Successfully trained: {successful_models}")
    if failed_models:
        logger.info(f"❌ Failed: {failed_models}")

    logger.info("\nOutput structure:")
    for model_size in successful_models:
        output_dir = Path(f"outputs/{model_size}_dpu")
        if output_dir.exists():
            logger.info(f"\n{output_dir}/")
            for file in output_dir.iterdir():
                logger.info(f"  ├── {file.name}")

    logger.info("\n✅ Training pipeline complete!")

    # Report partial failure through the exit status as well as the log.
    if failed_models:
        sys.exit(1)

if __name__ == "__main__":
    main()
'''

# Write the training script to file. UTF-8 is requested explicitly because the
# script text contains emoji, which a non-UTF-8 platform default encoding
# cannot represent (open() would raise UnicodeEncodeError there).
with open('train_yolo_dpu.py', 'w', encoding='utf-8') as f:
    f.write(training_script)

print("✅ Training script created: train_yolo_dpu.py")


✅ Training script created: train_yolo_dpu.py


In [None]:
# ============================================
# CELL 3: Run the Training Command (streamed)
# ============================================
import os, sys, subprocess

# Single source of truth for this run: the banner and the command line below
# are both built from these values, so they cannot drift apart.
MODELS = ["yolo11n", "yolo11s", "yolo11m"]
EPOCHS = 50
BATCH_SIZE = 2

print("🚀 Starting YOLO11 DPU Training Pipeline")
print("=" * 60)
print(f"Training {len(MODELS)} models: {', '.join(MODELS)}")
print(f"Configuration: {EPOCHS} epochs, batch size {BATCH_SIZE}")
print("=" * 60)

cmd = [
    sys.executable, "-u", "train_yolo_dpu.py",
    "--models", *MODELS,
    "--epochs", str(EPOCHS), "--batch", str(BATCH_SIZE),
]

env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env["TERM"] = "xterm"  # helps tqdm formatting
# The child prints emoji and progress-bar glyphs: make it emit UTF-8 and decode
# its output as UTF-8 here, instead of relying on the locale on either side.
env["PYTHONIOENCODING"] = "utf-8"

with subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # interleave stderr with stdout in one stream
    text=True,
    encoding="utf-8",
    errors="replace",          # never abort streaming on an undecodable byte
    bufsize=1,                 # line-buffered so output appears as it is produced
    env=env,
) as p:
    for line in p.stdout:
        print(line, end="")
    rc = p.wait()

print("\n✅ Training completed successfully!" if rc == 0 else f"\n⚠️ Training completed with return code: {rc}")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
      44/50      2.03G      1.059     0.6299      1.165         31        640:   7%|▋         | 30/452 [00:02<00:38, 11.07it/s]
      44/50      2.03G      1.059     0.6299      1.165         31        640:   7%|▋         | 32/452 [00:02<00:38, 11.01it/s]
      44/50      2.03G      1.062     0.6391      1.172         33        640:   7%|▋         | 32/452 [00:02<00:38, 11.01it/s]
      44/50      2.03G      1.058     0.6363      1.173         34        640:   7%|▋         | 32/452 [00:03<00:38, 11.01it/s]
      44/50      2.03G      1.058     0.6363      1.173         34        640:   8%|▊         | 34/452 [00:03<00:38, 10.87it/s]
      44/50      2.03G      1.057     0.6358      1.174         31        640:   8%|▊         | 34/452 [00:03<00:38, 10.87it/s]
      44/50      2.03G      1.052     0.6324       1.17         30        640:   8%|▊         | 34/452 [00:03<00:38, 10.87it/s]
      44/50      2.03G      1.052     0

In [None]:
# ============================================
# CELL 4: Create ZIP Archive of Results
# ============================================
import zipfile
import os
from pathlib import Path

print("\n📦 Creating ZIP archive of results...")

zip_filename = "yolo11_dpu_models.zip"

try:
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Add every file below outputs/, in a stable (sorted) order
        outputs_dir = Path("outputs")
        if outputs_dir.exists():
            for file_path in sorted(p for p in outputs_dir.rglob("*") if p.is_file()):
                arcname = os.path.relpath(file_path, start=".")
                zipf.write(file_path, arcname)
                print(f"  Added: {arcname}")

        # Add training log
        if Path("yolo_dpu_training.log").exists():
            zipf.write("yolo_dpu_training.log")
            print(f"  Added: yolo_dpu_training.log")

    print(f"✅ Archive created: {zip_filename}")
    print(f"   Size: {os.path.getsize(zip_filename) / 1024 / 1024:.2f} MB")

except Exception as e:
    print(f"❌ Failed to create archive: {e}")
    # Do not leave a truncated archive behind: the download step only checks
    # that the file exists and would otherwise hand out a corrupt ZIP.
    try:
        if os.path.exists(zip_filename):
            os.remove(zip_filename)
    except OSError as cleanup_error:
        print(f"   Could not remove partial archive: {cleanup_error}")


📦 Creating ZIP archive of results...
  Added: outputs/yolo11s_dpu/compatibility_report.txt
  Added: outputs/yolo11s_dpu/yolo11s_original.onnx
  Added: outputs/yolo11s_dpu/best.pt
  Added: outputs/yolo11s_dpu/yolo11s_patched.onnx
  Added: outputs/yolo11n_dpu/compatibility_report.txt
  Added: outputs/yolo11n_dpu/best.pt
  Added: outputs/yolo11n_dpu/yolo11n_patched.onnx
  Added: outputs/yolo11n_dpu/yolo11n_original.onnx
  Added: outputs/yolo11m_dpu/yolo11m_original.onnx
  Added: outputs/yolo11m_dpu/compatibility_report.txt
  Added: outputs/yolo11m_dpu/yolo11m_patched.onnx
  Added: outputs/yolo11m_dpu/best.pt
  Added: yolo_dpu_training.log
✅ Archive created: yolo11_dpu_models.zip
   Size: 269.10 MB


In [None]:
# ============================================
# CELL 5: Download Results (Google Colab Only)
# ============================================
# Imported before the try block: the non-Colab branch below needs `os` even
# when the google.colab import fails.
import os
from pathlib import Path

# Only the optional import is guarded, so an ImportError raised by anything
# else is not mistaken for "not running in Colab".
try:
    from google.colab import files
except ImportError:
    files = None

# Reuse the archive name set by the ZIP cell; fall back to the same default so
# this cell also works when it is run on its own.
zip_filename = globals().get("zip_filename", "yolo11_dpu_models.zip")

if files is None:
    print("\n📥 Not running in Google Colab - files saved locally")
    print(f"   Archive location: {os.path.abspath(zip_filename)}")
else:
    print("\n📥 Preparing files for download...")

    if os.path.exists(zip_filename):
        print(f"Archive size: {os.path.getsize(zip_filename) / 1024 / 1024:.2f} MB")
        print("\nDownloading archive...")
        files.download(zip_filename)
        print("✅ Download started! Check your browser downloads.")
    else:
        print("❌ ZIP archive not found. Checking individual outputs...")

        # No archive: offer each model artifact individually instead.
        outputs_dir = Path("outputs")
        if outputs_dir.exists():
            for model_dir in outputs_dir.iterdir():
                if model_dir.is_dir():
                    print(f"\nFound model outputs: {model_dir.name}")
                    for file in model_dir.iterdir():
                        if file.suffix in ['.pt', '.onnx', '.txt']:
                            print(f"  Downloading: {file.name}")
                            files.download(str(file))


📥 Preparing files for download...
Archive size: 269.10 MB

Downloading archive...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Download started! Check your browser downloads.


In [None]:
# ============================================
# CELL 6: Verify Results and Display Summary
# ============================================
from pathlib import Path

BANNER = "=" * 60

print("\n" + BANNER)
print("📊 FINAL TRAINING SUMMARY")
print(BANNER)

outputs_dir = Path("outputs")
model_summary = []


def _size_mb(path):
    """Return the size of the file at `path` in megabytes (bytes / 1024 / 1024)."""
    return path.stat().st_size / 1024 / 1024


if not outputs_dir.exists():
    print("❌ No outputs directory found!")
else:
    for model_dir in sorted(outputs_dir.iterdir()):
        if not model_dir.is_dir():
            continue

        print(f"\n📁 {model_dir.name}/")

        # One entry is appended per artifact kind found in this directory.
        files_found = []
        dpu_compatible = False

        best_pt = model_dir / "best.pt"
        if best_pt.exists():
            print(f"  ✅ best.pt ({_size_mb(best_pt):.2f} MB)")
            files_found.append("weights")

        # Only the first match of each ONNX pattern is reported.
        for artifact, pattern in (("original_onnx", "*_original.onnx"),
                                  ("patched_onnx", "*_patched.onnx")):
            matches = list(model_dir.glob(pattern))
            if matches:
                print(f"  ✅ {matches[0].name} ({_size_mb(matches[0]):.2f} MB)")
                files_found.append(artifact)

        report = model_dir / "compatibility_report.txt"
        if report.exists():
            print("  ✅ compatibility_report.txt")
            files_found.append("report")

            # Extract DPU compatibility status from the report text
            with open(report, 'r') as f:
                content = f.read()
            dpu_compatible = "DPU Compatible: Yes" in content
            if dpu_compatible:
                print("  📌 DPU Compatible: ✅ YES")
            else:
                print("  📌 DPU Compatible: ⚠️ NO (may need additional optimization)")

        model_summary.append({
            'model': model_dir.name.replace("_dpu", ""),
            'complete': len(files_found) == 4,   # weights + 2 ONNX files + report
            'dpu_compatible': dpu_compatible
        })

# Final Summary
print("\n" + BANNER)
print("🎯 DEPLOYMENT READINESS")
print(BANNER)

if not model_summary:
    print("No models were successfully processed.")
else:
    complete_models = [m for m in model_summary if m['complete']]
    ready_models = [m for m in complete_models if m['dpu_compatible']]
    partial_models = [m for m in complete_models if not m['dpu_compatible']]
    incomplete_models = [m for m in model_summary if not m['complete']]

    for heading, group in (
        ("\n✅ Ready for DPU deployment:", ready_models),
        ("\n⚠️ Trained but may need optimization:", partial_models),
        ("\n❌ Incomplete training:", incomplete_models),
    ):
        if group:
            print(heading)
            for entry in group:
                print(f"   - {entry['model']}")

    print(f"\nTotal models processed: {len(model_summary)}")
    print(f"DPU-ready models: {len(ready_models)}")

print("\n" + BANNER)
print("✅ Pipeline execution complete!")
print(BANNER)
print("\nNext steps:")
for step in (
    "1. Download the yolo11_dpu_models.zip archive",
    "2. Extract and review the compatibility reports",
    "3. Use the patched ONNX models for Vitis AI quantization",
    "4. Deploy optimized models to Xilinx ZCU104",
):
    print(step)
print(BANNER)


📊 FINAL TRAINING SUMMARY

📁 yolo11m_dpu/
  ✅ best.pt (38.62 MB)
  ✅ yolo11m_original.onnx (76.67 MB)
  ✅ yolo11m_patched.onnx (76.67 MB)
  ✅ compatibility_report.txt
  📌 DPU Compatible: ⚠️ NO (may need additional optimization)

📁 yolo11n_dpu/
  ✅ best.pt (15.32 MB)
  ✅ yolo11n_original.onnx (10.08 MB)
  ✅ yolo11n_patched.onnx (10.08 MB)
  ✅ compatibility_report.txt
  📌 DPU Compatible: ⚠️ NO (may need additional optimization)

📁 yolo11s_dpu/
  ✅ best.pt (18.27 MB)
  ✅ yolo11s_original.onnx (36.14 MB)
  ✅ yolo11s_patched.onnx (36.14 MB)
  ✅ compatibility_report.txt
  📌 DPU Compatible: ⚠️ NO (may need additional optimization)

🎯 DEPLOYMENT READINESS

⚠️ Trained but may need optimization:
   - yolo11m
   - yolo11n
   - yolo11s

Total models processed: 3
DPU-ready models: 0

✅ Pipeline execution complete!

Next steps:
1. Download the yolo11_dpu_models.zip archive
2. Extract and review the compatibility reports
3. Use the patched ONNX models for Vitis AI quantization
4. Deploy optimized mod