## Import Libraries

In [None]:
import os
from dotenv import load_dotenv
from urllib.parse import quote
from azure.storage.blob import BlobClient
from azure.core.credentials import AzureSasCredential
from moviepy import VideoFileClip
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import cv2
import requests
load_dotenv()

## Bring in Environment Variables

In [18]:
# Values read from the process environment (None when the variable is unset).
VISION_API_KEY = os.getenv("VISION_API_KEY")
VISION_ENDPOINT = os.getenv("VISION_ENDPOINT")
SOURCE_DIRECTORY = os.getenv("SOURCE_DIRECTORY")
AZURE_STORAGE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
AZURE_STORAGE_SAS = os.getenv("AZURE_STORAGE_SAS")

# Fixed names used by this notebook for the retrieval index and blob container.
INDEX_NAME = "blog"
AZURE_STORAGE_CONTAINER_NAME = "blog"

## Convenience Functions

In [19]:
def upload_to_blob(blob_name: str, source_directory: str = SOURCE_DIRECTORY, extension: str = "") -> str:
    """Upload a local file to the notebook's blob container and return its URL.

    The blob is named ``blob_name + extension`` and is read from a file of the
    same name inside ``source_directory``. An existing blob with that name is
    overwritten.

    Args:
        blob_name: Base name of the file / blob.
        source_directory: Local directory that contains the file.
        extension: Optional suffix appended to ``blob_name``.

    Returns:
        The blob URL (account URL + container + URL-quoted blob name). The SAS
        token is not part of the returned URL.
    """
    target_name = f"{blob_name}{extension}"
    account_url = f"https://{AZURE_STORAGE_ACCOUNT_NAME}.blob.core.windows.net/"

    client = BlobClient(
        account_url=account_url,
        container_name=AZURE_STORAGE_CONTAINER_NAME,
        blob_name=target_name,
        credential=AzureSasCredential(AZURE_STORAGE_SAS),
        max_block_size=4 * 1024 * 1024,
        max_single_put_size=16 * 1024 * 1024,
    )

    local_path = os.path.join(source_directory, target_name)
    with open(local_path, "rb") as stream:
        client.upload_blob(data=stream, overwrite=True, max_concurrency=2)

    # Only the returned URL needs percent-encoding; the client takes the raw name.
    return f"{account_url}{AZURE_STORAGE_CONTAINER_NAME}/{quote(target_name)}"


def upload_videos_to_blob(file_name, source_directory=SOURCE_DIRECTORY):
    """Upload one video to blob storage and return basic facts about it.

    Args:
        file_name: Name of the video file inside ``source_directory``.
        source_directory: Local directory that contains the video.

    Returns:
        A dict with the keys ``duration_minutes`` (rounded to 0.1),
        ``frame_rate`` (rounded to an integer), ``resolution`` (the clip's
        ``size`` attribute), ``blob_url`` and ``file_size_mb`` (rounded to 0.1).
    """
    local_path = os.path.join(source_directory, file_name)

    # Read the clip properties inside the context manager so the clip is closed
    # before the file is uploaded.
    with VideoFileClip(local_path) as clip:
        minutes = round(clip.duration / 60, 1)
        fps = round(clip.fps)
        resolution = clip.size

    size_mb = round(os.path.getsize(local_path) / (1024 * 1024), 1)

    return {
        "duration_minutes": minutes,
        "frame_rate": fps,
        "resolution": resolution,
        "blob_url": upload_to_blob(file_name, source_directory),
        "file_size_mb": size_mb,
    }

def create_index(endpoint_url, subscription_key, index_name):
    """Create (PUT) a video retrieval index with this notebook's metadata schema.

    The schema declares five filterable, non-searchable string fields and
    enables the "vision" feature (domain "surveillance") plus the "speech"
    feature.

    Args:
        endpoint_url: Base URL of the Vision resource.
        subscription_key: Key sent in the ``Ocp-Apim-Subscription-Key`` header.
        index_name: Name of the index to create.

    Returns:
        A ``(parsed_json_body, http_status_code)`` tuple.
    """
    # Every metadata field shares the same settings; only the name differs.
    metadata_field_names = ("filename", "duration_mins", "size_mb", "resolution", "fps")
    schema_fields = [
        {"name": field_name, "searchable": False, "filterable": True, "type": "string"}
        for field_name in metadata_field_names
    ]

    request_body = {
        "metadataSchema": {"fields": schema_fields},
        "features": [
            {"name": "vision", "domain": "surveillance"},
            {"name": "speech"},
        ],
    }
    request_headers = {
        "Ocp-Apim-Subscription-Key": subscription_key,
        "Content-Type": "application/json",
    }
    request_url = f"{endpoint_url}/computervision/retrieval/indexes/{index_name}?api-version=2023-05-01-preview"

    result = requests.put(request_url, headers=request_headers, json=request_body)
    return result.json(), result.status_code

def delete_index(endpoint_url, subscription_key, index_name):
    """Send a DELETE request for the named retrieval index.

    Args:
        endpoint_url: Base URL of the Vision resource.
        subscription_key: Key sent in the ``Ocp-Apim-Subscription-Key`` header.
        index_name: Name of the index to delete.

    Returns:
        The raw ``requests`` response object; the status is not checked here.
    """
    target = f"{endpoint_url}/computervision/retrieval/indexes/{index_name}?api-version=2023-05-01-preview"
    return requests.delete(
        target,
        headers={
            "Ocp-Apim-Subscription-Key": subscription_key,
            "Content-Type": "application/json",
        },
    )

def create_ingestion(endpoint_url, subscription_key, index_name, video_metadata_list):
    """Start an ingestion named ``"<index_name>-ingestion"`` for the given videos.

    Args:
        endpoint_url: Base URL of the Vision resource.
        subscription_key: Key sent in the ``Ocp-Apim-Subscription-Key`` header.
        index_name: Index that receives the videos.
        video_metadata_list: List sent as the ``videos`` field of the request
            body (see ``gather_video_metadata`` for the entry layout).

    Returns:
        The parsed JSON body of the response.
    """
    request_url = (
        f"{endpoint_url}/computervision/retrieval/indexes/{index_name}"
        f"/ingestions/{index_name}-ingestion?api-version=2023-05-01-preview"
    )
    request_headers = {
        "Ocp-Apim-Subscription-Key": subscription_key,
        "Content-Type": "application/json",
    }
    # Speech transcripts are requested; moderation is switched off.
    request_body = {
        "videos": video_metadata_list,
        "includeSpeechTranscript": True,
        "moderation": False,
    }

    return requests.put(request_url, json=request_body, headers=request_headers).json()

def list_indexes(endpoint_url, subscription_key):
    """
    Fetch the retrieval indexes of the Vision resource.

    Args:
        endpoint_url: Base URL of the Vision resource.
        subscription_key: Key sent in the ``Ocp-Apim-Subscription-Key`` header.

    Returns:
        The parsed JSON body of the GET response.
    """
    listing = requests.get(
        f"{endpoint_url}/computervision/retrieval/indexes?api-version=2023-05-01-preview",
        headers={"Ocp-Apim-Subscription-Key": subscription_key},
    )
    return listing.json()


def check_ingestion(endpoint_url, subscription_key, index_name, top=20):
    """Query the ingestions of an index.

    Args:
        endpoint_url: Base URL of the Vision resource.
        subscription_key: Key sent in the ``Ocp-Apim-Subscription-Key`` header.
        index_name: Index whose ingestions are listed.
        top: Value sent as the ``$top`` query parameter.

    Returns:
        A ``(parsed_json_body, http_status_code)`` tuple.
    """
    status_url = (
        f"{endpoint_url}/computervision/retrieval/indexes/{index_name}"
        f"/ingestions?api-version=2023-05-01-preview&$top={top}"
    )
    reply = requests.get(status_url, headers={"Ocp-Apim-Subscription-Key": subscription_key})
    return reply.json(), reply.status_code

def search_video(endpoint_url, index_name, subscription_key, queryText="", featureFilters=None):
    """Run a text query against a video retrieval index.

    Note the parameter order: ``index_name`` comes before ``subscription_key``
    here, unlike the other helpers in this notebook.

    Args:
        endpoint_url: Base URL of the Vision resource.
        index_name: Index to query.
        subscription_key: Key sent in the ``Ocp-Apim-Subscription-Key`` header.
        queryText: Natural-language query string.
        featureFilters: Features to search; defaults to ``["vision", "speech"]``
            when omitted or ``None``.

    Returns:
        The parsed JSON body of the response, or ``None`` when the request
        fails or the body cannot be decoded as JSON (the error is printed).
    """
    # A list literal as default would be shared between calls; build it per call.
    if featureFilters is None:
        featureFilters = ["vision", "speech"]

    url = f"{endpoint_url}/computervision/retrieval/indexes/{index_name}:queryByText?api-version=2023-05-01-preview"
    headers = {
        "Content-type": "application/json",
        "Ocp-Apim-Subscription-Key": subscription_key,
    }
    body = {
        "queryText": queryText,
        "dedup": True,
        "filters": {"featureFilters": featureFilters},
    }

    try:
        r = requests.post(url, json=body, headers=headers)
        return r.json()
    except (requests.RequestException, ValueError) as error:
        # Best effort: report transport and JSON-decoding failures and signal
        # them with None instead of propagating.
        print("Video search operation failed ")
        print(error)

    return None

def _timestamp_to_milliseconds(timestamp):
    """Convert an ``HH:MM:SS[.fraction]`` string to whole milliseconds."""
    hours, minutes, seconds = timestamp.split(":")
    seconds, _, fraction = seconds.partition(".")
    # Right-pad before truncating so ".5" means 500 ms rather than 5 ms; digits
    # beyond millisecond precision are dropped.
    millis = int(fraction.ljust(3, "0")[:3]) if fraction else 0
    return (int(hours) * 3600 + int(minutes) * 60 + int(seconds)) * 1000 + millis


def extract_frame(video_path, timestamp):
    """Return the video frame at ``timestamp`` as an RGB image.

    Args:
        video_path: Path of the video file to open.
        timestamp: Position in the video as ``HH:MM:SS`` with an optional
            fractional-second part (e.g. ``"00:01:02.500"``).

    Returns:
        The frame converted from BGR to RGB, or ``None`` when the timestamp
        cannot be parsed, no frame can be read, or any other error occurs
        (the error is printed).
    """
    cap = None
    try:
        frame_time = _timestamp_to_milliseconds(timestamp)
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_MSEC, frame_time)

        success, frame = cap.read()
        if success:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    except Exception as e:
        # Deliberately best-effort: callers fall back to a placeholder tile.
        print(f"Error extracting frame: {e}")
    finally:
        # Always release the capture handle, including on the early return.
        if cap is not None:
            cap.release()

    return None

    
def display_tiles(matches_df, nrows=2, ncols=3, figsize=(16, 9)):
    """
    Display one tile per search match in an ``nrows`` x ``ncols`` grid.

    Each tile shows the frame at the match's ``best`` timestamp (a black
    placeholder when the frame cannot be extracted), titled with the rank,
    relevance score, document id and the start/end time range.

    Args:
        matches_df: DataFrame whose ``value`` column holds one match per row;
            each match is indexed by ``documentId``, ``best`` and ``relevance``
            and may contain ``start`` / ``end``.
        nrows: Number of grid rows.
        ncols: Number of grid columns.
        figsize: Figure size passed to matplotlib.

    At most ``nrows * ncols`` matches are shown; unused grid cells are left
    blank.
    """
    # ncols must be passed on, otherwise the grid collapses to a single column.
    # squeeze=False always returns a 2-D array, so flatten() also works for 1x1.
    _fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=figsize, dpi=100, squeeze=False)
    axes = axes.flatten()

    shown = 0
    for i, (ax, (_, row)) in enumerate(zip(axes, matches_df.iterrows())):
        row = row["value"]
        video_path = f"{SOURCE_DIRECTORY}/{row['documentId']}"
        timestamp = row["best"]
        frame = extract_frame(video_path, timestamp)
        if frame is not None:
            ax.imshow(frame)
        else:
            # Black placeholder so a missing frame still occupies its tile.
            ax.imshow(np.zeros((100, 100, 3), dtype=np.uint8))
        title = f"Top {i+1}, (score: {row['relevance']:.3f}) - Video id: {row['documentId']}"
        # Drop the fractional seconds for a compact subtitle.
        start = row["start"].split(".")[0] if "start" in row else None
        end = row["end"].split(".")[0] if "end" in row else None
        subtitle = f"{start} - {end}" if start and end else "Timestamp not available"

        ax.set_title(f"{title}\n{subtitle}", fontsize=10, pad=10)
        ax.axis("off")
        shown = i + 1

    # Hide the axes of grid cells that received no match.
    for unused_ax in axes[shown:]:
        unused_ax.axis("off")

    plt.tight_layout()
    plt.show()
    

def gather_video_metadata(video):
    """Build the ingestion entry for one video.

    Args:
        video: Mapping-like record indexed by ``id``, ``blob_url``,
            ``filename``, ``duration_minutes``, ``file_size_mb``,
            ``resolution`` and ``frame_rate``.

    Returns:
        A dict with ``mode`` set to ``"add"``, the document id, the blob URL
        with the SAS token appended after ``"?"``, and a ``metadata`` dict
        whose values (other than the filename) are converted to strings.
    """
    document_id = video["id"]
    signed_url = "?".join((video["blob_url"], AZURE_STORAGE_SAS))

    # Index field name -> key of the source record holding its value.
    stringified_sources = {
        "duration_mins": "duration_minutes",
        "size_mb": "file_size_mb",
        "resolution": "resolution",
        "fps": "frame_rate",
    }
    metadata = {"filename": video["filename"]}
    metadata.update(
        (field_name, str(video[source_key]))
        for field_name, source_key in stringified_sources.items()
    )

    return {
        "mode": "add",
        "documentId": document_id,
        "documentUrl": signed_url,
        "metadata": metadata,
    }

def chat_with_your_own_videos(query):
    """Search the notebook's index for ``query``, show the hits and return them.

    Args:
        query: Natural-language search text.

    Returns:
        A DataFrame built from the search response, limited to its first
        10 rows; the same frame is rendered with ``display_tiles``.
    """
    response = search_video(
        endpoint_url=VISION_ENDPOINT,
        index_name=INDEX_NAME,
        subscription_key=VISION_API_KEY,
        queryText=query,
        featureFilters=["speech", "vision"],
    )
    top_hits = pd.DataFrame.from_records(response).head(10)
    display_tiles(top_hits)
    return top_hits

## Generate Metadata for Video

In [None]:
# Every .mp4 file (case-insensitive) in the source directory is a video to index.
video_files = [name for name in os.listdir(SOURCE_DIRECTORY) if name.lower().endswith(".mp4")]
video_df = pd.DataFrame({"id": video_files})

# Upload each video and expand the returned metadata dicts into columns.
metadata_df = pd.DataFrame(video_df["id"].apply(upload_videos_to_blob).tolist())

# The file name doubles as the document id, so "filename" mirrors "id".
video_df = video_df.join(metadata_df).assign(filename=lambda frame: frame["id"])
display(video_df)

## Delete Index If Already Exists

In [None]:
# Remove the index first so the notebook can be re-run from a clean state.
delete_index(
    endpoint_url=VISION_ENDPOINT,
    subscription_key=VISION_API_KEY,
    index_name=INDEX_NAME,
)

## Create Index

In [None]:
# Create the index; the cell output is the (response JSON, status code) pair.
create_index(
    endpoint_url=VISION_ENDPOINT,
    subscription_key=VISION_API_KEY,
    index_name=INDEX_NAME,
)

## List Indexes

In [None]:
# Show the indexes that currently exist on the Vision resource.
list_indexes(endpoint_url=VISION_ENDPOINT, subscription_key=VISION_API_KEY)

## Gather Metadata and Create Video Ingestion Into Azure AI Vision

In [None]:
# One ingestion entry per row of video_df, in row order.
videos = list(map(gather_video_metadata, (record for _, record in video_df.iterrows())))
create_ingestion(
    endpoint_url=VISION_ENDPOINT,
    subscription_key=VISION_API_KEY,
    index_name=INDEX_NAME,
    video_metadata_list=videos,
)

## Check Ingestion Status

In [None]:
# Re-run this cell to poll the ingestion state; output is (response JSON, status code).
check_ingestion(
    endpoint_url=VISION_ENDPOINT,
    subscription_key=VISION_API_KEY,
    index_name=INDEX_NAME,
)

## Query #1

In [None]:
# Query 1: tiles are plotted and the matches DataFrame is echoed as cell output.
chat_with_your_own_videos(query="horses running")

## Query #2

In [None]:
# Query 2: tiles are plotted and the matches DataFrame is echoed as cell output.
chat_with_your_own_videos(query="birds flying")

## Query #3

In [None]:
# Query 3: tiles are plotted and the matches DataFrame is echoed as cell output.
chat_with_your_own_videos(query="very sleepy")