# OCI Vision Video Analysis (Beginner Notebook)

Analyze video with Oracle Cloud Vision's pre-trained models for labels, objects, text, and faces, using the official Python SDK.

This notebook is adapted from the script in `vision/oci_vision_video.py`, step-by-step for beginner-friendly workflows.

## 1. Setup & Requirements

- `sandbox.yaml` must be properly set up
- `.env` for secrets/prefix (if needed)
- Default video: `vision/mall.mp4` (replace with your own if needed)

**Install dependencies**
```bash
pip install oci python-dotenv envyaml
```


In [1]:
import os
from dotenv import load_dotenv
from envyaml import EnvYAML
from pathlib import Path
import json
import time
import oci
from oci.object_storage import ObjectStorageClient
load_dotenv()

## 2. Load OCI and Bucket Configuration

In [2]:
SANDBOX_CONFIG_FILE = "sandbox.yaml"
VIDEO_PATH = Path("vision/mall.mp4")

def load_config(config_path):
    try:
        return EnvYAML(config_path)
    except FileNotFoundError:
        print(f"Error: Configuration file '{config_path}' not found.")
        return None
    except Exception as e:
        print(f"Error loading config: {e}")
        return None

scfg = load_config(SANDBOX_CONFIG_FILE)
assert scfg is not None and 'oci' in scfg and 'bucket' in scfg, 'Check your config!'
oci_cfg = oci.config.from_file(os.path.expanduser(scfg["oci"]["configFile"]), scfg["oci"]["profile"])
bucket_cfg = scfg["bucket"]
compartment_id = scfg["oci"]["compartment"]

## 3. Upload Video to Object Storage

In [3]:
def upload(oci_cfg, bucket_cfg, file_path):
    if not file_path.exists():
        print(f"Error: File '{file_path}' not found.")
        return False
    object_storage_client = ObjectStorageClient(oci_cfg)
    print(f"Uploading file {file_path} ...")
    object_storage_client.put_object(
        bucket_cfg["namespace"], 
        bucket_cfg["bucketName"], 
        f"{bucket_cfg['prefix']}/{file_path.name}", 
        open(file_path, 'rb')
    )
    print("Upload completed!")
    return True

uploaded = upload(oci_cfg, bucket_cfg, VIDEO_PATH)
if not uploaded:
    raise ValueError("Upload failed. Check your video and configuration.")

## 4. Build Video Analysis Feature List

In [4]:
from oci.ai_vision.models import (
    ObjectLocation, ObjectListInlineInputLocation, OutputLocation,
    CreateVideoJobDetails, VideoObjectDetectionFeature,
    VideoFaceDetectionFeature, VideoLabelDetectionFeature, VideoTextDetectionFeature
)

def get_input_location(bucket_cfg, file_name):
    object_location = ObjectLocation(
        namespace_name=bucket_cfg["namespace"],
        bucket_name=bucket_cfg["bucketName"],
        object_name=f"{bucket_cfg['prefix']}/{file_name}",
    )
    return ObjectListInlineInputLocation(object_locations=[object_location])

def get_output_location(bucket_cfg):
    return OutputLocation(
        namespace_name=bucket_cfg["namespace"],
        bucket_name=bucket_cfg["bucketName"],
        prefix=bucket_cfg["prefix"],
    )

features = [
    VideoTextDetectionFeature(),
    VideoFaceDetectionFeature(),
    VideoLabelDetectionFeature(),
    VideoObjectDetectionFeature()
]

## 5. Submit Video Analysis Job

In [5]:
vision_client = oci.ai_vision.AIServiceVisionClient(config=oci_cfg)
job_details = CreateVideoJobDetails(
    features=features,
    input_location=get_input_location(bucket_cfg, VIDEO_PATH.name),
    output_location=get_output_location(bucket_cfg),
    compartment_id=compartment_id,
)
res = vision_client.create_video_job(create_video_job_details=job_details)
job_id = None
if res is not None and hasattr(res, 'data') and hasattr(res.data, 'id'):
    job_id = res.data.id
    print(f"Job {job_id} created. State: {getattr(res.data,'lifecycle_state','?')}")
else:
    print(f"Error submitting video job: {res}")

## 6. Poll for Completion

In [6]:
poll_seconds = 0
if job_id is not None:
    while True:
        get_res = vision_client.get_video_job(video_job_id=job_id)
        state = getattr(get_res.data, 'lifecycle_state', None)
        percent = getattr(get_res.data, 'percent_complete', None)
        print(f"{state} after {poll_seconds}s (progress={percent})")
        if state not in ["IN_PROGRESS", "ACCEPTED"]:
            break
        time.sleep(5)
        poll_seconds += 5
    print(f"Job finished with state: {state}")

## 7. Download and Parse Video Results

In [7]:
def pretty_print_response(data):
    if not isinstance(data, dict):
        return print("Not a dict response!")
    video_labels = data.get("videoLabels", [])
    if video_labels:
        print("\nLabels:")
        for lab in video_labels:
            print(f"  {lab.get('name', 'Unknown')}")
            for seg in lab.get("segments", []):
                span = seg.get("videoSegment", {})
                start = span.get("startTimeOffsetMs", "N/A")
                end = span.get("endTimeOffsetMs", "N/A")
                conf = seg.get("confidence", 0)
                print(f"    {start}-{end}ms  conf={conf:.2f}")
    video_objects = data.get("videoObjects", [])
    if video_objects:
        print("\nObjects:")
        for obj in video_objects:
            label = obj.get("name", "Unknown")
            for seg in obj.get("segments", []):
                span = seg.get("videoSegment", {})
                start = span.get("startTimeOffsetMs", "N/A")
                end = seg.get("videoSegment", {}).get("endTimeOffsetMs", "N/A")
                conf = seg.get("confidence", 0)
                print(f"    {start}-{end}ms  {label}  conf={conf:.2f}")
    video_texts = data.get("videoTexts", [])
    if video_texts:
        print("\nText Lines:")
        for txt in video_texts:
            content = txt.get("text", "")
            for seg in txt.get("segments", []):
                span = seg.get("videoSegment", {})
                start = span.get("startTimeOffsetMs", "N/A")
                end = span.get("endTimeOffsetMs", "N/A")
                conf = seg.get("confidence", 0)
                print(f"    {start}-{end}ms  \"{content}\"  conf={conf:.2f}")
    video_faces = data.get("videoFaces", [])
    if video_faces:
        print("\nFaces:")
        for face in video_faces:
            for seg in face.get("segments", []):
                span = seg.get("videoSegment", {})
                start = span.get("startTimeOffsetMs", "N/A")
                end = span.get("endTimeOffsetMs", "N/A")
                conf = seg.get("confidence", 0)
                print(f"    {start}-{end}ms  Face  conf={conf:.2f}")
    if not any([video_labels, video_objects, video_texts, video_faces]):
        print("No features detected.")

if job_id is not None and state == "SUCCEEDED":
    object_storage_client = ObjectStorageClient(oci_cfg)
    prefix = bucket_cfg['prefix']
    object_name = f"{prefix}/{job_id}/{prefix}/{VIDEO_PATH.name}.json"
    response = object_storage_client.get_object(
        bucket_cfg['namespace'], bucket_cfg['bucketName'], object_name)
    if hasattr(response, 'data') and hasattr(response.data, 'content'):
        json_data = json.loads(response.data.content.decode('utf-8'))
        pretty_print_response(json_data)
    else:
        print('Failed to download or parse job results!')

## ðŸŽ‰ Done! Try using different video files, and experiment with features/models.