# FBEdge Examples - Edge AI with Local Storage

This notebook walks through a complete workflow for Hailo-8 edge devices, from data capture to model deployment.

## Workflow Overview
1. **Setup** - Configure stream ID and AWS credentials
2. **Data Capture** - Capture video data and save to `data/raw/`
3. **Annotation** - Guidelines for annotating captured data
4. **Training and Compiling** - Train models on annotated data and compile them to HEF
5. **Deployment** - Deploy models for inference

Let's get started!

# 1. Setup - Stream ID and AWS Configuration

## Step 1.1: Configure Stream ID

First, set your unique stream identifier. This will be used to organize your data and identify your video streams.

In [None]:
import os
from datetime import datetime

# Set your unique stream ID (replace the placeholder with the ID of your unit)
STREAM_ID = "Unit specific Stream ID"

print(f"Stream ID configured: {STREAM_ID}")
print(f"Session started: {datetime.now()}")

Stream ID configured: e84f3644c93d46e793c6e666d548496f-0
Session started: 2025-09-18 15:27:12.258349


## Step 1.2: Configure AWS Credentials

Set up your AWS credentials for cloud integration (optional but recommended for production use).

In [None]:
import boto3
from botocore.exceptions import NoCredentialsError, ClientError

# AWS Configuration - Using short-lived credentials
AWS_CONFIG = {
    'region': 'eu-west-1',           # Change to your preferred region
    'access_key_id': 'ACCESS_KEY_ID',             # Your AWS Access Key ID (from temporary credentials)
    'secret_access_key': 'SECRET_ACCESS_KEY',         # Your AWS Secret Access Key (from temporary credentials)
    'session_token': 'SESSION_TOKEN',             # Your AWS Session Token (required for temporary credentials)
}

# Short-lived credentials can be obtained from:
# 1. AWS CLI: aws sts get-session-token --duration-seconds 3600
# 2. AWS Console: "Command line or programmatic access"
# 3. AWS STS assume-role operations
# 4. IAM Identity Center (AWS SSO)
print("AWS Configuration:")
print(f"Region: {AWS_CONFIG['region']}")
print(f"Access Key ID: {'*' * len(AWS_CONFIG['access_key_id']) if AWS_CONFIG['access_key_id'] else 'Not set'}")
print(f"Secret Key: {'*' * len(AWS_CONFIG['secret_access_key']) if AWS_CONFIG['secret_access_key'] else 'Not set'}")
print(f"Session Token: {'*' * len(AWS_CONFIG['session_token']) if AWS_CONFIG['session_token'] else 'Not set'}")

if not all([AWS_CONFIG['access_key_id'], AWS_CONFIG['secret_access_key'], AWS_CONFIG['session_token']]):
    print("\n⚠️  AWS short-lived credentials not fully configured.")
    print("   To get temporary credentials, run:")
    print("   aws sts get-session-token --duration-seconds 3600")
    print("   You can still proceed with local-only functionality.")

AWS Configuration:
Region: eu-west-1
Access Key ID: ********************
Secret Key: ****************************************
Session Token: *******************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************
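
As an alternative to pasting credentials into the notebook, the same `AWS_CONFIG` dictionary can be built from environment variables. The sketch below assumes the standard AWS variable names (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`, `AWS_DEFAULT_REGION`); the helper names `load_aws_config_from_env` and `mask` are hypothetical and not part of `video_capture`.

```python
import os


def load_aws_config_from_env(env=None, default_region="eu-west-1"):
    """Build an AWS_CONFIG-shaped dict from the standard AWS environment variables."""
    env = os.environ if env is None else env
    return {
        "region": env.get("AWS_DEFAULT_REGION", default_region),
        "access_key_id": env.get("AWS_ACCESS_KEY_ID", ""),
        "secret_access_key": env.get("AWS_SECRET_ACCESS_KEY", ""),
        "session_token": env.get("AWS_SESSION_TOKEN", ""),
    }


def mask(value, max_len=20):
    """Mask a secret with asterisks, capped at max_len so long tokens stay readable."""
    if not value:
        return "Not set"
    return "*" * min(len(value), max_len)


# Stand-in environment with placeholder values (not real credentials)
example_env = {"AWS_ACCESS_KEY_ID": "PLACEHOLDER", "AWS_SECRET_ACCESS_KEY": "placeholder"}
config = load_aws_config_from_env(example_env)
for name in ("access_key_id", "secret_access_key", "session_token"):
    print(f"{name}: {mask(config[name])}")
```

Capping the mask length keeps long session tokens from filling the cell output with asterisks.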

## Step 1.3: Test AWS Connection

Test your AWS credentials and connection to ensure everything is working properly.

In [10]:
# Import the test function from video_capture module
from video_capture import test_aws_connection

# Run the test
aws_connected, aws_session = test_aws_connection(AWS_CONFIG)

2025-09-18 15:27:14,236 - video_capture - INFO - Testing AWS connection
2025-09-18 15:27:14,237 - video_capture - INFO - Using configured short-lived credentials


🔑 Using configured short-lived credentials


2025-09-18 15:27:14,465 - video_capture - INFO - STS call successful
2025-09-18 15:27:14,466 - video_capture - INFO - Using temporary/short-lived credentials


✅ AWS Connection successful!
   Account ID: 789153103247
   User ARN: arn:aws:sts::789153103247:assumed-role/AWSReservedSSO_Developer_077527dfd4a46145/guto@factbird.com
   🕐 Using temporary/short-lived credentials


2025-09-18 15:27:14,717 - video_capture - INFO - S3 access successful - 27 buckets found
2025-09-18 15:27:14,718 - video_capture - INFO - AWS connection test completed successfully


   S3 Access: OK (27 buckets accessible)
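
The internals of `test_aws_connection` are not shown in this notebook. As a rough sketch of the STS part of such a check, the hypothetical helper below calls `get_caller_identity` on anything that behaves like a `boto3.Session` and reports whether the caller ARN belongs to an assumed role, as it does in the output above.

```python
def describe_identity(session):
    """Return account ID, caller ARN, and whether the ARN is an assumed role.

    `session` is expected to behave like a boto3.Session, for example
    boto3.Session(aws_access_key_id=..., aws_secret_access_key=...,
                  aws_session_token=..., region_name=...).
    """
    identity = session.client("sts").get_caller_identity()
    arn = identity["Arn"]
    return {
        "account": identity["Account"],
        "arn": arn,
        # Assumed-role ARNs (e.g. from IAM Identity Center) imply temporary credentials.
        # Credentials from `aws sts get-session-token` keep the IAM user ARN instead.
        "assumed_role": ":assumed-role/" in arn,
    }
```

Passing the session in as an argument keeps the helper easy to test with a stub instead of real credentials.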


# 2. Data Capture - Video Collection

Now let's capture video data and save it to the `data/raw/` directory for later processing and annotation.

## Step 2.1: Setup Data Directories

In [4]:
# Import data directory setup functions
from video_capture import setup_data_directories, save_session_metadata
from datetime import datetime

# Setup data directories
DATA_DIRS, session_dir = setup_data_directories(STREAM_ID)

# Save session metadata
session_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
save_session_metadata(session_dir, STREAM_ID, session_timestamp, aws_connected)

✅ raw directory ready: data/raw
✅ processed directory ready: data/processed
✅ models directory ready: models
📁 Session directory created: data/raw/e84f3644c93d46e793c6e666d548496f-0/20250918_145652
📄 Session metadata saved
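
The layout printed above is `data/raw/<stream_id>/<timestamp>`. A minimal sketch of how such a session directory and a metadata file could be created is shown below; the function names, the metadata file name `session_metadata.json`, and its keys are assumptions, not the actual `video_capture` implementation.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path


def create_session_dir(base_dir, stream_id, started_at=None):
    """Create <base_dir>/raw/<stream_id>/<YYYYmmdd_HHMMSS> and return its path."""
    started_at = started_at or datetime.now()
    session_dir = Path(base_dir) / "raw" / stream_id / started_at.strftime("%Y%m%d_%H%M%S")
    session_dir.mkdir(parents=True, exist_ok=True)
    return session_dir


def write_session_metadata(session_dir, stream_id, aws_connected):
    """Write a small JSON file describing the session (hypothetical schema)."""
    metadata = {
        "stream_id": stream_id,
        "session": session_dir.name,
        "aws_connected": aws_connected,
    }
    path = session_dir / "session_metadata.json"
    path.write_text(json.dumps(metadata, indent=2))
    return path


# Demonstrate in a temporary directory so nothing is written to the real data folder
with tempfile.TemporaryDirectory() as tmp:
    session = create_session_dir(tmp, "example-stream-0", datetime(2025, 9, 18, 14, 56, 52))
    write_session_metadata(session, "example-stream-0", aws_connected=True)
    print(session.relative_to(tmp).as_posix())  # raw/example-stream-0/20250918_145652
```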


## Step 2.2: AWS Kinesis Video Streams Setup

Since we're working with AWS KVS, let's configure the video stream connection to receive data from your edge device.

In [9]:
# Import KVS setup function
from video_capture import setup_kvs_stream

# Connect to existing KVS stream
kvs_config = setup_kvs_stream(aws_session, STREAM_ID, AWS_CONFIG) if aws_connected else None

if kvs_config:
    print(f"\n🎯 Connected to KVS Stream!")
    print(f"   Stream name: {kvs_config['stream_name']}")
    print(f"   Region: {kvs_config['region']}")
    print(f"   Status: {kvs_config['status']}")
    print(f"\n📡 Ready to consume video data from edge device")
else:
    print(f"\n⚠️  Could not connect to KVS stream. Proceeding with local mode.")
    kvs_config = None

2025-09-17 09:20:15,230 - video_capture - INFO - Setting up KVS stream for stream_id: e84f3644c93d46e793c6e666d548496f-0
2025-09-17 09:20:15,273 - video_capture - INFO - KVS client created successfully
2025-09-17 09:20:15,273 - video_capture - INFO - Attempting to connect to stream: e84f3644c93d46e793c6e666d548496f-0
2025-09-17 09:20:15,437 - video_capture - INFO - Stream found successfully: e84f3644c93d46e793c6e666d548496f-0
2025-09-17 09:20:15,481 - video_capture - INFO - Got data endpoint: https://s-5ac677c0.kinesisvideo.eu-west-1.amazonaws.com
2025-09-17 09:20:15,482 - video_capture - INFO - KVS stream setup completed successfully


✅ Found KVS stream: e84f3644c93d46e793c6e666d548496f-0
   Status: ACTIVE
   ARN: arn:aws:kinesisvideo:eu-west-1:789153103247:stream/e84f3644c93d46e793c6e666d548496f-0/1757944024970
   Created: 2025-09-15 15:47:04.970000+02:00
📺 Live media endpoint: https://s-5ac677c0.kinesisvideo.eu-west-1.amazonaws.com

🎯 Connected to KVS Stream!
   Stream name: e84f3644c93d46e793c6e666d548496f-0
   Region: eu-west-1
   Status: ACTIVE

📡 Ready to consume video data from edge device
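
The log above shows two steps: looking up the stream and fetching a data endpoint. A sketch of those two calls with the boto3 `kinesisvideo` client follows. It assumes `setup_kvs_stream` does something similar, which this notebook does not confirm. The keys `stream_name`, `region`, and `status` mirror the `kvs_config` fields used above; `arn` and `endpoint` are hypothetical additions.

```python
def describe_kvs_stream(session, stream_name, region):
    """Look up an existing Kinesis Video stream and its GET_MEDIA endpoint.

    `session` is expected to behave like a boto3.Session.
    """
    kvs = session.client("kinesisvideo", region_name=region)
    info = kvs.describe_stream(StreamName=stream_name)["StreamInfo"]
    endpoint = kvs.get_data_endpoint(StreamName=stream_name, APIName="GET_MEDIA")["DataEndpoint"]
    return {
        "stream_name": info["StreamName"],
        "region": region,
        "status": info["Status"],
        "arn": info["StreamARN"],
        "endpoint": endpoint,
    }
```

With real credentials this would be called as `describe_kvs_stream(aws_session, STREAM_ID, AWS_CONFIG['region'])`.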


## Step 2.3: Configure Video Stream Consumption

Configure how we'll consume and save video data from the KVS stream to `data/raw/`.

In [10]:
# Stream consumption configuration
STREAM_CONFIG = {
    'duration_seconds': 60,          # How far back in time to look (60 seconds default)
    'frame_extract_fps': 2,          # Extract frames at this rate (2 FPS = one frame every 0.5 seconds)
    'max_frames': 100,               # Maximum frames to save (None = unlimited)
    'image_format': 'jpg',           # Image format for extracted frames
    'image_quality': 100,            # JPEG quality (1-100)
    'start_selector_type': 'NOW',    # Start from the latest data ('NOW') and work backwards
    'fragment_selector_type': 'SERVER_TIMESTAMP',  # Use server timestamps
}

print("📋 Stream Consumption Configuration:")
for key, value in STREAM_CONFIG.items():
    print(f"   {key}: {value}")

# Calculate expected frames from stream
expected_frames = min(
    STREAM_CONFIG['max_frames'] if STREAM_CONFIG['max_frames'] else float('inf'),
    STREAM_CONFIG['duration_seconds'] * STREAM_CONFIG['frame_extract_fps']
)

print(f"\n📊 Expected frames to extract: {int(expected_frames)}")
print(f"💾 Estimated storage per frame: ~50-100 KB")
print(f"💾 Total estimated storage: {int(expected_frames) * 75 / 1024:.1f} MB")

if kvs_config:
    print(f"\n📡 Will consume from KVS stream: {kvs_config['stream_name']}")
    print(f"🕐 Strategy: Get latest frames working backwards {STREAM_CONFIG['duration_seconds']}s")
else:
    print(f"\n⚠️  No KVS stream available - will use local fallback mode")

📋 Stream Consumption Configuration:
   duration_seconds: 60
   frame_extract_fps: 2
   max_frames: 100
   image_format: jpg
   image_quality: 100
   start_selector_type: NOW
   fragment_selector_type: SERVER_TIMESTAMP

📊 Expected frames to extract: 100
💾 Estimated storage per frame: ~50-100 KB
💾 Total estimated storage: 7.3 MB

📡 Will consume from KVS stream: e84f3644c93d46e793c6e666d548496f-0
🕐 Strategy: Get latest frames working backwards 60s
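
The frame and storage estimate printed above can be wrapped in a small helper so that different settings are easy to compare. This is a sketch: the function name `estimate_capture` is hypothetical, and the default of 75 KB per frame is the midpoint of the 50-100 KB range quoted above.

```python
def estimate_capture(duration_seconds, frame_extract_fps, max_frames=None, kb_per_frame=75):
    """Return (expected frame count, estimated storage in MB) for a capture run."""
    frames = int(duration_seconds * frame_extract_fps)
    if max_frames is not None:
        # The frame cap wins when the time window would yield more frames
        frames = min(frames, max_frames)
    return frames, frames * kb_per_frame / 1024


frames, megabytes = estimate_capture(60, 2, max_frames=100)
print(f"{frames} frames, ~{megabytes:.1f} MB")  # 100 frames, ~7.3 MB
```

With the configuration above, the 60 s window at 2 FPS would give 120 frames, so the `max_frames` limit of 100 is what determines the estimate.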


## Step 2.4: Start Stream Consumption

Now let's consume video data from the KVS stream and extract frames to save in `data/raw/`.

⚠️ **Important**: Make sure your edge device is actively streaming to the KVS stream before running this!

In [11]:
# Import KVS stream consumer
from video_capture import KVSStreamConsumer

# RUN THIS CELL TO START KVS STREAM CONSUMPTION (BACKWARDS)
if kvs_config and aws_connected:
    # Initialize consumer
    consumer = KVSStreamConsumer(aws_session, kvs_config)

    # Start consumption
    success = consumer.consume_stream(session_dir, STREAM_CONFIG)

    if success:
        print(f"\n🎉 KVS stream consumption completed successfully!")
        print(f"📁 Your extracted frames are saved in: {session_dir}")
        print(f"🔄 You can now proceed to the annotation step.")
    else:
        print(f"\n⚠️  Stream consumption encountered issues. Check the error messages above.")
        print(f"💡 Make sure your edge device has been streaming to KVS in the last {STREAM_CONFIG['duration_seconds']} seconds.")
else:
    print(f"\n❌ Cannot consume stream - AWS connection or KVS setup failed")
    print(f"💡 Check Steps 1.3 and 2.2 above")

2025-09-17 09:21:21,452 - video_capture - INFO - Final frame extraction attempt with remaining buffer
2025-09-17 09:21:21,452 - video_capture - INFO - Attempting frame extraction from 627680 byte buffer


📡 LIVE STREAM PROCESSING:
⏱️  Processing time: 59.0s
🖼️ Frames extracted: 14
📊 Total data: 2,097,152 bytes
📦 Chunks processed: 32

⏰ Timeout reached after 61.1s


[matroska,webm @ 0x95a060780] EBML header parsing failed
OpenCV: Couldn't read video stream from file "data/raw/e84f3644c93d46e793c6e666d548496f-0/20250917_092010/temp_stream_14.webm"
[matroska,webm @ 0x95a060780] EBML header parsing failed
OpenCV: Couldn't read video stream from file "data/raw/e84f3644c93d46e793c6e666d548496f-0/20250917_092010/temp_stream_14.mkv"
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x95a060780] moov atom not found
OpenCV: Couldn't read video stream from file "data/raw/e84f3644c93d46e793c6e666d548496f-0/20250917_092010/temp_stream_14.mp4"
[matroska,webm @ 0x95a060780] Element at 0x5 ending at 0x1c6 exceeds containing master element ending at 0x7
[matroska,webm @ 0x95a060780] EBML header parsing failed
OpenCV: Couldn't read video stream from file "data/raw/e84f3644c93d46e793c6e666d548496f-0/20250917_092010/temp_raw_14.webm"
[matroska,webm @ 0x95a060780] Element at 0x5 ending at 0x2dc7 exceeds containing master element ending at 0x7
[matroska,webm @ 0x95a060780] EBML header parsi

🖼️ Final extraction: 1 frame(s)

📊 LIVE STREAM PROCESSING SUMMARY:
   Frames extracted: 15
   Total chunks processed: 32
   Processing time: 61.7 seconds
   Data processed: 2,097,152 bytes
   Stream: e84f3644c93d46e793c6e666d548496f-0
   Method: Live stream video frame extraction
   Storage location: data/raw/e84f3644c93d46e793c6e666d548496f-0/20250917_092010
   Summary saved: live_consumption_summary.json

✅ Check the frame_*.jpg files for extracted images

🎉 KVS stream consumption completed successfully!
📁 Your extracted frames are saved in: data/raw/e84f3644c93d46e793c6e666d548496f-0/20250917_092010
🔄 You can now proceed to the annotation step.
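
Before moving on to annotation, it can help to confirm which frames were actually written. The sketch below lists non-empty files matching the `frame_*.jpg` pattern mentioned in the output above and ignores leftover temporary files; the helper name `list_frames` is hypothetical.

```python
import tempfile
from pathlib import Path


def list_frames(session_dir, pattern="frame_*.jpg"):
    """Return sorted paths of non-empty extracted frames in a session directory."""
    return sorted(p for p in Path(session_dir).glob(pattern) if p.stat().st_size > 0)


# Demonstrate with stand-in files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "frame_0001.jpg").write_bytes(b"\xff\xd8 placeholder")
    (Path(tmp) / "frame_0002.jpg").write_bytes(b"")       # empty file, skipped
    (Path(tmp) / "temp_stream_14.webm").write_bytes(b"x")  # temporary file, ignored
    print([p.name for p in list_frames(tmp)])  # ['frame_0001.jpg']
```

In the notebook this would be called as `list_frames(session_dir)`.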


# 3. Data Annotation - Labeling Your Dataset

Now that you have captured video frames, you need to annotate them for training your models. This section provides guidance on annotation tools and best practices.



## 🛠️ Annotation Tools - Choose Your Tool

**Important**: If you plan to use Hailo training tools (recommended), ensure your annotations follow their specific format requirements. We recommend Roboflow for seamless compatibility, but feel free to use any tool that meets your needs.

### 1. Roboflow - Cloud-Based (Recommended)
**Best for**: Hailo compatibility, format conversion, team collaboration
- Go to [roboflow.com](https://roboflow.com)
- Upload images from your session directory (`data/raw/<STREAM_ID>/<session_timestamp>/`, printed as `session_dir` in Step 2.4)
- Annotate via web interface
- Export in Hailo-compatible formats

### 2. CVAT - Professional
**Best for**: Complex projects, video annotation
```bash
git clone https://github.com/opencv/cvat
cd cvat && docker-compose up -d
```
- Web interface at `http://localhost:8080`

**Note**: Verify format compatibility with your chosen training pipeline before starting annotation.
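
One way to check exported annotations before training is to validate each label line. The sketch below assumes the YOLO text format (`class x_center y_center width height`, with coordinates normalized to the range 0 to 1), which is an assumption about the export you choose; the helper name is hypothetical.

```python
def validate_yolo_label_line(line, num_classes):
    """Return a list of problems found in one YOLO-format label line."""
    parts = line.split()
    if len(parts) != 5:
        return [f"expected 5 fields, got {len(parts)}"]
    try:
        class_id = int(parts[0])
        coords = [float(v) for v in parts[1:]]
    except ValueError:
        return ["non-numeric field"]
    problems = []
    if not 0 <= class_id < num_classes:
        problems.append(f"class id {class_id} out of range")
    if any(not 0.0 <= v <= 1.0 for v in coords):
        problems.append("coordinates must be normalized to [0, 1]")
    return problems


print(validate_yolo_label_line("0 0.5 0.5 0.2 0.3", num_classes=2))  # []
print(validate_yolo_label_line("0 320 240 50 60", num_classes=2))
```

The second call reports a problem because the coordinates are in pixels rather than normalized.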

# Training

For model training, Hailo strongly recommends using their Docker-based training environment through the Hailo Model Zoo. This approach provides pre-configured containers with popular network architectures (YOLO, ResNet, etc.) that can be retrained on your custom datasets. The Docker environment ensures consistent dependencies, proper GPU utilization, and seamless integration with the subsequent compilation pipeline.

**System Requirements:** The Model Zoo requires the Hailo Dataflow Compiler, which runs on Linux (Ubuntu 20.04/22.04). WSL2 on Windows may work but is poorly documented and requires additional setup.

**Getting Started:** Follow the comprehensive training guide in the [Hailo Model Zoo repository](https://github.com/hailo-ai/hailo_model_zoo) which includes step-by-step instructions for setting up the Docker environment and retraining models with your annotated data.


# 4. Compiling

**⚠️ IMPORTANT - Compilation Prerequisites:**

Model compilation to HEF format requires specific environment setup:

**Requirements:**
- **Operating System**: Linux x86_64 only
- **Hailo Dataflow Compiler (DFC)**: Must be installed and available
- **Python Environment**: Same virtual environment used for training

**Setup Instructions:**
1. **Linux x86_64 Environment**: Compilation only works on Linux x86_64 systems
2. **Install Hailo DFC**: Follow Hailo's official installation guide for the Dataflow Compiler
3. **Virtual Environment**: Use the same Python virtual environment that was used for model training:
   ```bash
   # Activate the training environment
   source your-training-venv/bin/activate
   
   # Verify Hailo DFC installation
   hailo -h
   ```
4. **Prerequisites**: Ensure all training dependencies are available in the same environment

**Note**: If you don't have access to a Linux x86_64 system with Hailo DFC, you can:
- Use Hailo's cloud compilation services
- Contact Hailo support for compilation assistance
- Use pre-compiled HEF models for testing
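
The prerequisites above can be checked from Python before starting a long compilation. This sketch only tests the operating system, the CPU architecture, and whether a `hailo` command is on the `PATH`; it does not verify the DFC version or the virtual environment. Both function names are hypothetical.

```python
import platform
import shutil


def find_problems(system, machine, hailo_cli_path):
    """Return a list of reasons why this machine cannot run the compilation steps."""
    problems = []
    if system != "Linux":
        problems.append(f"operating system is {system}, expected Linux")
    if machine != "x86_64":
        problems.append(f"architecture is {machine}, expected x86_64")
    if not hailo_cli_path:
        problems.append("`hailo` command not found on PATH")
    return problems


def check_current_environment():
    """Run find_problems against the machine executing this code."""
    return find_problems(platform.system(), platform.machine(), shutil.which("hailo"))


for problem in check_current_environment():
    print("Problem:", problem)
```

An empty result means the basic checks passed, not that the Dataflow Compiler is fully functional.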

### Step 1 - Parsing (.onnx -> .har)

In [None]:
import os
import json
import matplotlib.patches as patches
import numpy as np
import tensorflow as tf

from hailo_sdk_client import ClientRunner, InferenceContext
from IPython.display import SVG
from matplotlib import pyplot as plt
from PIL import Image

IMAGES_TO_VISUALIZE = 10

In [None]:
# The notebook expects the .onnx model at ./models/<model_name>.onnx
onnx_model_name = "best"
workdir = "./models"
onnx_path = os.path.join(workdir, f"{onnx_model_name}.onnx")

print("Loading model from:", onnx_path)

# Step 1 - Parse the ONNX model and convert it to <onnx_model_name>.har
# end_node_names depend on the exported model graph; adjust them for your model
runner = ClientRunner(hw_arch='hailo8')
hn, npz = runner.translate_onnx_model(onnx_path, onnx_model_name, end_node_names=['Conv_283', 'Conv_251', 'Conv_267'])

# Save
har_path = os.path.join(workdir, f"{onnx_model_name}.har")
runner.save_har(har_path)
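
The compilation steps in this section read and write four files that share one base name: `<name>.onnx`, `<name>.har`, `<name>-optimized.har`, and `<name>.hef`. A small helper can keep these paths in one place and fail early when the ONNX file is missing. This is a convenience sketch with hypothetical function names, not part of the Hailo SDK.

```python
import os


def artifact_paths(workdir, model_name):
    """Return the file paths used by the parse, optimize, and compile steps."""
    return {
        "onnx": os.path.join(workdir, f"{model_name}.onnx"),
        "har": os.path.join(workdir, f"{model_name}.har"),
        "optimized_har": os.path.join(workdir, f"{model_name}-optimized.har"),
        "hef": os.path.join(workdir, f"{model_name}.hef"),
    }


def require_onnx(workdir, model_name):
    """Return the ONNX path, or raise if the model file is not there."""
    path = artifact_paths(workdir, model_name)["onnx"]
    if not os.path.isfile(path):
        raise FileNotFoundError(f"Expected ONNX model at {path}")
    return path


paths = artifact_paths("./models", "best")
for kind, path in paths.items():
    print(f"{kind}: {path}")
```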

### Step 2 - Optimize (.har -> optimized.har)



In [None]:
# Prepare a calibration set for quantization. How much the choice of images
# influences the result has not been investigated here.
# from tensorflow.python.eager.context import eager_mode  # leftover example import, may be relevant

# Note: this resolves to <workdir>/data/processed/calibration-set
images_path = os.path.join(workdir, './data/processed/calibration-set')
images_list = [img_name for img_name in os.listdir(images_path)
               if os.path.splitext(img_name)[1] == '.jpg']

# Images are assumed to already be 640x640 RGB; other sizes will fail on assignment
calib_dataset = np.zeros((len(images_list), 640, 640, 3))
for idx, img_name in enumerate(sorted(images_list)):
    img = np.array(Image.open(os.path.join(images_path, img_name)))
    calib_dataset[idx, :, :, :] = np.squeeze(img)

# Can be saved
#np.save('calib_set.npy', calib_dataset)
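
The calibration cell above expects every image to be exactly 640x640. If your captured frames have a different size, they need to be resized first. One common approach for YOLO-style models is letterboxing: scale the image to fit and pad the remainder. Whether letterboxing matches the preprocessing used during your training is an assumption you should verify. The sketch below only computes the resize and padding values; the function name is hypothetical.

```python
def letterbox_params(width, height, target=640):
    """Compute the resized size and (left, top, right, bottom) padding for a square target."""
    scale = min(target / width, target / height)
    new_w, new_h = round(width * scale), round(height * scale)
    pad_x, pad_y = target - new_w, target - new_h
    left, top = pad_x // 2, pad_y // 2
    return {"size": (new_w, new_h), "padding": (left, top, pad_x - left, pad_y - top)}


print(letterbox_params(1920, 1080))
# {'size': (640, 360), 'padding': (0, 140, 0, 140)}
```

A 1920x1080 frame is scaled to 640x360 and padded with 140 pixels above and below.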

In [None]:
# Minimal Model script flow
# model_script_commands = [
#     'normalization1 = normalization([0.0, 0.0, 0.0], [255.0, 255.0, 255.0])\n'
#     'nms_postprocess(meta_arch=yolov5, engine=cpu, nms_scores_th=0.7, nms_iou_th=0.6)\n',
# ]
# # Note: Scores threshold of 0.0 means no filtering, 1.0 means maximal filtering. IoU thresholds are opposite: 1.0 means filtering boxes only if they are equal, and 0.0 means filtering with minimal overlap.
# runner.load_model_script(''.join(model_script_commands))


# Regular Model script flow
runner.load_model_script("./cfg/yolov5m_wo_spp-edited.alls")
runner.optimize(calib_dataset)

# Save the optimized model
quantized_model_har_path = os.path.join(workdir, f"{onnx_model_name}-optimized.har")
runner.save_har(quantized_model_har_path)
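
The note on thresholds in the cell above can be made concrete with a small example. The sketch below is a plain greedy non-maximum suppression in pure Python, written only to illustrate how `nms_scores_th` and `nms_iou_th` behave; it is not Hailo's implementation. Boxes are `(x1, y1, x2, y2)` tuples.

```python
def iou(a, b):
    """Intersection over union of two boxes given as (x1, y1, x2, y2)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0


def nms(detections, score_th, iou_th):
    """Greedy NMS over (box, score) pairs: drop low scores, then overlapping boxes."""
    candidates = sorted((d for d in detections if d[1] >= score_th),
                        key=lambda d: d[1], reverse=True)
    kept = []
    for box, score in candidates:
        # Keep a box only if it does not overlap too much with a higher-scoring one
        if all(iou(box, k[0]) <= iou_th for k in kept):
            kept.append((box, score))
    return kept


detections = [
    ((0, 0, 10, 10), 0.9),
    ((1, 1, 11, 11), 0.8),    # overlaps the first box with IoU of about 0.68
    ((20, 20, 30, 30), 0.75),
    ((0, 0, 5, 5), 0.3),      # below the score threshold
]
print(len(nms(detections, score_th=0.7, iou_th=0.6)))  # 2
print(len(nms(detections, score_th=0.7, iou_th=0.7)))  # 3
```

Raising the IoU threshold from 0.6 to 0.7 keeps the second box, because its overlap with the first is no longer above the limit.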

### Step 3 - Compile (optimized.har -> .hef)

In [None]:
hef = runner.compile()

file_name = os.path.join(workdir, f"{onnx_model_name}.hef")
with open(file_name, 'wb') as f:
    f.write(hef)

# 5. Deploy

## Step 5.1: Deployment Overview

To deploy your compiled model to the edge device, you'll need to:
1. Upload your model file (`.hef`) to a web server or cloud storage
2. Generate a publicly accessible download URL
3. Create an IoT job that instructs the device to download from that URL

In this example, the device downloads the file and saves it to `/data/download-tests/test.txt` for testing.
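
If the model is stored in a private S3 bucket, a time-limited presigned URL is one alternative to a fully public link. The sketch below uses the boto3 S3 client methods `upload_file` and `generate_presigned_url`; the function name, bucket, and key are hypothetical, and whether your device accepts presigned URLs is an assumption to verify.

```python
def upload_and_sign(s3_client, local_path, bucket, key, expires_in=3600):
    """Upload a file to S3 and return a presigned download URL.

    `s3_client` is expected to behave like the client returned by
    boto3.Session(...).client("s3").
    """
    if expires_in <= 0:
        raise ValueError("expires_in must be a positive number of seconds")
    s3_client.upload_file(local_path, bucket, key)
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )
```

In this notebook it could be called as `upload_and_sign(aws_session.client("s3"), "./models/best.hef", "<your-bucket>", "models/best.hef")`, with the result assigned to `DEPLOYMENT_URL`.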

## Step 5.2: Deployment Configuration

Configure the IoT thing ID and deployment URL for sending the download job to your edge device.

In [20]:
# Deployment Configuration Constants
THING_ID = "2a502dbd931787d773b2b1d4c0bdc409"  # Your IoT thing ID
DEPLOYMENT_URL = "https://raw.githubusercontent.com/GustavToft/download-tests/main/config-v2.txt"  # URL where device can download the model

print(f"📱 Thing ID: {THING_ID}")
print(f"🔗 Deployment URL: {DEPLOYMENT_URL}")
print(f"💡 The device will download the model from the provided URL")

📱 Thing ID: 2a502dbd931787d773b2b1d4c0bdc409
🔗 Deployment URL: https://raw.githubusercontent.com/GustavToft/download-tests/main/config-v2.txt
💡 The device will download the model from the provided URL


## Step 5.3: Check IoT Thing Exists

Before creating a deployment job, let's verify that the IoT thing exists in AWS IoT Core.

In [21]:
# Import IoT deployment functions
from iot_deployment import check_iot_thing_exists

# Check the thing
if aws_connected:
    thing_exists, thing_info = check_iot_thing_exists(aws_session, THING_ID)
else:
    print("❌ Cannot check IoT thing - AWS connection required")
    thing_exists = False
    thing_info = None

✅ IoT Thing found: 2a502dbd931787d773b2b1d4c0bdc409
   Thing Name: 2a502dbd931787d773b2b1d4c0bdc409
   Thing ARN: arn:aws:iot:eu-west-1:789153103247:thing/2a502dbd931787d773b2b1d4c0bdc409
   Attributes: {'cert_issuance': '01/09/2025'}
   📡 No shadow found (device may be offline)
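
The implementation of `check_iot_thing_exists` is not shown here. A minimal sketch of the lookup, assuming it relies on the boto3 IoT client's `describe_thing` call, could look like this; the function name `find_thing` is hypothetical.

```python
def find_thing(iot_client, thing_name):
    """Return (exists, info) for an IoT thing.

    `iot_client` is expected to behave like boto3.Session(...).client("iot").
    """
    try:
        thing = iot_client.describe_thing(thingName=thing_name)
    except iot_client.exceptions.ResourceNotFoundException:
        return False, None
    return True, {
        "thingName": thing["thingName"],
        "thingArn": thing["thingArn"],
        "attributes": thing.get("attributes", {}),
    }
```

The returned dictionary keeps the `thingArn` key that the next step reads from `thing_info`.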


## Step 5.4: Create IoT Job for File Download

Create an AWS IoT job to instruct the edge device to download the file from the provided URL.

In [22]:
# Import IoT deployment functions
from iot_deployment import create_deployment_job

# Create deployment job if thing exists
if thing_exists and aws_connected and thing_info:
    print(f"🚀 Creating download job for {THING_ID}...")
    
    # Use the thing ARN from the thing_info
    thing_arn = thing_info['thingArn']
    
    job_created, job_id, job_arn = create_deployment_job(
        aws_session, 
        thing_arn,  # Pass ARN instead of ID
        DEPLOYMENT_URL
    )
    
    if job_created:
        print(f"\n📋 Download job is ready!")
        print(f"💡 The edge device will receive instructions to download from: {DEPLOYMENT_URL}")
        print(f"📁 File will be saved as: /data/download-tests/test.txt")
        print(f"🔄 You can check job status by running the next cell")
    
else:
    print("❌ Cannot create download job - IoT thing not found or AWS not connected")
    job_created = False
    job_id = None

🚀 Creating download job for 2a502dbd931787d773b2b1d4c0bdc409...
✅ IoT Job created successfully!
   Job ID: deploy_download_20250918_153744
   Job ARN: arn:aws:iot:eu-west-1:789153103247:job/deploy_download_20250918_153744
   Target Thing: 2a502dbd931787d773b2b1d4c0bdc409
   Download URL: https://raw.githubusercontent.com/GustavToft/download-tests/main/config-v2.txt
   Created: 2025-09-18 15:37:44.994466

📋 Download job is ready!
💡 The edge device will receive instructions to download from: https://raw.githubusercontent.com/GustavToft/download-tests/main/config-v2.txt
📁 File will be saved as: /data/download-tests/test.txt
🔄 You can check job status by running the next cell
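
For orientation, the request behind such a job might be assembled as sketched below. The job ID pattern follows the output above. The job document fields (`operation`, `url`, `destination`) are a hypothetical schema, because the document format is defined by the software running on the device, and `build_download_job` is not part of `iot_deployment`.

```python
import json
from datetime import datetime


def build_download_job(thing_arn, url, destination, now=None):
    """Build keyword arguments for the boto3 IoT client's create_job call."""
    now = now or datetime.now()
    # Hypothetical job document; the real schema depends on the device agent
    document = {"operation": "download", "url": url, "destination": destination}
    return {
        "jobId": f"deploy_download_{now.strftime('%Y%m%d_%H%M%S')}",
        "targets": [thing_arn],
        "document": json.dumps(document),
        "targetSelection": "SNAPSHOT",  # run once on the listed targets
    }


request = build_download_job(
    "arn:aws:iot:eu-west-1:123456789012:thing/example-thing",
    "https://example.invalid/model.hef",
    "/data/download-tests/test.txt",
    now=datetime(2025, 9, 18, 15, 37, 44),
)
print(request["jobId"])  # deploy_download_20250918_153744
```

With a real client, the request would be sent as `iot_client.create_job(**request)`.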


## Step 5.5: Monitor Download Status

Check the status of the download job and monitor device response.

In [25]:
# Import monitoring function
from iot_deployment import check_job_status

# Monitor download job status
if job_created and job_id:
    print(f"🔍 Checking status of job: {job_id}")
    job_status = check_job_status(aws_session, job_id)
    
    if job_status == "IN_PROGRESS":
        print(f"\n⏳ Job is in progress - device is processing download")
        print(f"💡 The device should download the file from: {DEPLOYMENT_URL}")
        print(f"📁 File will be saved as: /data/download-tests/test.txt")
        print(f"🔄 Re-run this cell to check for updates")
    elif job_status == "COMPLETED":
        print(f"\n✅ Download completed successfully!")
        print(f"📁 File should now be available at: /data/download-tests/test.txt")
    elif job_status == "FAILED":
        print(f"\n❌ Download failed")
        print(f"💡 Check device logs and ensure the URL is accessible")
    else:
        print(f"\n📊 Job status: {job_status}")
        
else:
    print("❌ No active download job to monitor")
    print("   Create a download job in the previous step first")

Could not get job executions: 'status'


🔍 Checking status of job: deploy_download_20250918_153744
📊 Job Status: COMPLETED

✅ Download completed successfully!
📁 File should now be available at: /data/download-tests/test.txt
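
Instead of re-running the cell by hand, the status check can be wrapped in a polling loop with a timeout. This is a sketch: `wait_for_job` is a hypothetical helper, and the terminal states default to the two values handled in the cell above.

```python
import time


def wait_for_job(get_status, timeout_seconds=300, interval_seconds=10,
                 terminal=("COMPLETED", "FAILED"), sleep=time.sleep, clock=time.monotonic):
    """Call get_status() until it returns a terminal state or the timeout expires."""
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        if status in terminal:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout_seconds}s")
        sleep(interval_seconds)


# Demonstrate with a scripted sequence of statuses and no real waiting
statuses = iter(["IN_PROGRESS", "IN_PROGRESS", "COMPLETED"])
print(wait_for_job(lambda: next(statuses), sleep=lambda seconds: None))  # COMPLETED
```

In the notebook this could be used as `wait_for_job(lambda: check_job_status(aws_session, job_id))`.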
