# RF Signal Detection Project
## Spectrogram Processing with Channel/Temporal Slicing for YOLO Training

**Project Context:**
- Pre-processed spectrograms from Georgia Tech pipeline (256×256 RGB)
- Center frequency: 2.437 GHz (Wi-Fi Channel 6)
- Bandwidth: 20 MHz (captures 2.427 - 2.447 GHz)
- Time duration: 410 μs per spectrogram frame

**Target Signals:**
- Bluetooth: 2.402-2.480 GHz (~1 MHz bandwidth, narrow vertical streaks)
- Wi-Fi: 2.437 GHz Channel 6 (20 MHz bandwidth, wide rectangular blocks)
- Zigbee: 2.405-2.480 GHz (~2 MHz bandwidth, medium width bursts)
- Drone: Variable frequency and bandwidth
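
As a rough check on which narrowband channels can actually appear in a frame, the sketch below lists the Bluetooth and Zigbee channel centers that fall inside the 2.427-2.447 GHz capture window. It assumes the standard channel plans (Bluetooth BR/EDR: 79 channels at 2402 + k MHz; IEEE 802.15.4: channels 11-26 at 2405 + 5(k - 11) MHz). The constant and function names are illustrative and are not part of the pipeline.

```python
# Capture window from the project context, in integer MHz to avoid float error.
CAPTURE_LOW_MHZ = 2427
CAPTURE_HIGH_MHZ = 2447


def channels_in_capture(centers_mhz):
    """Return the channel numbers whose center frequency lies in the capture window."""
    return [ch for ch, freq in centers_mhz.items()
            if CAPTURE_LOW_MHZ <= freq <= CAPTURE_HIGH_MHZ]


# Assumed channel plans (standard values, not read from the dataset).
bluetooth_centers = {k: 2402 + k for k in range(79)}                 # BR/EDR channels 0-78
zigbee_centers = {ch: 2405 + 5 * (ch - 11) for ch in range(11, 27)}  # 802.15.4 channels 11-26

bt_visible = channels_in_capture(bluetooth_centers)
zb_visible = channels_in_capture(zigbee_centers)

print(f"Bluetooth channels in view: {bt_visible[0]}-{bt_visible[-1]} ({len(bt_visible)} of 79)")
print(f"Zigbee channels in view: {zb_visible}")
```

Channels centered on the window edges are only partly visible, so the counts above are an upper bound on fully captured channels.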

---
## 1. Environment Setup & Dependencies

In [None]:
# Install required packages
!pip install -q roboflow ultralytics opencv-python matplotlib numpy scikit-learn pyyaml joblib tqdm

print("✅ All packages installed successfully!")

In [None]:
# Import libraries
import os
import json
import shutil
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional

import cv2
import numpy as np
import matplotlib.pyplot as plt
import yaml
from tqdm import tqdm
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split

from roboflow import Roboflow
from ultralytics import YOLO
from IPython.display import display, Image as IPImage

print("✅ Libraries imported successfully!")

---
## 2. Configuration & Constants

In [None]:
# Project Configuration
@dataclass
class Config:
    """Central configuration for the RF detection pipeline"""
    
    # Hardware settings
    WORKERS: int = 8  # Number of parallel workers
    N_GPUS: int = 1  # Number of GPUs available
    
    # Signal parameters
    BASE_DURATION_US: float = 410  # Duration per spectrogram frame in microseconds
    CENTER_FREQ_GHZ: float = 2.437  # Center frequency in GHz
    BANDWIDTH_MHZ: float = 20  # Bandwidth in MHz
    
    # Bluetooth specific
    BLUETOOTH_BANDWIDTH_MHZ: float = 1  # Bluetooth channel bandwidth
    BLUETOOTH_SLOT_US: float = 625  # Bluetooth time slot duration
    
    # Processing parameters
    NUM_CHANNELS: int = 4  # Number of frequency channels for slicing
    TEMPORAL_OVERLAP: float = 0.75  # Temporal overlap for slicing, as a fraction (0-1), not a percentage
    
    # Dataset paths (will be set after download)
    DATASET_PATH: Optional[Path] = None
    OUTPUT_DIR: Optional[Path] = None
    YOLO_DIR: Optional[Path] = None
    
    # Training parameters
    TRAIN_EPOCHS: int = 100
    TRAIN_BATCH_SIZE: int = 16
    IMG_SIZE: int = 640
    PATIENCE: int = 50

# Initialize configuration
config = Config()
print("✅ Configuration initialized")
print(f"   Workers: {config.WORKERS}")
print(f"   Center Frequency: {config.CENTER_FREQ_GHZ} GHz")
print(f"   Bandwidth: {config.BANDWIDTH_MHZ} MHz")
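
The configured bandwidth and frame duration determine how much spectrum and time each pixel covers. The sketch below works that out for a 256×256 frame, assuming (as the cutoff check later in this notebook does) that frequency runs along the vertical axis and time along the horizontal axis. `IMG_PIXELS` and the variable names are illustrative.

```python
IMG_PIXELS = 256                 # spectrogram side length from the project context
BANDWIDTH_MHZ = 20.0             # same value as Config.BANDWIDTH_MHZ
BASE_DURATION_US = 410.0         # same value as Config.BASE_DURATION_US
BLUETOOTH_BANDWIDTH_MHZ = 1.0    # same value as Config.BLUETOOTH_BANDWIDTH_MHZ

# Spectrum covered by one pixel row and time covered by one pixel column.
khz_per_pixel = BANDWIDTH_MHZ * 1000 / IMG_PIXELS
us_per_pixel = BASE_DURATION_US / IMG_PIXELS

# Approximate height of a 1 MHz Bluetooth channel in pixels.
bluetooth_height_px = BLUETOOTH_BANDWIDTH_MHZ * 1000 / khz_per_pixel

print(f"Frequency resolution: {khz_per_pixel:.3f} kHz/pixel")
print(f"Time resolution: {us_per_pixel:.3f} us/pixel")
print(f"Bluetooth channel height: {bluetooth_height_px:.1f} px")
```

If the spectrograms turn out to use the opposite axis orientation, the same numbers apply with rows and columns swapped.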

---
## 3. Data Classes & Structures

In [None]:
@dataclass
class BoundingBox:
    """Represents a bounding box annotation"""
    x: float
    y: float
    width: float
    height: float
    category_id: int
    category_name: str
    confidence: float = 1.0
    
    def to_yolo_format(self, img_width: int, img_height: int) -> str:
        """Convert to YOLO format: class_id x_center y_center width height (normalized)"""
        x_center = (self.x + self.width / 2) / img_width
        y_center = (self.y + self.height / 2) / img_height
        norm_width = self.width / img_width
        norm_height = self.height / img_height
        # NOTE: assumes COCO category IDs start at 1; YOLO class IDs are 0-based
        return f"{self.category_id - 1} {x_center} {y_center} {norm_width} {norm_height}"
    
    def to_coco_format(self) -> Dict:
        """Convert to COCO format"""
        return {
            'bbox': [self.x, self.y, self.width, self.height],
            'category_id': self.category_id,
            'area': self.width * self.height
        }

@dataclass
class TimeWindow:
    """Represents a time window for a spectrogram frame"""
    frame_idx: int
    start_us: float
    end_us: float
    duration_us: float
    
    def overlaps_with(self, other: 'TimeWindow') -> bool:
        """Check if this window overlaps with another"""
        return not (self.end_us <= other.start_us or self.start_us >= other.end_us)
    
    def overlap_percent(self, other: 'TimeWindow') -> float:
        """Calculate overlap percentage with another window"""
        if not self.overlaps_with(other):
            return 0.0
        overlap_start = max(self.start_us, other.start_us)
        overlap_end = min(self.end_us, other.end_us)
        overlap_duration = overlap_end - overlap_start
        return 100.0 * overlap_duration / self.duration_us

print("✅ Data structures defined")
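
To make the COCO-to-YOLO conversion concrete, here is a minimal standalone sketch of the same arithmetic used in `to_yolo_format`, applied to a made-up box. The function `coco_to_yolo` and the box values are hypothetical and exist only for illustration.

```python
def coco_to_yolo(x, y, width, height, img_width, img_height):
    """Convert a COCO box (top-left x, y, width, height in pixels) to
    normalized YOLO values (x_center, y_center, width, height)."""
    return (
        (x + width / 2) / img_width,
        (y + height / 2) / img_height,
        width / img_width,
        height / img_height,
    )


# Hypothetical box on a 256x256 spectrogram.
yolo_box = coco_to_yolo(x=64, y=128, width=32, height=16, img_width=256, img_height=256)
print(yolo_box)  # (0.3125, 0.53125, 0.125, 0.0625)
```

All four values should fall in the range 0-1 for a box that lies inside the image; anything outside that range usually points to a mismatch between the annotation and the image size.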

---
## 4. Dataset Download & Analysis

In [None]:
def download_dataset(api_key: str) -> Tuple[Path, Dict]:
    """
    Download dataset from Roboflow and load COCO annotations
    
    Args:
        api_key: Roboflow API key
        
    Returns:
        Tuple of (dataset_path, coco_data)
    """
    print("📥 Downloading dataset from Roboflow...")
    
    rf = Roboflow(api_key=api_key)
    project = rf.workspace("intelligent-digital-communications").project("ism-band-results-dataset")
    dataset = project.version(49).download("coco")
    
    dataset_path = Path(dataset.location)
    
    # Find and load COCO annotations.
    # Heuristic: the largest JSON file is assumed to be the training split.
    annotation_files = list(dataset_path.rglob('*.json'))
    train_annotations_path = max(annotation_files, key=lambda f: f.stat().st_size)
    
    with open(train_annotations_path, 'r') as f:
        coco_data = json.load(f)
    
    print(f"✅ Dataset downloaded successfully")
    print(f"   Path: {dataset_path}")
    print(f"   Images: {len(coco_data['images'])}")
    print(f"   Annotations: {len(coco_data['annotations'])}")
    print(f"   Categories: {len(coco_data['categories'])}")
    
    # Display categories
    for cat in coco_data['categories']:
        print(f"     - {cat['name']} (ID: {cat['id']})")
    
    return dataset_path, coco_data

# Download dataset
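# Read the key from the environment so it is never committed with the notebook
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY", "YOUR_API_KEY")  # Replace with your API key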
config.DATASET_PATH, coco_data = download_dataset(ROBOFLOW_API_KEY)

# Set output directories
config.OUTPUT_DIR = config.DATASET_PATH.parent / 'dataset_optimized'
config.YOLO_DIR = config.DATASET_PATH.parent / 'yolo_final'

config.OUTPUT_DIR.mkdir(exist_ok=True)
config.YOLO_DIR.mkdir(exist_ok=True)

---
## 5. Temporal Overlap Analysis

In [None]:
def analyze_image_pair(img1_path: Path, img2_path: Path) -> Optional[Dict]:
    """
    Analyze temporal overlap between two sequential spectrogram images
    
    Args:
        img1_path: Path to first image
        img2_path: Path to second image
        
    Returns:
        Dictionary with overlap percentage and correlation, or None if images can't be loaded
    """
    img1 = cv2.imread(str(img1_path), cv2.IMREAD_GRAYSCALE)
    img2 = cv2.imread(str(img2_path), cv2.IMREAD_GRAYSCALE)
    
    if img1 is None or img2 is None:
        return None
    
    width = img1.shape[1]
    best_corr = 0
    best_overlap = 0
    
    # Test different overlap percentages
    for overlap_pct in range(0, 80, 5):
        overlap_width = int(width * overlap_pct / 100)
        if overlap_width < 10:
            continue
        
        # Compare right edge of img1 with left edge of img2
        right_edge = img1[:, -overlap_width:].flatten()
        left_edge = img2[:, :overlap_width].flatten()
        
        # Calculate correlation
        corr = np.corrcoef(right_edge, left_edge)[0, 1]
        
        if corr > best_corr:
            best_corr = corr
            best_overlap = overlap_pct
    
    return {'overlap': best_overlap, 'correlation': best_corr}

def detect_temporal_overlap(dataset_path: Path, n_workers: int = 8) -> float:
    """
    Detect temporal overlap across all sequential images in dataset
    
    Args:
        dataset_path: Path to dataset
        n_workers: Number of parallel workers
        
    Returns:
        Average overlap percentage detected
    """
    print("🔍 Analyzing temporal overlap between frames...")
    
    # Get all images sorted by name (assumes sequential naming)
    all_images = sorted(list(dataset_path.rglob('*.jpg')) + list(dataset_path.rglob('*.png')))
    
    if len(all_images) < 2:
        print("⚠️ Not enough images for overlap analysis")
        return 0.0
    
    # Create pairs of sequential images
    pairs = [(all_images[i], all_images[i+1]) for i in range(len(all_images)-1)]
    
    # Parallel analysis
    results = Parallel(n_jobs=n_workers)(
        delayed(analyze_image_pair)(p[0], p[1]) for p in tqdm(pairs, desc="Analyzing pairs")
    )
    
    # Filter out None results
    results = [r for r in results if r is not None]
    
    if not results:
        print("⚠️ No valid overlap results")
        return 0.0
    
    avg_overlap = np.mean([r['overlap'] for r in results])
    avg_correlation = np.mean([r['correlation'] for r in results])
    
    print(f"✅ Temporal overlap analysis complete")
    print(f"   Average overlap: {avg_overlap:.1f}%")
    print(f"   Average correlation: {avg_correlation:.3f}")
    
    return avg_overlap

# Detect temporal overlap
detected_overlap = detect_temporal_overlap(config.DATASET_PATH, config.WORKERS)
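
One way to sanity-check the edge-correlation approach is to run it on synthetic frames with a known overlap. The sketch below is a simplified, standalone version of the search in `analyze_image_pair` that operates on arrays instead of files; `estimate_overlap` and the random "recording" are assumptions made for this test, not part of the pipeline.

```python
import numpy as np


def estimate_overlap(img1, img2, step_pct=5, max_pct=80, min_width=10):
    """Return (overlap_pct, correlation) for the best-matching edge strips."""
    width = img1.shape[1]
    best_pct, best_corr = 0, 0.0
    for pct in range(0, max_pct, step_pct):
        strip = int(width * pct / 100)
        if strip < min_width:
            continue
        # Compare the right edge of the first frame with the left edge of the second.
        right_edge = img1[:, -strip:].ravel()
        left_edge = img2[:, :strip].ravel()
        corr = np.corrcoef(right_edge, left_edge)[0, 1]
        if corr > best_corr:
            best_pct, best_corr = pct, corr
    return best_pct, best_corr


# Synthetic recording: random noise, so only the true alignment correlates.
rng = np.random.default_rng(0)
recording = rng.random((256, 512))
frame_a = recording[:, 0:256]
frame_b = recording[:, 128:384]   # starts 128 columns later, i.e. 50% overlap

overlap_pct, corr = estimate_overlap(frame_a, frame_b)
print(f"Estimated overlap: {overlap_pct}% (correlation {corr:.3f})")
```

With these inputs the search should recover the 50% overlap. Real spectrograms are far less random than noise, so correlations at wrong offsets can be high as well, and the 5% step limits the resolution of the estimate.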

---
## 6. Sliding Window Signal Continuity Check

In [None]:
def check_bluetooth_cutoff(images: List[Path], annotations_dict: Dict, 
                          time_windows: List[TimeWindow]) -> Dict:
    """
    Check if Bluetooth signals are cut off at image boundaries using sliding window approach
    
    Args:
        images: List of image paths (assumed sequential)
        annotations_dict: Dictionary mapping image filename to list of BoundingBox objects
        time_windows: List of TimeWindow objects for each frame
        
    Returns:
        Dictionary with analysis results
    """
    print("\n🔍 Checking for Bluetooth signal cutoffs at image boundaries...")
    
    cutoff_detections = []
    edge_threshold = 0.05  # 5% from edge is considered "at boundary"
    
    for i in range(len(images) - 1):
        img1_name = images[i].name
        img2_name = images[i+1].name
        
        # Get annotations for both images
        bboxes1 = annotations_dict.get(img1_name, [])
        bboxes2 = annotations_dict.get(img2_name, [])
        
        # Load images to get dimensions
        img1 = cv2.imread(str(images[i]))
        if img1 is None:
            continue
            
        img_height, img_width = img1.shape[:2]
        
        # Check Bluetooth signals near right edge of img1
        for bbox in bboxes1:
            if 'bluetooth' not in bbox.category_name.lower():
                continue
            
            # Calculate if bbox extends to right edge
            right_edge = bbox.x + bbox.width
            distance_from_right = img_width - right_edge
            
            if distance_from_right < edge_threshold * img_width:
                # Signal extends to right boundary - check if it continues in next frame
                continues = False
                
                for bbox2 in bboxes2:
                    if 'bluetooth' not in bbox2.category_name.lower():
                        continue
                    
                    # Check if bbox2 starts near left edge
                    if bbox2.x < edge_threshold * img_width:
                        # Check vertical alignment (same frequency channel)
                        vertical_overlap = min(bbox.y + bbox.height, bbox2.y + bbox2.height) - max(bbox.y, bbox2.y)
                        if vertical_overlap > 0.5 * min(bbox.height, bbox2.height):
                            continues = True
                            break
                
                cutoff_detections.append({
                    'frame_idx': i,
                    'frame1': img1_name,
                    'frame2': img2_name,
                    'time_window': time_windows[i],
                    'bbox': bbox,
                    'continues_in_next': continues,
                    'distance_from_edge': distance_from_right
                })
    
    # Analyze results
    total_detections = len(cutoff_detections)
    continuing_signals = sum(1 for d in cutoff_detections if d['continues_in_next'])
    cut_signals = total_detections - continuing_signals
    
    print(f"\n✅ Bluetooth cutoff analysis complete:")
    print(f"   Total Bluetooth signals at boundaries: {total_detections}")
    print(f"   Signals continuing to next frame: {continuing_signals}")
    print(f"   Signals potentially cut off: {cut_signals}")
    
    if total_detections > 0:
        continuity_rate = 100.0 * continuing_signals / total_detections
        print(f"   Continuity rate: {continuity_rate:.1f}%")
    
    return {
        'detections': cutoff_detections,
        'total': total_detections,
        'continuing': continuing_signals,
        'cut_off': cut_signals
    }

print("✅ Bluetooth cutoff detection function defined")
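
The two geometric tests inside `check_bluetooth_cutoff` can be isolated and tried on small numbers. The helper names below (`touches_right_edge`, `shares_frequency_band`) and the example boxes are hypothetical; the thresholds mirror the 5% edge margin and 50% vertical-overlap rule used above.

```python
def touches_right_edge(x, width, img_width, edge_threshold=0.05):
    """True if the box ends within edge_threshold * img_width of the right border."""
    return img_width - (x + width) < edge_threshold * img_width


def shares_frequency_band(y1, h1, y2, h2, min_fraction=0.5):
    """True if two boxes overlap vertically by more than min_fraction
    of the shorter box's height."""
    overlap = min(y1 + h1, y2 + h2) - max(y1, y2)
    return overlap > min_fraction * min(h1, h2)


# On a 256 px wide frame the edge margin is 12.8 px.
print(touches_right_edge(x=230, width=20, img_width=256))  # 6 px from the edge
print(touches_right_edge(x=200, width=20, img_width=256))  # 36 px from the edge

# Two 12 px tall boxes offset by 4 px overlap by 8 px; offset by 10 px, only 2 px.
print(shares_frequency_band(100, 12, 104, 12))
print(shares_frequency_band(100, 12, 110, 12))
```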

---
## 7. Time Window Calculation

In [None]:
def calculate_time_windows(n_images: int, overlap_pct: float, 
                          base_duration_us: float = 410) -> List[TimeWindow]:
    """
    Calculate absolute time windows for each spectrogram frame
    
    Args:
        n_images: Number of images
        overlap_pct: Overlap percentage between frames
        base_duration_us: Base duration per frame in microseconds
        
    Returns:
        List of TimeWindow objects
    """
    stride_us = base_duration_us * (1 - overlap_pct / 100)
    windows = []
    
    for i in range(n_images):
        start_time = i * stride_us
        end_time = start_time + base_duration_us
        windows.append(TimeWindow(
            frame_idx=i,
            start_us=start_time,
            end_us=end_time,
            duration_us=base_duration_us
        ))
    
    return windows

# Get all images
all_images = sorted(list(config.DATASET_PATH.rglob('*.jpg')) + 
                   list(config.DATASET_PATH.rglob('*.png')))

# Calculate time windows
time_windows = calculate_time_windows(
    len(all_images), 
    detected_overlap, 
    config.BASE_DURATION_US
)

total_recording_time_ms = time_windows[-1].end_us / 1000 if time_windows else 0
stride_us = config.BASE_DURATION_US * (1 - detected_overlap/100)

print(f"\n⏱️ Temporal partitioning:")
print(f"   Total frames: {len(time_windows)}")
print(f"   Frame duration: {config.BASE_DURATION_US} μs")
print(f"   Frame stride: {stride_us:.1f} μs")
print(f"   Total recording time: {total_recording_time_ms:.2f} ms")
print(f"   Bluetooth time slots per frame: {config.BASE_DURATION_US / config.BLUETOOTH_SLOT_US:.2f}")
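
The stride arithmetic is easy to verify by hand. The sketch below uses an illustrative 75% overlap and a hypothetical count of 100 frames; neither is a measured result. Note that `calculate_time_windows` expects the overlap as a percentage, whereas `Config.TEMPORAL_OVERLAP` stores a fraction.

```python
BASE_DURATION_US = 410.0     # same value as Config.BASE_DURATION_US
BLUETOOTH_SLOT_US = 625.0    # same value as Config.BLUETOOTH_SLOT_US
overlap_pct = 75.0           # illustrative value, not a measured result
n_frames = 100               # hypothetical frame count

# Each new frame starts one stride after the previous one.
stride_us = BASE_DURATION_US * (1 - overlap_pct / 100)

# The last frame starts at (n_frames - 1) strides and lasts one full frame.
total_us = (n_frames - 1) * stride_us + BASE_DURATION_US

# Fraction of one Bluetooth slot that fits in a single frame.
slot_fraction = BASE_DURATION_US / BLUETOOTH_SLOT_US

print(f"Stride: {stride_us} us")                # 102.5 us
print(f"Total duration: {total_us / 1000} ms")  # 10.5575 ms
print(f"Slot fraction per frame: {slot_fraction:.3f}")
```

Because a frame is shorter than one 625 μs slot, a transmission that fills a slot cannot fit in a single frame, which is why the boundary check in the previous section matters.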

---
## 8. Load and Process Annotations

In [None]:
def load_annotations_for_image(img_info: Dict, coco_data: Dict) -> List[BoundingBox]:
    """
    Load and convert COCO annotations to BoundingBox objects
    
    Args:
        img_info: Image information from COCO
        coco_data: Complete COCO dataset
        
    Returns:
        List of BoundingBox objects
    """
    img_id = img_info['id']
    cat_id_to_name = {cat['id']: cat['name'] for cat in coco_data['categories']}
    
    img_anns = [ann for ann in coco_data['annotations'] if ann['image_id'] == img_id]
    
    bboxes = []
    for ann in img_anns:
        x, y, w, h = ann['bbox']
        bboxes.append(BoundingBox(
            x=x, y=y, width=w, height=h,
            category_id=ann['category_id'],
            category_name=cat_id_to_name[ann['category_id']]
        ))
    
    return bboxes

# Create image path mapping
print("\n📋 Loading annotations...")
img_name_to_path = {img.name: img for img in all_images}
for img in all_images:
    img_name_to_path[img.stem] = img

# Create annotations dictionary
annotations_dict = {}
for img_info in tqdm(coco_data['images'], desc="Processing annotations"):
    img_name = img_info['file_name']
    annotations_dict[img_name] = load_annotations_for_image(img_info, coco_data)

total_annotations = sum(len(bboxes) for bboxes in annotations_dict.values())
print(f"✅ Annotations loaded: {total_annotations} bounding boxes across {len(annotations_dict)} images")
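
`load_annotations_for_image` scans the full annotation list once per image, which can get slow on large datasets. One possible alternative, sketched here on a toy COCO dictionary, is to group annotations by `image_id` in a single pass. The function name and the toy data are hypothetical.

```python
from collections import defaultdict


def group_annotations_by_image(coco):
    """Map image_id -> list of annotation dicts in one pass over the annotations."""
    by_image = defaultdict(list)
    for ann in coco["annotations"]:
        by_image[ann["image_id"]].append(ann)
    return by_image


# Toy COCO-style data for illustration only.
toy_coco = {
    "images": [
        {"id": 0, "file_name": "frame_000.png"},
        {"id": 1, "file_name": "frame_001.png"},
    ],
    "annotations": [
        {"image_id": 0, "category_id": 1, "bbox": [10, 20, 30, 12]},
        {"image_id": 0, "category_id": 2, "bbox": [0, 100, 256, 40]},
        {"image_id": 1, "category_id": 1, "bbox": [200, 20, 56, 12]},
    ],
}

grouped = group_annotations_by_image(toy_coco)
counts = {img["file_name"]: len(grouped[img["id"]]) for img in toy_coco["images"]}
print(counts)  # {'frame_000.png': 2, 'frame_001.png': 1}
```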

---
## 9. Run Bluetooth Cutoff Analysis

In [None]:
# Run sliding window analysis for Bluetooth cutoffs
cutoff_analysis = check_bluetooth_cutoff(all_images, annotations_dict, time_windows)

# Display sample cutoff detections
if cutoff_analysis['detections']:
    print("\n📊 Sample cutoff detections:")
    for i, detection in enumerate(cutoff_analysis['detections'][:5]):
        print(f"\n  Detection {i+1}:")
        print(f"    Frame: {detection['frame1']} → {detection['frame2']}")
        print(f"    Time: {detection['time_window'].start_us:.1f} - {detection['time_window'].end_us:.1f} μs")
        print(f"    Continues: {'Yes ✓' if detection['continues_in_next'] else 'No ✗'}")
        print(f"    Distance from edge: {detection['distance_from_edge']:.1f} px")

---
## 10. Visualization

In [None]:
def visualize_cutoff_detection(detection: Dict, output_path: Optional[Path] = None):
    """
    Visualize a Bluetooth cutoff detection across two frames
    
    Args:
        detection: Detection dictionary from cutoff analysis
        output_path: Optional path to save visualization
    """
    img1_path = img_name_to_path.get(detection['frame1'])
    img2_path = img_name_to_path.get(detection['frame2'])
    
    if img1_path is None or img2_path is None:
        print("⚠️ Could not find image paths")
        return
    
    img1 = cv2.imread(str(img1_path))
    img2 = cv2.imread(str(img2_path))
    
    if img1 is None or img2 is None:
        print("⚠️ Could not load images")
        return
    
    img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2RGB)
    img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2RGB)
    
    # Draw bounding box on img1
    bbox = detection['bbox']
    color = (0, 255, 0) if detection['continues_in_next'] else (255, 0, 0)
    cv2.rectangle(img1, 
                 (int(bbox.x), int(bbox.y)), 
                 (int(bbox.x + bbox.width), int(bbox.y + bbox.height)), 
                 color, 2)
    
    # Create side-by-side visualization
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    
    axes[0].imshow(img1)
    axes[0].set_title(f"Frame {detection['frame_idx']}: {detection['frame1']}\n"
                     f"Signal at right edge")
    axes[0].axis('off')
    
    axes[1].imshow(img2)
    axes[1].set_title(f"Frame {detection['frame_idx']+1}: {detection['frame2']}\n"
                     f"Continues: {'Yes' if detection['continues_in_next'] else 'No'}")
    axes[1].axis('off')
    
    status = "Continuous" if detection['continues_in_next'] else "CUT OFF"
    color_text = 'green' if detection['continues_in_next'] else 'red'
    fig.suptitle(f"Bluetooth Signal Analysis: {status}", fontsize=14, color=color_text, weight='bold')
    
    plt.tight_layout()
    
    if output_path:
        plt.savefig(output_path, dpi=150, bbox_inches='tight')
        print(f"✅ Saved visualization to {output_path}")
    
    plt.show()

# Visualize first cutoff detection if available
if cutoff_analysis['detections']:
    print("\n🖼️ Visualizing first cutoff detection...")
    visualize_cutoff_detection(cutoff_analysis['detections'][0])

---
## 11. Summary Statistics

In [None]:
def generate_summary_report():
    """
    Generate comprehensive summary report
    """
    print("\n" + "="*70)
    print("📊 RF SIGNAL DETECTION - SUMMARY REPORT")
    print("="*70)
    
    print("\n📁 DATASET INFORMATION:")
    print(f"   Location: {config.DATASET_PATH}")
    print(f"   Total images: {len(all_images)}")
    print(f"   Total annotations: {total_annotations}")
    
    # Count by category
    category_counts = {}
    for bboxes in annotations_dict.values():
        for bbox in bboxes:
            category_counts[bbox.category_name] = category_counts.get(bbox.category_name, 0) + 1
    
    print("\n   Annotations by category:")
    for cat_name, count in sorted(category_counts.items()):
        print(f"     - {cat_name}: {count}")
    
    print("\n⏱️ TEMPORAL ANALYSIS:")
    print(f"   Detected temporal overlap: {detected_overlap:.1f}%")
    print(f"   Frame duration: {config.BASE_DURATION_US} μs")
    print(f"   Frame stride: {stride_us:.1f} μs")
    print(f"   Total recording duration: {total_recording_time_ms:.2f} ms")
    
    print("\n🎯 BLUETOOTH CUTOFF ANALYSIS:")
    print(f"   Signals at boundaries: {cutoff_analysis['total']}")
    print(f"   Continuous signals: {cutoff_analysis['continuing']}")
    print(f"   Potentially cut signals: {cutoff_analysis['cut_off']}")
    if cutoff_analysis['total'] > 0:
        continuity = 100.0 * cutoff_analysis['continuing'] / cutoff_analysis['total']
        print(f"   Continuity rate: {continuity:.1f}%")
    
    print("\n📡 SIGNAL CHARACTERISTICS:")
    print(f"   Center frequency: {config.CENTER_FREQ_GHZ} GHz")
    print(f"   Bandwidth: {config.BANDWIDTH_MHZ} MHz")
    print(f"   Frequency range: {config.CENTER_FREQ_GHZ - config.BANDWIDTH_MHZ/2000:.3f} - "
          f"{config.CENTER_FREQ_GHZ + config.BANDWIDTH_MHZ/2000:.3f} GHz")
    print(f"   Bluetooth slots per frame: {config.BASE_DURATION_US / config.BLUETOOTH_SLOT_US:.2f}")
    
    print("\n" + "="*70)

# Generate report
generate_summary_report()

---
## 12. Export Results

In [None]:
def export_cutoff_analysis(cutoff_analysis: Dict, output_path: Path):
    """
    Export cutoff analysis results to JSON
    
    Args:
        cutoff_analysis: Analysis results
        output_path: Path to save JSON file
    """
    # Convert to serializable format
    export_data = {
        'summary': {
            'total_detections': cutoff_analysis['total'],
            'continuing_signals': cutoff_analysis['continuing'],
            'cut_off_signals': cutoff_analysis['cut_off'],
            'continuity_rate': 100.0 * cutoff_analysis['continuing'] / cutoff_analysis['total'] 
                              if cutoff_analysis['total'] > 0 else 0.0
        },
        'detections': []
    }
    
    for det in cutoff_analysis['detections']:
        export_data['detections'].append({
            'frame_idx': det['frame_idx'],
            'frame1': det['frame1'],
            'frame2': det['frame2'],
            'time_window_start_us': det['time_window'].start_us,
            'time_window_end_us': det['time_window'].end_us,
            'continues_in_next': det['continues_in_next'],
            'distance_from_edge_px': det['distance_from_edge'],
            'bbox': {
                'x': det['bbox'].x,
                'y': det['bbox'].y,
                'width': det['bbox'].width,
                'height': det['bbox'].height,
                'category': det['bbox'].category_name
            }
        })
    
    with open(output_path, 'w') as f:
        json.dump(export_data, f, indent=2)
    
    print(f"✅ Exported cutoff analysis to {output_path}")

# Export results
export_path = config.OUTPUT_DIR / 'bluetooth_cutoff_analysis.json'
export_cutoff_analysis(cutoff_analysis, export_path)

---
## Next Steps

Now that we've analyzed the dataset and checked for Bluetooth signal cutoffs, the next steps are:

1. **Implement Channel Slicing** - Split spectrograms into frequency channels
2. **Implement Temporal Slicing** - Create overlapping temporal windows
3. **Process Annotations** - Transform annotations for sliced images
4. **Generate YOLO Dataset** - Create train/val splits with proper formatting
5. **Train YOLO Model** - Train object detector on processed dataset

These steps can be added as additional cells in this notebook or in a separate continuation notebook.
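
As a starting point for the first and third steps, here is a minimal sketch of channel slicing with box clipping. It assumes frequency runs along the vertical image axis, uses plain `(x, y, w, h)` tuples instead of `BoundingBox`, and drops any leftover rows when the height is not divisible by the channel count. The function `slice_channels` and the sample boxes are hypothetical, not the notebook's eventual implementation.

```python
import numpy as np


def slice_channels(image, boxes, num_channels=4):
    """Split an image into num_channels horizontal bands and clip boxes to each band.

    boxes are (x, y, w, h) tuples in pixels. Returns a list of
    (band_image, band_boxes) with box coordinates relative to the band.
    """
    band_height = image.shape[0] // num_channels
    slices = []
    for ch in range(num_channels):
        top = ch * band_height
        bottom = top + band_height
        band_boxes = []
        for x, y, w, h in boxes:
            clipped_top = max(y, top)
            clipped_bottom = min(y + h, bottom)
            if clipped_bottom <= clipped_top:
                continue  # box does not intersect this band
            band_boxes.append((x, clipped_top - top, w, clipped_bottom - clipped_top))
        slices.append((image[top:bottom], band_boxes))
    return slices


# Blank 256x256 RGB frame with two made-up boxes; the first straddles bands 0 and 1.
spectrogram = np.zeros((256, 256, 3), dtype=np.uint8)
boxes = [(10, 60, 40, 12), (100, 20, 50, 10)]

slices = slice_channels(spectrogram, boxes)
for ch, (band, band_boxes) in enumerate(slices):
    print(ch, band.shape, band_boxes)
```

A box that crosses a band boundary is split into two shorter boxes here. Whether to keep such fragments, or to discard those below a minimum height, is a design choice that depends on how the detector should treat partially visible signals.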