# 🚀 Focoos Edge Model Testing Notebook


In [1]:
# Import required libraries
import os
import sys
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import time


# Load the trained model from edge_output (relative to workspace root)
# Find workspace root by looking for pyproject.toml or .git directory
def find_workspace_root():
    """Find the workspace root directory."""
    current_path = Path.cwd()
    
    # Look for workspace indicators
    for path in [current_path] + list(current_path.parents):
        if (path / "pyproject.toml").exists() or (path / ".git").exists() or (path / "README.md").exists():
            return path
    
    # Fallback to current directory
    return current_path

# Add project root to path for imports
project_root = find_workspace_root()

# Load environment variables
try:
    from dotenv import load_dotenv
    env_path = project_root / ".env"
    if env_path.exists():
        load_dotenv(env_path)
        print(f"📄 Loaded environment variables from: {env_path}")
    else:
        print("⚠️  .env file not found")
except ImportError:
    print("💡 Install python-dotenv: pip install python-dotenv")

print("✅ Environment setup complete!")

workspace_root = find_workspace_root()

📄 Loaded environment variables from: /Users/u464645/Documents/projects/hackatons/project/sample/.env
✅ Environment setup complete!


In [2]:


workspace_root = find_workspace_root()
model_path = workspace_root / "edge_output" / "model.onnx"

print(f"🏠 Workspace root: {workspace_root}")
print(f"🔍 Looking for model at: {model_path}")

if model_path.exists():
    print(f"✅ Found trained model at: {model_path}")
    
    # Import Focoos inference
    try:
        from focoos import InferModel
        from focoos.ports import DatasetLayout, RuntimeType, Task

        # Load the exported TorchScript model
        print("📥 Loading trained edge model...")
        model = InferModel(str(model_path), runtime_type=RuntimeType.ONNX_CPU)  # Convert Path to string for InferModel
        print("✅ Model loaded successfully!")
        
        # Display model info
        print(f"\n📊 Model Information:")
        print(f"   Model file: {model_path}")
        print(f"   File size: {model_path.stat().st_size / 1024:.1f} KB")
        print(f"   Runtime: TorchScript (edge-optimized)")
        print(f"   Target device: Arduino Nicla Vision")
        
    except ImportError as e:
        print(f"❌ Import error: {e}")
        print("💡 Make sure focoos is installed: pip install focoos")
        model = None
    except Exception as e:
        print(f"❌ Failed to load model: {e}")
        model = None
        
else:
    print(f"❌ Model not found at: {model_path}")
    print("💡 Run training first from workspace root:")
    print("   source .venv/bin/activate && python src/workflows/edge_workflow.py --model fai-cls-n-coco --task classification --dataset rock-paper-scissors-classification")
    model = None

🏠 Workspace root: /Users/u464645/Documents/projects/hackatons/project/sample
🔍 Looking for model at: /Users/u464645/Documents/projects/hackatons/project/sample/edge_output/model.onnx
❌ Model not found at: /Users/u464645/Documents/projects/hackatons/project/sample/edge_output/model.onnx
💡 Run training first from workspace root:
   source .venv/bin/activate && python src/workflows/edge_workflow.py --model fai-cls-n-coco --task classification --dataset rock-paper-scissors-classification


In [3]:
# Show available datasets and their structure (from workspace root)
def show_available_datasets():
    """Display all available datasets and their structure."""
    # Use workspace root to find datasets directory
    datasets_base = workspace_root / "datasets"
    
    if not datasets_base.exists():
        print("📁 No datasets directory found at workspace root")
        print(f"   Expected: {datasets_base}")
        return
    
    print("📊 Available Datasets:")
    print("=" * 50)
    
    for dataset_folder in datasets_base.iterdir():
        if dataset_folder.is_dir():
            print(f"\n📁 {dataset_folder.name}")
            
            # Check dataset structure
            train_paths = list(dataset_folder.rglob("train"))
            if train_paths:
                for train_path in train_paths:
                    class_dirs = [d for d in train_path.iterdir() if d.is_dir()]
                    if class_dirs:
                        print(f"   📋 Train classes ({len(class_dirs)}): {[d.name for d in class_dirs[:5]]}")
                        if len(class_dirs) > 5:
                            print(f"       ... and {len(class_dirs)-5} more")
                        
                        # Count images
                        image_count = len([f for f in train_path.rglob("*.jpg")] + 
                                        [f for f in train_path.rglob("*.jpeg")] + 
                                        [f for f in train_path.rglob("*.png")])
                        print(f"   🖼️  Total images: {image_count}")
                    else:
                        print(f"   ❌ No class directories found in {train_path}")
            else:
                print("   ❌ No train directory found")

show_available_datasets()

📊 Available Datasets:

📁 emotions_back
   📋 Train classes (3): ['background', 'tired-person', 'happy-person']
   🖼️  Total images: 5725

📁 human_identification_only
   📋 Train classes (1): ['humans']
   🖼️  Total images: 1260

📁 Face emotion classification.v3i.folder
   📋 Train classes (3): ['background', 'tired-person', 'happy-person']
   🖼️  Total images: 1346

📁 emotions_original
   📋 Train classes (5): ['Happy', 'Sad', 'Surprise', 'Neutral', 'Angry']
   🖼️  Total images: 9768

📁 human_identification_balanced
   📋 Train classes (2): ['background', 'humans']
   🖼️  Total images: 2560

📁 human_identification_merged
   📋 Train classes (2): ['background', 'human']
   🖼️  Total images: 3819

📁 emotions
   📋 Train classes (2): ['tired-person', 'happy-person']
   🖼️  Total images: 3788


In [4]:
# Get test images from the extracted datasets (relative to workspace root)
def find_dataset_images():
    """Find test images from extracted datasets using workspace root."""
    datasets_base = workspace_root / "datasets"
    test_images = []
    classes = {}
    
    if not datasets_base.exists():
        print(f"❌ Datasets directory not found: {datasets_base}")
        return test_images, classes
    
    print(f"🔍 Looking for datasets in: {datasets_base}")
    
    # Look through all dataset directories
    for dataset_folder in datasets_base.iterdir():
        if dataset_folder.is_dir():
            print(f"📁 Checking dataset: {dataset_folder.name}")
            
            # Look for train directories in various possible structures
            possible_train_paths = [
                dataset_folder / "train",  # Direct structure
                dataset_folder / dataset_folder.name / "train",  # Nested with same name
            ]
            
            # Also check for any subdirectory that contains train/
            for subdir in dataset_folder.rglob("train"):
                if subdir.is_dir():
                    possible_train_paths.append(subdir)
            
            for train_path in possible_train_paths:
                if train_path.exists() and train_path.is_dir():
                    print(f"   ✅ Found training data: {train_path}")
                    
                    # Find image files
                    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}
                    
                    for img_path in train_path.rglob("*"):
                        if img_path.suffix.lower() in image_extensions and img_path.is_file():
                            test_images.append(img_path)
                    
                    # Group by class (parent directory name)
                    for img_path in test_images[:50]:  # Limit to first 50 for display
                        class_name = img_path.parent.name
                        if class_name not in classes:
                            classes[class_name] = []
                        classes[class_name].append(img_path)
                    
                    break  # Use first valid train directory found
    
    return test_images, classes

# Find images from the datasets
test_images, classes = find_dataset_images()

if test_images:
    print(f"📊 Found {len(test_images)} test images across {len(classes)} classes")
    print(f"📋 Classes found: {list(classes.keys())}")
    for class_name, images in classes.items():
        print(f"   {class_name}: {len(images)} images")
else:
    print("❌ No test images found")
    print("💡 Make sure you've run training to extract a dataset:")
    print("   cd", workspace_root)
    print("   source .venv/bin/activate && python src/workflows/edge_workflow.py --model fai-cls-n-coco --task classification --dataset rock-paper-scissors-classification --max_iters 100")

🔍 Looking for datasets in: /Users/u464645/Documents/projects/hackatons/project/sample/datasets
📁 Checking dataset: emotions_back
   ✅ Found training data: /Users/u464645/Documents/projects/hackatons/project/sample/datasets/emotions_back/train
📁 Checking dataset: human_identification_only
   ✅ Found training data: /Users/u464645/Documents/projects/hackatons/project/sample/datasets/human_identification_only/train
📁 Checking dataset: Face emotion classification.v3i.folder
   ✅ Found training data: /Users/u464645/Documents/projects/hackatons/project/sample/datasets/Face emotion classification.v3i.folder/train
📁 Checking dataset: emotions_original
   ✅ Found training data: /Users/u464645/Documents/projects/hackatons/project/sample/datasets/emotions_original/train
📁 Checking dataset: human_identification_balanced
   ✅ Found training data: /Users/u464645/Documents/projects/hackatons/project/sample/datasets/human_identification_balanced/train
📁 Checking dataset: human_identification_merged
   

## 🧪 Interactive Testing

Now let's test the model with sample images! Run the cells below to test different images.

In [6]:
import cv2
from datetime import datetime
from PIL import Image
import time
from pathlib import Path

def interactive_webcam_tester(model, classes, preferred_label,workspace_root, window_name="Webcam Tester - Press SPACE to test, Q to quit"):
    """Interactive webcam tester - capture images and run tests when prompted."""
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("❌ Could not open webcam.")
        return

    print("📸 Interactive Webcam Tester Started!")
    print("Controls:")
    print("  - SPACEBAR: Capture image and run test")
    print("  - 'q': Quit")
    print("  - ESC: Quit")
    
    class_list = sorted(list(classes.keys())) if classes else None
    capture_count = 0
    
    # Create directory for captured images
    captures_dir = Path(workspace_root) / "webcam_captures"
    captures_dir.mkdir(exist_ok=True)
    
    # Variables to store prediction results - IMPROVED SYNCHRONIZATION
    current_prediction = None
    current_confidence = 0.0
    prediction_timestamp = None
    is_processing = False  # Flag to show when inference is running
    processing_start = None

    while True:
        ret, frame = cap.read()
        if not ret:
            print("⚠️ Failed to grab frame.")
            break

        height, width = frame.shape[:2]
        
        # Add instructions overlay
        instructions = [
            "Press SPACE to capture & test",
            "Press Q to quit"
        ]
        
        # Show processing status
        if is_processing:
            instructions.append("🔄 Processing... Please wait")
        
        for i, text in enumerate(instructions):
            y_pos = 30 + (i * 30)
            color = (0, 165, 255) if "Processing" in text else (0, 255, 255)  # Orange for processing
            cv2.putText(frame, text, (10, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

        # Show crosshair for better framing
        cv2.line(frame, (width//2 - 20, height//2), (width//2 + 20, height//2), (0, 255, 0), 2)
        cv2.line(frame, (width//2, height//2 - 20), (width//2, height//2 + 20), (0, 255, 0), 2)

        # Display CURRENT prediction result (synchronized with latest inference)
        if current_prediction is not None and prediction_timestamp:
            elapsed = datetime.now() - prediction_timestamp
            if elapsed.total_seconds() < 15:  # Show prediction for 15 seconds
                # Create a semi-transparent overlay for better text visibility
                overlay = frame.copy()
                cv2.rectangle(overlay, (10, height - 140), (width - 10, height - 10), (0, 0, 0), -1)
                cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame)
                
                # Display prediction result with FRESH indicator
                pred_text = f"Latest: {current_prediction}"
                conf_text = f"Confidence: {current_confidence:.3f}"
                time_text = f"Age: {elapsed.total_seconds():.1f}s"
                
                # Choose color based on confidence and freshness
                if elapsed.total_seconds() < 2:  # Fresh prediction (< 2 seconds)
                    color = (0, 255, 0) if current_confidence > 0.7 else (0, 255, 255)  # Bright green/cyan
                    fresh_indicator = "🆕 FRESH"
                else:
                    color = (128, 255, 128) if current_confidence > 0.7 else (128, 255, 255)  # Dimmer colors
                    fresh_indicator = ""
                
                cv2.putText(frame, pred_text, (20, height - 110), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
                cv2.putText(frame, conf_text, (20, height - 80), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                cv2.putText(frame, time_text, (20, height - 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
                
                if fresh_indicator:
                    cv2.putText(frame, fresh_indicator, (20, height - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        cv2.imshow(window_name, frame)
        key = cv2.waitKey(1) & 0xFF
        
        if key == ord('q') or key == 27:  # 'q' or ESC key
            break
        elif key == ord(' ') and not is_processing:  # Spacebar - only if not already processing
            # START PROCESSING - Set flags IMMEDIATELY
            is_processing = True
            processing_start = datetime.now()
            
            # Capture and test the current frame
            capture_count += 1
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            
            print(f"\n📸 Capture #{capture_count} - Running test...")
            
            # Save the captured frame FIRST
            capture_filename = f"capture_{capture_count}_{timestamp}.jpg"
            capture_path = captures_dir / capture_filename
            cv2.imwrite(str(capture_path), frame)
            print(f"💾 Saved image: {capture_path}")
            
            # Convert frame to RGB for model inference
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(rgb_frame)
            
            # CLEAR OLD PREDICTION before starting new inference
            current_prediction = "Processing..."
            current_confidence = 0.0
            prediction_timestamp = datetime.now()
            
            # Run inference on the captured frame
            try:
                start_time = time.time()
                
                # Try different input formats
                try:
                    result = model.infer(image=pil_image, threshold=0.4, annotate=False)
                except Exception:
                    try:
                        result = model.infer(image=str(capture_path), threshold=0.4, annotate=False)
                    except Exception:
                        result = model(pil_image)
                
                inference_time = time.time() - start_time
                # Parse results and update display variables IMMEDIATELY
                # Extract the detection with highest confidence

                print(result.__dict__)
                
                if result.detections:
                    detections = result.detections
                    humans_detections = [d for d in detections if d.label == preferred_label and d.conf > 0.5]
                    if humans_detections:
                        # Prioritize humans if confidence > 0.5, use the best humans detection
                        best_humans = max(humans_detections, key=lambda x: x.conf)
                        label = best_humans.label
                        conf = best_humans.conf
                    else:
                        # Otherwise use the detection with highest confidence overall
                        best_detection = max(detections, key=lambda x: x.conf)
                        label = best_detection.label
                        conf = best_detection.conf
                    
                    print(f"   ✅ Prediction: {label} (confidence: {conf:.3f})")
                    detections = result.detections
                 
                else:
                    label = str(result)
                    conf = 0.0



                current_prediction = label
                current_confidence = conf
                prediction_timestamp = datetime.now()  # Fresh timestamp for new prediction
                
                print(f"⏱️  Inference time: {inference_time*1000:.1f} ms")
                print(f"💾 Image saved as: {capture_filename}")
                print(f"✅ NEW prediction updated: {label} ({conf:.3f})")
                print("-" * 50)
                
            except Exception as e:
                print(f"❌ Inference error: {e}")
                current_prediction = "ERROR"
                current_confidence = 0.0
                prediction_timestamp = datetime.now()
                print("-" * 50)
            
            # FINISH PROCESSING - Clear flag
            is_processing = False

    cap.release()
    cv2.destroyAllWindows()
    print(f"🛑 Webcam tester stopped. Captured {capture_count} images.")
    if capture_count > 0:
        print(f"📁 Images saved in: {captures_dir}")

def batch_test_captures(model, workspace_root, preferred_label, classes=None):
    """Test all previously captured images."""
    captures_dir = Path(workspace_root) / "webcam_captures"
    
    if not captures_dir.exists():
        print("❌ No captures directory found. Capture some images first!")
        return
    
    image_files = list(captures_dir.glob("*.jpg")) + list(captures_dir.glob("*.png"))
    
    if not image_files:
        print("❌ No captured images found.")
        return
    
    print(f"🧪 Testing {len(image_files)} captured images...")
    
    for img_path in sorted(image_files):
        print(f"\n📸 Testing: {img_path.name}")
        # Test each image
        try:
            result = model.infer(image=str(img_path), threshold=0.5, annotate=False)
            
            # Parse results
            if isinstance(result, dict) and "detections" in result:
                detections = result["detections"]
                if detections:
                    # Check for high-confidence humans detection first
                    humans_detections = [d for d in detections if d.label == preferred_label and d.conf > 0.5]
                    print(detections.__dict__)
                    if humans_detections:
                        # Prioritize humans if confidence > 0.5, use the best humans detection
                        best_humans = max(humans_detections, key=lambda x: x.conf)
                        label = best_humans.label
                        conf = best_humans.conf
                    else:
                        # Otherwise use the detection with highest confidence overall
                        best_detection = max(detections, key=lambda x: x.conf)
                        label = best_detection.label
                        conf = best_detection.conf
                    
                    print(f"   ✅ Prediction: {label} (confidence: {conf:.3f})")
                else:
                    print(f"   ⚠️ No detections found")
            else:
                print(f"   ℹ️ Result: {result}")
        except Exception as e:
            print(f"   ❌ Error: {e}")




In [None]:
from focoos import ModelManager, FocoosHUB
from focoos.models.focoos_model  import FocoosModel, FocoosDetections
import os
api_key = os.environ.get("FOCOOS_API_KEY") or 'c7ef8380320c421792425668205fa8fa'

hub = FocoosHUB(api_key=api_key)

ref = "f73c51dfb3bd422f"
model : FocoosModel= ModelManager.get(f"hub://{ref}", hub=hub)


# 🎯 Verify HUB Model for Tired Person Detection
print("🔍 HUB Model Status:")
if 'model' in locals() and model is not None:
    print(f"✅ HUB Model loaded: {type(model)}")
    
    # Test with dummy input to check classes
    try:
        import numpy as np
        dummy_input = np.random.rand(224, 224, 3).astype(np.uint8)
        dummy_pil = Image.fromarray(dummy_input)
        test_result : FocoosDetections = model(dummy_pil)

        pred= test_result
    except Exception as e:
        print(f"⚠️  Could not test model: {e}")
        
# Example usage:
#classes = {"background": 1,"humans": 2}
classes = {"happy-person": 1,"tired-person": 2}
interactive_webcam_tester(model, classes, "tired-person", workspace_root)

[1;32m[10/05 02:52][INFO][HUB]: Currently logged as: frigato.luca97@gmail.com environment: https://api.focoos.ai/v0[0m
[1;32m[10/05 02:52][INFO][HUB]: 📥 Model already downloaded[0m
[1;32m[10/05 02:52][INFO][ModelManager]: 📥 Loading model info from cache: /Users/u464645/FocoosAI/models/af0de65d2173413e/model_info.json[0m
[1;32m[10/05 02:52][INFO][FocoosModel]: Loading weights from local path: /Users/u464645/FocoosAI/models/af0de65d2173413e/model_final.pth[0m


🔍 HUB Model Status:
✅ HUB Model loaded: <class 'focoos.models.focoos_model.FocoosModel'>
📸 Interactive Webcam Tester Started!
Controls:
  - SPACEBAR: Capture image and run test
  - 'q': Quit
  - ESC: Quit


2025-10-05 02:52:13.024 python[38289:11955490] +[IMKClient subclass]: chose IMKClient_Modern
2025-10-05 02:52:13.024 python[38289:11955490] +[IMKInputSession subclass]: chose IMKInputSession_Modern



📸 Capture #1 - Running test...
💾 Saved image: /Users/u464645/Documents/projects/hackatons/project/sample/webcam_captures/capture_1_20251005_025213.jpg

1 happy-person, 1 tired-person
Latency: imload 2ms, preprocess 3ms, inference 23ms, postprocess 0ms, total 28ms
{'detections': [FocoosDet(bbox=None, conf=0.6550604104995728, cls_id=0, label=happy-person, mask=None, keypoints=None), FocoosDet(bbox=None, conf=0.7397279739379883, cls_id=1, label=tired-person, mask=None, keypoints=None)], 'image': None, 'latency': InferLatency(imload=0.002, preprocess=0.003, inference=0.023, postprocess=0.0, annotate=None)}
   ✅ Prediction: tired-person (confidence: 0.740)
⏱️  Inference time: 28.3 ms
💾 Image saved as: capture_1_20251005_025213.jpg
✅ NEW prediction updated: tired-person (0.740)
--------------------------------------------------

📸 Capture #2 - Running test...
💾 Saved image: /Users/u464645/Documents/projects/hackatons/project/sample/webcam_captures/capture_2_20251005_025213.jpg

1 happy-pers

2025-10-05 02:52:24.135 python[38289:11955490] _TIPropertyValueIsValid called with 16 on nil context!
2025-10-05 02:52:24.135 python[38289:11955490] imkxpc_getApplicationProperty:reply: called with incorrect property value 16, bailing.
2025-10-05 02:52:24.135 python[38289:11955490] Text input context does not respond to _valueForTIProperty:



📸 Capture #18 - Running test...
💾 Saved image: /Users/u464645/Documents/projects/hackatons/project/sample/webcam_captures/capture_18_20251005_025246.jpg

1 happy-person, 1 tired-person
Latency: imload 3ms, preprocess 1ms, inference 26ms, postprocess 0ms, total 30ms
{'detections': [FocoosDet(bbox=None, conf=0.6320154666900635, cls_id=0, label=happy-person, mask=None, keypoints=None), FocoosDet(bbox=None, conf=0.7551708817481995, cls_id=1, label=tired-person, mask=None, keypoints=None)], 'image': None, 'latency': InferLatency(imload=0.003, preprocess=0.001, inference=0.026, postprocess=0.0, annotate=None)}
   ✅ Prediction: tired-person (confidence: 0.755)
⏱️  Inference time: 29.6 ms
💾 Image saved as: capture_18_20251005_025246.jpg
✅ NEW prediction updated: tired-person (0.755)
--------------------------------------------------


2025-10-05 02:52:48.537 python[38289:11955490] _TIPropertyValueIsValid called with 16 on nil context!
2025-10-05 02:52:48.537 python[38289:11955490] imkxpc_getApplicationProperty:reply: called with incorrect property value 16, bailing.
2025-10-05 02:52:48.537 python[38289:11955490] Text input context does not respond to _valueForTIProperty:
2025-10-05 02:52:54.066 python[38289:11955490] _TIPropertyValueIsValid called with 16 on nil context!
2025-10-05 02:52:54.066 python[38289:11955490] imkxpc_getApplicationProperty:reply: called with incorrect property value 16, bailing.
2025-10-05 02:52:54.066 python[38289:11955490] Text input context does not respond to _valueForTIProperty:



📸 Capture #19 - Running test...
💾 Saved image: /Users/u464645/Documents/projects/hackatons/project/sample/webcam_captures/capture_19_20251005_025255.jpg

1 happy-person, 1 tired-person
Latency: imload 2ms, preprocess 1ms, inference 21ms, postprocess 0ms, total 24ms
{'detections': [FocoosDet(bbox=None, conf=0.6968820095062256, cls_id=0, label=happy-person, mask=None, keypoints=None), FocoosDet(bbox=None, conf=0.7229427099227905, cls_id=1, label=tired-person, mask=None, keypoints=None)], 'image': None, 'latency': InferLatency(imload=0.002, preprocess=0.001, inference=0.021, postprocess=0.0, annotate=None)}
   ✅ Prediction: tired-person (confidence: 0.723)
⏱️  Inference time: 24.3 ms
💾 Image saved as: capture_19_20251005_025255.jpg
✅ NEW prediction updated: tired-person (0.723)
--------------------------------------------------

📸 Capture #20 - Running test...
💾 Saved image: /Users/u464645/Documents/projects/hackatons/project/sample/webcam_captures/capture_20_20251005_025255.jpg

1 happy

KeyboardInterrupt: 