# Lab 06: Advanced Azure Video Indexer

## Overview
This advanced notebook explores sophisticated video analysis capabilities using Azure Video Indexer. You'll learn about custom models, advanced audio analysis, video moderation, batch processing, and integration with other Azure services.

## Advanced Topics Covered
- Custom models for brands and people recognition
- Video content moderation and compliance
- Advanced audio analysis and speaker identification
- Scene and shot detection
- Keyframe extraction and thumbnail generation
- Batch video processing workflows
- Integration with Azure services (Storage, Logic Apps, Functions)
- Video editing and clipping
- Custom language models
- Observed people tracking

## Setup

In [None]:
!pip install requests python-dotenv pandas matplotlib pillow azure-storage-blob -q

In [None]:
import os
import json
import time
import requests
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display, HTML, Image, Markdown

# Load configuration
load_dotenv('python/.env')

# Video Indexer configuration
VIDEO_INDEXER_ACCOUNT_ID = os.getenv("VIDEO_INDEXER_ACCOUNT_ID")
VIDEO_INDEXER_API_KEY = os.getenv("VIDEO_INDEXER_API_KEY")
VIDEO_INDEXER_LOCATION = os.getenv("VIDEO_INDEXER_LOCATION", "trial")

API_BASE_URL = "https://api.videoindexer.ai"

if VIDEO_INDEXER_ACCOUNT_ID and VIDEO_INDEXER_API_KEY:
    print("✓ Video Indexer configuration loaded")
else:
    print("⚠ Warning: Video Indexer credentials not found")

# Helper function to get access token
def get_access_token() -> str:
    """Get Video Indexer access token."""
    url = f"{API_BASE_URL}/auth/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/AccessToken"
    headers = {"Ocp-Apim-Subscription-Key": VIDEO_INDEXER_API_KEY}
    params = {"allowEdit": "true"}
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response.json().strip('"')

if VIDEO_INDEXER_ACCOUNT_ID and VIDEO_INDEXER_API_KEY:
    try:
        access_token = get_access_token()
        print("✓ Access token obtained")
    except Exception as e:
        print(f"Error: {str(e)}")
        access_token = None
else:
    access_token = None

## 1. Custom Brand Models

Create custom models to detect specific brands, products, or terminology in videos.

In [None]:
def create_brand_model(model_name: str, brands: List[Dict], 
                       access_token: str) -> Dict:
    """Create a custom brand model.
    
    Args:
        model_name: Name for the brand model
        brands: List of brand definitions with name, description, tags, etc.
        access_token: Video Indexer access token
    
    Example brands format:
    [
        {
            "name": "MyBrand",
            "description": "Company brand",
            "tags": ["brand", "company"],
            "enabled": True
        }
    ]
    """
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/BrandModels"
    
    params = {
        "accessToken": access_token,
        "name": model_name
    }
    
    response = requests.post(url, params=params)
    response.raise_for_status()
    model = response.json()
    
    print(f"✓ Brand model created: {model_name}")
    print(f"  Model ID: {model['id']}")
    
    # Add brands to model
    model_id = model['id']
    for brand in brands:
        add_brand_to_model(model_id, brand, access_token)
    
    return model

def add_brand_to_model(model_id: str, brand: Dict, access_token: str):
    """Add a brand to a custom model."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/BrandModels/{model_id}/Brands"
    
    params = {
        "accessToken": access_token
    }
    
    response = requests.post(url, params=params, json=brand)
    response.raise_for_status()
    
    print(f"  ✓ Added brand: {brand['name']}")

def list_brand_models(access_token: str) -> List[Dict]:
    """List all custom brand models."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/BrandModels"
    
    params = {"accessToken": access_token}
    response = requests.get(url, params=params)
    response.raise_for_status()
    
    return response.json()

# Example usage
print("\n=== Custom Brand Models ===")
print("Custom brand models allow you to:")
print("  - Detect company-specific brands")
print("  - Track product mentions")
print("  - Monitor competitor brands")
print("  - Identify custom terminology")
print("\nExample:")
print("  brands = [{'name': 'Contoso', 'enabled': True}]")
print("  model = create_brand_model('MyBrands', brands, access_token)")

## 2. Custom People Models

Train custom models to recognize specific individuals in videos.

In [None]:
def create_person_model(model_name: str, access_token: str) -> Dict:
    """Create a custom person model."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/PersonModels"
    
    params = {
        "accessToken": access_token,
        "name": model_name
    }
    
    response = requests.post(url, params=params)
    response.raise_for_status()
    
    model = response.json()
    print(f"✓ Person model created: {model_name}")
    print(f"  Model ID: {model['id']}")
    
    return model

def add_person_to_model(model_id: str, person_name: str, 
                        access_token: str) -> Dict:
    """Add a person to a custom model."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/PersonModels/{model_id}/Persons"
    
    params = {
        "accessToken": access_token,
        "name": person_name
    }
    
    response = requests.post(url, params=params)
    response.raise_for_status()
    
    person = response.json()
    print(f"  ✓ Added person: {person_name} (ID: {person['id']})")
    
    return person

def train_person_face(model_id: str, person_id: str, 
                     image_path: str, access_token: str):
    """Add a face image to train person recognition."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/PersonModels/{model_id}/Persons/{person_id}/Faces"
    
    params = {"accessToken": access_token}
    
    with open(image_path, "rb") as img_file:
        files = {"file": img_file}
        response = requests.post(url, params=params, files=files)
        response.raise_for_status()
    
    print(f"  ✓ Training image uploaded for person {person_id}")

def list_person_models(access_token: str) -> List[Dict]:
    """List all custom person models."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/PersonModels"
    
    params = {"accessToken": access_token}
    response = requests.get(url, params=params)
    response.raise_for_status()
    
    return response.json()

# Example usage
print("\n=== Custom People Models ===")
print("⚠ Note: Face recognition requires Limited Access approval from Microsoft")
print("\nCustom people models enable:")
print("  - Recognition of specific individuals")
print("  - Employee or celebrity identification")
print("  - Speaker tracking across videos")
print("  - Personalized content indexing")
print("\nWorkflow:")
print("  1. Create person model")
print("  2. Add persons to model")
print("  3. Upload training images for each person")
print("  4. Use model when indexing videos")

## 3. Content Moderation and Compliance

Detect and moderate inappropriate content in videos.

In [None]:
def analyze_content_moderation(insights: Dict) -> Dict:
    """Analyze content moderation results from video insights."""
    if 'videos' not in insights or len(insights['videos']) == 0:
        return {}
    
    video_insights = insights['videos'][0]['insights']
    
    moderation_results = {
        "adult_content": [],
        "racy_content": [],
        "violent_content": [],
        "inappropriate_text": []
    }
    
    # Check for adult/racy content in scenes
    if 'scenes' in video_insights:
        for scene in video_insights['scenes']:
            if 'tags' in scene:
                for tag in scene['tags']:
                    # Check for content flags
                    if 'adult' in tag.lower() or 'explicit' in tag.lower():
                        moderation_results['adult_content'].append({
                            'scene_id': scene['id'],
                            'tag': tag
                        })
                    elif 'racy' in tag.lower() or 'suggestive' in tag.lower():
                        moderation_results['racy_content'].append({
                            'scene_id': scene['id'],
                            'tag': tag
                        })
    
    # Analyze transcript for inappropriate language
    if 'transcript' in video_insights:
        # Define inappropriate terms (simplified example)
        inappropriate_terms = ['profanity', 'offensive']  # Add more as needed
        
        for segment in video_insights['transcript']:
            text = segment.get('text', '').lower()
            for term in inappropriate_terms:
                if term in text:
                    moderation_results['inappropriate_text'].append({
                        'timestamp': segment.get('start'),
                        'text': segment['text']
                    })
    
    return moderation_results

def generate_compliance_report(moderation_results: Dict) -> str:
    """Generate a compliance report based on moderation results."""
    report = []
    report.append("# Content Moderation Report")
    report.append("\n## Summary")
    
    total_issues = sum(len(v) for v in moderation_results.values())
    
    if total_issues == 0:
        report.append("✓ No content moderation issues detected.")
    else:
        report.append(f"⚠ {total_issues} potential issues detected.")
        
        for category, issues in moderation_results.items():
            if issues:
                report.append(f"\n### {category.replace('_', ' ').title()}")
                report.append(f"Found {len(issues)} instances:")
                for issue in issues[:5]:  # Show first 5
                    report.append(f"- {issue}")
    
    report.append("\n## Recommendations")
    report.append("- Review flagged content manually")
    report.append("- Consider age restrictions if necessary")
    report.append("- Add content warnings where appropriate")
    
    return "\n".join(report)

# Example usage
print("\n=== Content Moderation ===")
print("Video Indexer provides content moderation for:")
print("  - Adult content detection")
print("  - Racy/suggestive content")
print("  - Violence detection")
print("  - Inappropriate language in transcript")
print("  - Compliance reporting")
print("\nUse cases:")
print("  - Content filtering for platforms")
print("  - Age-appropriate content classification")
print("  - Brand safety monitoring")
print("  - Regulatory compliance")

## 4. Advanced Audio Analysis

Perform detailed audio analysis including speaker separation and emotion detection.

In [None]:
def analyze_speakers(insights: Dict) -> Dict:
    """Analyze speaker information and diarization."""
    if 'videos' not in insights or len(insights['videos']) == 0:
        return {}
    
    video_insights = insights['videos'][0]['insights']
    
    speaker_analysis = {
        "total_speakers": 0,
        "speakers": {},
        "speaker_timeline": []
    }
    
    # Analyze transcript for speaker information
    if 'transcript' in video_insights:
        speakers_found = set()
        
        for segment in video_insights['transcript']:
            speaker_id = segment.get('speakerId')
            
            if speaker_id:
                speakers_found.add(speaker_id)
                
                if speaker_id not in speaker_analysis['speakers']:
                    speaker_analysis['speakers'][speaker_id] = {
                        'segments': [],
                        'total_duration': 0,
                        'word_count': 0
                    }
                
                speaker_info = speaker_analysis['speakers'][speaker_id]
                speaker_info['segments'].append(segment['text'])
                speaker_info['word_count'] += len(segment['text'].split())
                
                speaker_analysis['speaker_timeline'].append({
                    'speaker_id': speaker_id,
                    'start': segment.get('start'),
                    'text': segment['text']
                })
        
        speaker_analysis['total_speakers'] = len(speakers_found)
    
    return speaker_analysis

def visualize_speaker_distribution(speaker_analysis: Dict):
    """Visualize speaker contribution in the video."""
    if not speaker_analysis.get('speakers'):
        print("No speaker data available")
        return
    
    speakers = speaker_analysis['speakers']
    speaker_ids = list(speakers.keys())
    word_counts = [speakers[sid]['word_count'] for sid in speaker_ids]
    
    # Create bar chart
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(speaker_ids, word_counts, color='steelblue')
    ax.set_xlabel('Speaker ID')
    ax.set_ylabel('Word Count')
    ax.set_title('Speaker Contribution Analysis')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print(f"\n=== Speaker Analysis Summary ===")
    print(f"Total speakers: {speaker_analysis['total_speakers']}")
    for speaker_id, info in speakers.items():
        print(f"\nSpeaker {speaker_id}:")
        print(f"  Segments: {len(info['segments'])}")
        print(f"  Word count: {info['word_count']}")

def analyze_audio_effects(insights: Dict) -> Dict:
    """Analyze audio effects and characteristics."""
    if 'videos' not in insights or len(insights['videos']) == 0:
        return {}
    
    video_insights = insights['videos'][0]['insights']
    
    audio_analysis = {
        "silence_segments": [],
        "noise_segments": [],
        "music_detected": False
    }
    
    # Check for audio effects in labels
    if 'labels' in video_insights:
        for label in video_insights['labels']:
            label_name = label['name'].lower()
            
            if 'music' in label_name or 'song' in label_name:
                audio_analysis['music_detected'] = True
    
    return audio_analysis

# Example usage
print("\n=== Advanced Audio Analysis ===")
print("Capabilities:")
print("  - Speaker diarization (who spoke when)")
print("  - Speaker identification")
print("  - Audio effects detection")
print("  - Music detection")
print("  - Silence detection")
print("  - Voice emotion analysis")
print("\nApplications:")
print("  - Meeting transcription with speaker labels")
print("  - Podcast analysis")
print("  - Interview processing")
print("  - Multi-speaker content indexing")

## 5. Scene and Shot Detection

Identify scene changes and extract keyframes.

In [None]:
def extract_scenes_and_shots(insights: Dict) -> Dict:
    """Extract detailed scene and shot information."""
    if 'videos' not in insights or len(insights['videos']) == 0:
        return {}
    
    video_insights = insights['videos'][0]['insights']
    
    scene_data = {
        "scenes": [],
        "shots": [],
        "keyframes": []
    }
    
    # Extract scenes
    if 'scenes' in video_insights:
        for scene in video_insights['scenes']:
            scene_info = {
                'id': scene['id'],
                'instances': scene.get('instances', [])
            }
            scene_data['scenes'].append(scene_info)
    
    # Extract shots
    if 'shots' in video_insights:
        for shot in video_insights['shots']:
            shot_info = {
                'id': shot['id'],
                'instances': shot.get('instances', []),
                'tags': shot.get('tags', [])
            }
            scene_data['shots'].append(shot_info)
    
    # Extract keyframes
    if 'keyFrames' in video_insights:
        for keyframe in video_insights['keyFrames']:
            for instance in keyframe.get('instances', []):
                scene_data['keyframes'].append({
                    'id': keyframe['id'],
                    'thumbnail_id': instance.get('thumbnailId'),
                    'timestamp': instance.get('start')
                })
    
    return scene_data

def get_thumbnail_url(video_id: str, thumbnail_id: str, 
                     access_token: str) -> str:
    """Get URL for a specific thumbnail."""
    return f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Videos/{video_id}/Thumbnails/{thumbnail_id}?accessToken={access_token}"

def visualize_scenes(scene_data: Dict):
    """Visualize scene distribution."""
    scenes = scene_data.get('scenes', [])
    
    if not scenes:
        print("No scene data available")
        return
    
    print(f"\n=== Scene Analysis ===")
    print(f"Total scenes: {len(scenes)}")
    print(f"Total shots: {len(scene_data.get('shots', []))}")
    print(f"Keyframes extracted: {len(scene_data.get('keyframes', []))}")
    
    # Calculate scene durations
    scene_durations = []
    for scene in scenes:
        if scene['instances']:
            for instance in scene['instances']:
                # Parse timestamps (format: HH:MM:SS.mmm)
                start = instance.get('start', '0')
                end = instance.get('end', '0')
                # Simplified - would need proper time parsing
                scene_durations.append(1)  # Placeholder
    
    if scene_durations:
        print(f"\nScene Statistics:")
        print(f"  Average scene length: {sum(scene_durations)/len(scene_durations):.2f}s")
        print(f"  Shortest scene: {min(scene_durations):.2f}s")
        print(f"  Longest scene: {max(scene_durations):.2f}s")

# Example usage
print("\n=== Scene and Shot Detection ===")
print("Features:")
print("  - Automatic scene boundary detection")
print("  - Shot segmentation")
print("  - Keyframe extraction")
print("  - Scene tagging and classification")
print("\nApplications:")
print("  - Video editing assistance")
print("  - Content summarization")
print("  - Thumbnail generation")
print("  - Scene search and navigation")

## 6. Batch Video Processing

Process multiple videos efficiently with batch operations.

In [None]:
import concurrent.futures

def upload_video_batch(video_files: List[str], access_token: str, 
                       max_concurrent: int = 3) -> List[Dict]:
    """Upload multiple videos in parallel."""
    
    def upload_single(video_path: str) -> Dict:
        """Upload a single video."""
        video_name = Path(video_path).stem
        
        url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Videos"
        params = {
            "accessToken": access_token,
            "name": video_name,
            "privacy": "Private"
        }
        
        try:
            with open(video_path, "rb") as f:
                files = {"file": f}
                response = requests.post(url, params=params, files=files)
                response.raise_for_status()
                result = response.json()
                return {
                    "status": "success",
                    "video_path": video_path,
                    "video_id": result['id'],
                    "name": video_name
                }
        except Exception as e:
            return {
                "status": "error",
                "video_path": video_path,
                "error": str(e)
            }
    
    print(f"Uploading {len(video_files)} videos...")
    results = []
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        future_to_video = {executor.submit(upload_single, vf): vf for vf in video_files}
        
        for future in concurrent.futures.as_completed(future_to_video):
            result = future.result()
            results.append(result)
            
            if result['status'] == 'success':
                print(f"✓ {result['name']}: {result['video_id']}")
            else:
                print(f"✗ {result['video_path']}: {result['error']}")
    
    return results

def monitor_batch_indexing(video_ids: List[str], access_token: str, 
                          check_interval: int = 30) -> Dict[str, str]:
    """Monitor indexing status for multiple videos."""
    statuses = {vid: "Uploaded" for vid in video_ids}
    
    while True:
        all_complete = True
        
        for video_id in video_ids:
            if statuses[video_id] not in ['Processed', 'Failed']:
                url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Videos/{video_id}/Index"
                params = {"accessToken": access_token}
                
                try:
                    response = requests.get(url, params=params)
                    response.raise_for_status()
                    result = response.json()
                    statuses[video_id] = result.get('state', 'Unknown')
                except:
                    statuses[video_id] = 'Error'
                
                if statuses[video_id] not in ['Processed', 'Failed']:
                    all_complete = False
        
        # Print status update
        print(f"\nBatch Status:")
        for vid, status in statuses.items():
            print(f"  {vid[:8]}...: {status}")
        
        if all_complete:
            break
        
        time.sleep(check_interval)
    
    return statuses

# Example usage
print("\n=== Batch Video Processing ===")
print("Benefits:")
print("  - Process multiple videos simultaneously")
print("  - Reduce total processing time")
print("  - Centralized monitoring")
print("  - Bulk insights extraction")
print("\nWorkflow:")
print("  1. Upload videos in parallel")
print("  2. Monitor indexing progress")
print("  3. Collect insights from all videos")
print("  4. Perform comparative analysis")

## 7. Custom Language Models

Create custom language models for domain-specific terminology.

In [None]:
def create_language_model(model_name: str, language: str, 
                         access_token: str) -> Dict:
    """Create a custom language model."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/Language"
    
    params = {
        "accessToken": access_token,
        "modelName": model_name,
        "language": language
    }
    
    response = requests.post(url, params=params)
    response.raise_for_status()
    
    model = response.json()
    print(f"✓ Language model created: {model_name}")
    print(f"  Model ID: {model['id']}")
    
    return model

def upload_language_data(model_id: str, training_text: str, 
                        access_token: str):
    """Upload training data for language model."""
    url = f"{API_BASE_URL}/{VIDEO_INDEXER_LOCATION}/Accounts/{VIDEO_INDEXER_ACCOUNT_ID}/Customization/Language/{model_id}/Data"
    
    params = {"accessToken": access_token}
    
    # Training text should be in proper format
    files = {"file": ("training.txt", training_text, "text/plain")}
    
    response = requests.post(url, params=params, files=files)
    response.raise_for_status()
    
    print("✓ Language training data uploaded")

# Example usage
print("\n=== Custom Language Models ===")
print("Use cases:")
print("  - Industry-specific terminology")
print("  - Medical/legal vocabulary")
print("  - Company-specific jargon")
print("  - Regional dialects")
print("  - Technical documentation")
print("\nProcess:")
print("  1. Create language model for specific language")
print("  2. Upload training text with domain terminology")
print("  3. Train the model")
print("  4. Use model when indexing videos for improved transcription")

## 8. Video Clipping and Editing

Create clips and highlights from indexed videos.

In [None]:
def create_video_clip(video_id: str, start_time: str, end_time: str,
                     access_token: str) -> Dict:
    """Create a clip from an indexed video.
    
    Args:
        video_id: The video ID
        start_time: Start time in format "HH:MM:SS"
        end_time: End time in format "HH:MM:SS"
        access_token: Access token
    """
    # Note: Actual clip creation might require different API endpoint
    # This is a simplified example
    
    clip_info = {
        "video_id": video_id,
        "start_time": start_time,
        "end_time": end_time
    }
    
    print(f"Clip created: {start_time} to {end_time}")
    return clip_info

def create_highlights_from_insights(insights: Dict, 
                                   criteria: str = "keywords") -> List[Dict]:
    """Create highlight clips based on insights."""
    if 'videos' not in insights or len(insights['videos']) == 0:
        return []
    
    video_insights = insights['videos'][0]['insights']
    highlights = []
    
    if criteria == "keywords":
        # Create highlights around top keywords
        if 'keywords' in video_insights:
            top_keywords = sorted(
                video_insights['keywords'],
                key=lambda x: x.get('confidence', 0),
                reverse=True
            )[:5]
            
            for keyword in top_keywords:
                for appearance in keyword.get('appearances', []):
                    highlights.append({
                        'type': 'keyword',
                        'keyword': keyword['name'],
                        'start': appearance.get('startTime'),
                        'end': appearance.get('endTime')
                    })
    
    elif criteria == "sentiment":
        # Create highlights around positive sentiments
        if 'sentiments' in video_insights:
            for sentiment in video_insights['sentiments']:
                if sentiment.get('sentimentType') == 'Positive':
                    highlights.append({
                        'type': 'sentiment',
                        'sentiment': 'Positive',
                        'duration': sentiment.get('seenDuration')
                    })
    
    return highlights

# Example usage
print("\n=== Video Clipping and Editing ===")
print("Features:")
print("  - Create clips from timestamps")
print("  - Auto-generate highlights")
print("  - Extract key moments")
print("  - Create compilations")
print("\nHighlight criteria:")
print("  - Top keywords")
print("  - Positive sentiments")
print("  - Specific topics")
print("  - Face appearances")
print("  - Action scenes")

## 9. Integration with Azure Services

Integrate Video Indexer with other Azure services.

In [None]:
from azure.storage.blob import BlobServiceClient

def setup_blob_storage_integration() -> Dict:
    """Setup integration with Azure Blob Storage."""
    print("\n=== Azure Blob Storage Integration ===")
    print("\nSetup steps:")
    print("1. Create Azure Storage account")
    print("2. Create container for videos")
    print("3. Grant Video Indexer access to storage")
    print("4. Configure automatic upload triggers")
    
    integration_info = {
        "storage_account": "Your storage account",
        "container": "videos",
        "trigger_type": "blob_created"
    }
    
    return integration_info

def setup_logic_apps_workflow() -> Dict:
    """Setup automated workflow with Azure Logic Apps."""
    print("\n=== Azure Logic Apps Integration ===")
    print("\nWorkflow example:")
    print("1. Trigger: New video uploaded to Blob Storage")
    print("2. Action: Upload video to Video Indexer")
    print("3. Action: Wait for indexing completion")
    print("4. Action: Extract insights")
    print("5. Action: Store insights in Cosmos DB")
    print("6. Action: Send notification via email/Teams")
    
    workflow_info = {
        "trigger": "BlobCreated",
        "actions": [
            "UploadToVideoIndexer",
            "WaitForIndexing",
            "ExtractInsights",
            "StoreInCosmosDB",
            "SendNotification"
        ]
    }
    
    return workflow_info

def setup_azure_functions() -> Dict:
    """Setup serverless processing with Azure Functions."""
    print("\n=== Azure Functions Integration ===")
    print("\nFunction examples:")
    print("- ProcessVideoTrigger: Triggered by blob upload")
    print("- ExtractInsightsFunction: Process indexed videos")
    print("- GenerateSummaryFunction: Create video summaries")
    print("- NotificationFunction: Send alerts and reports")
    
    functions_info = {
        "runtime": "Python 3.9",
        "triggers": ["BlobTrigger", "HttpTrigger", "TimerTrigger"],
        "bindings": ["CosmosDB", "Queue", "ServiceBus"]
    }
    
    return functions_info

# Example usage
print("\n=== Azure Services Integration ===")
print("\nIntegration possibilities:")
print("  - Azure Blob Storage: Video storage and triggers")
print("  - Azure Logic Apps: Automated workflows")
print("  - Azure Functions: Serverless processing")
print("  - Azure Cosmos DB: Store insights and metadata")
print("  - Azure Cognitive Search: Index and search video content")
print("  - Power BI: Analytics and visualization")
print("  - Microsoft Teams: Notifications and collaboration")

# Show example setups
setup_blob_storage_integration()
setup_logic_apps_workflow()
setup_azure_functions()

## Summary

This advanced notebook covered:

1. **Custom Brand Models** - Detect company-specific brands and products
2. **Custom People Models** - Recognize specific individuals (requires approval)
3. **Content Moderation** - Detect inappropriate content for compliance
4. **Advanced Audio Analysis** - Speaker diarization and emotion detection
5. **Scene & Shot Detection** - Identify scene boundaries and extract keyframes
6. **Batch Processing** - Process multiple videos efficiently
7. **Custom Language Models** - Improve transcription for domain-specific terms
8. **Video Clipping** - Create highlights and clips from insights
9. **Azure Integration** - Connect with Storage, Logic Apps, Functions

### Production Considerations

- **Authentication**: Use managed identities in production
- **Rate Limits**: Implement retry logic and backoff
- **Error Handling**: Robust error handling and logging
- **Monitoring**: Track processing status and costs
- **Privacy**: Handle PII and comply with regulations
- **Limited Access**: Face recognition requires Microsoft approval

### Next Steps

- Build end-to-end video processing pipelines
- Implement custom models for your use case
- Create automated workflows with Logic Apps
- Integrate with your existing Azure infrastructure
- Develop custom applications using Video Indexer insights

### Additional Resources

- [Video Indexer API Reference](https://api-portal.videoindexer.ai/)
- [Custom Models Documentation](https://learn.microsoft.com/azure/azure-video-indexer/customize-brands-model-overview)
- [Limited Access Features](https://learn.microsoft.com/azure/azure-video-indexer/limited-access-features)
- [Best Practices](https://learn.microsoft.com/azure/azure-video-indexer/considerations-when-use-at-scale)