# AMD AI Compute Observatory - Custom Collectors

This notebook demonstrates how to create custom data collectors for specialized profiling needs.

© 2026 Sudheer Ibrahim Daniel Devu. All Rights Reserved.

## Setup

In [None]:
# Setup cell: imports shared by every cell below.
import aaco
# NOTE: a later cell defines a local class that is also named BaseCollector,
# which rebinds this imported name for the rest of the notebook.
from aaco.collectors import BaseCollector
import time  # timestamps and sleeps in the collectors and demos below
import numpy as np  # summary statistics in CPUUsageCollector.get_results

# Show which AACO build the notebook is running against.
print(f"AACO version: {aaco.__version__}")

## The BaseCollector Interface

All collectors in AACO inherit from `BaseCollector` and implement these methods:

- `start()` - Begin data collection
- `stop()` - End data collection
- `reset()` - Clear collected data
- `get_results()` - Return collected data

In [None]:
# View the BaseCollector interface
from abc import ABC, abstractmethod

class BaseCollector(ABC):
    """Contract that every data collector has to fulfil.

    The four lifecycle hooks below are abstract, so a subclass can only be
    instantiated once it overrides all of them.
    """

    def __init__(self) -> None:
        # Starts out as an empty dict; nothing in this class reads it.
        self._data = {}

    @abstractmethod
    def start(self) -> None:
        """Begin a collection run."""

    @abstractmethod
    def stop(self) -> None:
        """End the current collection run."""

    @abstractmethod
    def reset(self) -> None:
        """Discard everything collected so far."""

    @abstractmethod
    def get_results(self) -> dict:
        """Return the collected data as a dict."""

## Creating a Custom Collector

Let's create a collector that samples CPU and memory usage in a background thread during profiling.

In [None]:
import psutil
import threading

class CPUUsageCollector(BaseCollector):
    """Collector that samples CPU and memory utilisation on a daemon thread.

    Between ``start()`` and ``stop()`` the thread records one sample after
    every ``interval`` seconds, plus one final sample when ``stop()`` is
    called. Each sample is a dict with the keys ``timestamp``
    (``time.time()``), ``cpu_percent``, ``cpu_count`` and ``memory_percent``
    as reported by psutil.
    """

    def __init__(self, interval: float = 0.1):
        """
        Initialize the CPU usage collector.

        Args:
            interval: Sampling interval in seconds; must not be negative.

        Raises:
            ValueError: If ``interval`` is negative.
        """
        if interval < 0:
            raise ValueError(f"interval must not be negative, got {interval!r}")
        super().__init__()
        self.interval = interval
        self._samples = []
        self._running = False
        self._thread = None
        # Signals the sampler thread to finish; replaced on every start().
        self._stop_event = threading.Event()

    def _sample(self, stop_event=None):
        """Body of the sampler thread.

        Args:
            stop_event: Event that ends the loop when set. Defaults to the
                collector's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        # Looked up once per run instead of once per sample.
        cpu_count = psutil.cpu_count()

        # psutil documents that the first non-blocking cpu_percent() call
        # returns a meaningless 0.0. Prime it here, in the sampling thread,
        # so that every recorded value covers the time since the previous
        # call made by this thread.
        psutil.cpu_percent(interval=None)

        stopping = False
        while not stopping:
            # Event.wait() doubles as an interruptible sleep: it returns True
            # as soon as stop() sets the event. A sample is still taken in
            # that case so that the last partial interval is recorded.
            stopping = stop_event.wait(self.interval)
            self._samples.append({
                "timestamp": time.time(),
                "cpu_percent": psutil.cpu_percent(interval=None),
                "cpu_count": cpu_count,
                "memory_percent": psutil.virtual_memory().percent,
            })

    def start(self):
        """Start collecting CPU metrics.

        Calling ``start()`` while a sampler thread is already running is a
        no-op, so no second thread is spawned.
        """
        if self._running and self._thread is not None and self._thread.is_alive():
            return
        # A fresh event per run: a thread from an earlier run keeps its own
        # (already set) event and cannot be revived by this start().
        self._stop_event = threading.Event()
        self._running = True
        self._thread = threading.Thread(
            target=self._sample,
            args=(self._stop_event,),
            daemon=True,
        )
        self._thread.start()

    def stop(self):
        """Stop collecting CPU metrics and wait up to 1 s for the thread."""
        self._running = False
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=1.0)

    def reset(self):
        """Clear all samples."""
        self._samples = []

    def get_results(self) -> dict:
        """Return collected CPU metrics.

        Returns:
            An empty dict when no sample has been recorded. Otherwise a dict
            with ``sample_count``, ``cpu_mean``, ``cpu_max``, ``cpu_min``,
            ``memory_mean`` (floats, in the percent units psutil reports)
            and ``samples`` (a copy of the list of raw sample dicts).
        """
        # One snapshot, so that every statistic describes the same samples
        # even if the sampler thread appends while this method runs.
        samples = list(self._samples)
        if not samples:
            return {}

        cpu_values = [s["cpu_percent"] for s in samples]
        mem_values = [s["memory_percent"] for s in samples]

        return {
            "sample_count": len(samples),
            "cpu_mean": float(np.mean(cpu_values)),
            "cpu_max": float(np.max(cpu_values)),
            "cpu_min": float(np.min(cpu_values)),
            "memory_mean": float(np.mean(mem_values)),
            "samples": samples,
        }

print("CPUUsageCollector defined!")

## Using the Custom Collector

In [None]:
# Create collector instance
cpu_collector = CPUUsageCollector(interval=0.05)  # Sample every 50ms

import math

# Start collection; the finally block guarantees the sampler thread is
# stopped even if the workload raises or the cell is interrupted.
cpu_collector.start()
try:
    # Simulate some CPU workload
    # NOTE(review): this loop may finish in less than one 50 ms sampling
    # interval, in which case very few samples are recorded; confirm on the
    # target machine or use a longer workload.
    for i in range(100000):
        _ = math.sin(i) * math.cos(i)
finally:
    # Stop collection
    cpu_collector.stop()

# Get results; get_results() returns an empty dict when nothing was sampled,
# so guard the key lookups instead of raising KeyError.
results = cpu_collector.get_results()
if results:
    print(f"Samples collected: {results['sample_count']}")
    print(f"CPU Usage - Mean: {results['cpu_mean']:.1f}%, Max: {results['cpu_max']:.1f}%, Min: {results['cpu_min']:.1f}%")
    print(f"Memory Usage - Mean: {results['memory_mean']:.1f}%")
else:
    print("No samples were collected.")

## Network I/O Collector Example

In [None]:
class NetworkIOCollector(BaseCollector):
    """Collector that reports network traffic between ``start()`` and ``stop()``.

    The system-wide psutil network counters are read once in ``start()`` and
    once in ``stop()``; ``get_results()`` reports the difference between the
    two readings.
    """

    # Counter attributes copied from psutil.net_io_counters() into a snapshot.
    _COUNTER_FIELDS = ("bytes_sent", "bytes_recv", "packets_sent", "packets_recv")

    def __init__(self):
        super().__init__()
        self._start_counters = None
        self._end_counters = None

    def _snapshot(self):
        """Read the current counters.

        Returns:
            A dict holding the counter fields, a wall-clock ``timestamp`` and
            a ``perf_counter`` reading used for duration arithmetic, or
            ``None`` when psutil reports no counters.
        """
        counters = psutil.net_io_counters()
        if counters is None:
            # psutil returns None here when it finds no network interfaces.
            return None
        snapshot = {field: getattr(counters, field) for field in self._COUNTER_FIELDS}
        snapshot["timestamp"] = time.time()
        # Durations are measured with perf_counter so that wall-clock
        # adjustments cannot make them negative or inflated.
        snapshot["perf_counter"] = time.perf_counter()
        return snapshot

    def start(self):
        """Record starting network counters."""
        self._start_counters = self._snapshot()
        # Drop the end snapshot of any previous run; otherwise get_results()
        # called before the next stop() would pair the new start with a stale
        # end and report negative values.
        self._end_counters = None

    def stop(self):
        """Record ending network counters."""
        self._end_counters = self._snapshot()

    def reset(self):
        """Clear counters."""
        self._start_counters = None
        self._end_counters = None

    def get_results(self) -> dict:
        """Return network I/O metrics.

        Returns:
            An empty dict unless both ``start()`` and ``stop()`` recorded a
            snapshot. Otherwise a dict with ``duration_s``, the byte and
            packet deltas, and ``send_rate_mbps`` / ``recv_rate_mbps``
            (megabits per second; 0 when the duration is not positive).
        """
        begin, end = self._start_counters, self._end_counters
        if begin is None or end is None:
            return {}

        duration = end["perf_counter"] - begin["perf_counter"]
        deltas = {field: end[field] - begin[field] for field in self._COUNTER_FIELDS}

        def rate_mbps(byte_count):
            # bytes -> bits -> megabits per second
            return byte_count * 8 / duration / 1e6 if duration > 0 else 0

        return {
            "duration_s": duration,
            "bytes_sent": deltas["bytes_sent"],
            "bytes_recv": deltas["bytes_recv"],
            "packets_sent": deltas["packets_sent"],
            "packets_recv": deltas["packets_recv"],
            "send_rate_mbps": rate_mbps(deltas["bytes_sent"]),
            "recv_rate_mbps": rate_mbps(deltas["bytes_recv"]),
        }

print("NetworkIOCollector defined!")

## Combining Multiple Collectors

In [None]:
class CompositeCollector(BaseCollector):
    """Collector that drives several named collectors as one unit."""

    def __init__(self, collectors: dict):
        """
        Initialize with named collectors.

        Args:
            collectors: Dict of name -> collector instance
        """
        super().__init__()
        self.collectors = collectors

    def start(self):
        """Start all collectors.

        If one collector fails to start, the collectors that were already
        started are stopped again before the error is re-raised, so no
        collector is left running.
        """
        started = []
        try:
            for collector in self.collectors.values():
                collector.start()
                started.append(collector)
        except BaseException:
            for collector in reversed(started):
                try:
                    collector.stop()
                except Exception:
                    # Best-effort rollback: the start failure re-raised
                    # below is the error the caller needs to see.
                    pass
            raise

    def stop(self):
        """Stop all collectors.

        Every collector is asked to stop even if an earlier one fails.

        Raises:
            Exception: The first error raised by a child's ``stop()``,
                re-raised after all children have been processed.
        """
        first_error = None
        for collector in self.collectors.values():
            try:
                collector.stop()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    def reset(self):
        """Reset all collectors."""
        for collector in self.collectors.values():
            collector.reset()

    def get_results(self) -> dict:
        """Return combined results from all collectors.

        Returns:
            Dict mapping each collector's name to its ``get_results()`` value.
        """
        return {
            name: collector.get_results()
            for name, collector in self.collectors.items()
        }


# Usage example: profile a short workload with CPU and network collectors.
composite = CompositeCollector({
    "cpu": CPUUsageCollector(interval=0.1),
    "network": NetworkIOCollector(),
})

composite.start()
try:
    time.sleep(0.5)  # Simulate workload
finally:
    # Always stop, so no collector keeps running if the workload raises or
    # the cell is interrupted.
    composite.stop()

results = composite.get_results()
print("Combined Results:")
for name, data in results.items():
    print(f"\n{name}:")
    for key, value in data.items():
        if key != "samples":  # Skip large sample arrays
            print(f"  {key}: {value}")

## Registering Custom Collectors with AACO

In [None]:
# Illustrative only: the calls below are commented out and are not executed
# by this cell.
#
# Register custom collector with AACO
# aaco.collectors.register('cpu_usage', CPUUsageCollector)
# aaco.collectors.register('network_io', NetworkIOCollector)

# Then use in profiling
# NOTE(review): `obs` is not defined anywhere in this notebook; presumably it
# is an AACO observatory/session object created beforehand - confirm.
# session = obs.profile(
#     model="model.onnx",
#     collectors=["timing", "memory", "cpu_usage", "network_io"]
# )

print("Custom collectors can be registered with AACO for use in profiling sessions.")

## Best Practices for Custom Collectors

1. **Thread Safety**: Use proper synchronization when collecting data in background threads
2. **Low Overhead**: Minimize the performance impact of data collection
3. **Error Handling**: Gracefully handle failures without crashing the profiling session
4. **Memory Management**: Limit sample storage to prevent memory issues
5. **Documentation**: Document the metrics collected and their units

## Next Steps

- Explore [Laboratory Mode](04_laboratory_mode.ipynb) for kernel-level eBPF-based collection
- Learn about integrating collectors with CI/CD pipelines