## Multi-Modal RAG with Azure AI Content Understanding - Data Prep

![rag_data_prep](./Assets/rag_data_prep.png)

### Install Dependencies and Libraries

In [1]:
%pip install python-dotenv pdfminer.six openai azure-identity azure-storage-blob

Collecting pdfminer.six
  Downloading pdfminer_six-20260107-py3-none-any.whl.metadata (4.3 kB)
Downloading pdfminer_six-20260107-py3-none-any.whl (6.6 MB)
   ---------------------------------------- 0.0/6.6 MB ? eta -:--:--
   ---------------------------------------  6.6/6.6 MB 36.6 MB/s eta 0:00:01
   ---------------------------------------- 6.6/6.6 MB 31.2 MB/s  0:00:00
Installing collected packages: pdfminer.six
Successfully installed pdfminer.six-20260107
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: C:\Users\asridharan\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


### Setting Up the Environment 

In [2]:
import os
from dotenv import load_dotenv
import requests
import json
from azure.identity import DefaultAzureCredential

load_dotenv(override=True)

# Content Understanding Service Connections
CONTENT_UNDERSTANDING_ENDPOINT = os.getenv("CONTENT_UNDERSTANDING_ENDPOINT").strip().rstrip('/')
CONTENT_UNDERSTANDING_API_KEY = os.getenv("CONTENT_UNDERSTANDING_API_KEY", "").strip()

# Storage Account Connections
storage_account_endpoint = os.getenv("STORAGE_ACCOUNT_ENDPOINT").strip().rstrip('/')
storage_container_name = os.getenv("STORAGE_CONTAINER_NAME")

# Authentication Mode
AUTH_MODE = os.getenv("AUTH_MODE", "entra").lower()  # "entra" or "key"

# API Version - using GA API
API_VERSION = "2025-11-01"

# Setup credential for Entra ID auth
credential = None
if AUTH_MODE == "entra":
    credential = DefaultAzureCredential()
    print("‚úÖ Using Entra ID authentication")
else:
    print("üîë Using API Key authentication")

def get_headers():
    """Get appropriate headers based on auth mode."""
    if AUTH_MODE == "entra" and credential:
        token = credential.get_token("https://cognitiveservices.azure.com/.default")
        return {
            "Authorization": f"Bearer {token.token}",
            "Content-Type": "application/json"
        }
    else:
        return {
            "Content-Type": "application/json",
            "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
        }

print(f"Endpoint: {CONTENT_UNDERSTANDING_ENDPOINT}")
print(f"Storage: {storage_account_endpoint}/{storage_container_name}")

‚úÖ Using Entra ID authentication
Endpoint: https://demo-aisvcs-eus.services.ai.azure.com
Storage: https://multimodalragsa2.blob.core.windows.net/ragdata


### Creating Functions for Performing Video Analysis using Prebuilt Video Analyzer

In [5]:
# Create a function to analyze video using prebuilt video analyzer - makes API calls to Content Understanding service
# GA API uses "prebuilt-video" (not "prebuilt-videoAnalyzer")
def analyze_video(video_url):
    prebuilt_video_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-video:analyze?api-version={API_VERSION}"

    # GA API uses "inputs" array format
    body = {
        "inputs": [{"url": video_url}]
    }

    video_analysis_result = {}

    try:
        headers = get_headers()

        response = requests.post(prebuilt_video_analyzer_url, headers=headers, json=body)
        response.raise_for_status()
        
        # GA API returns operation-location in header
        operation_location = response.headers.get("Operation-Location")
        if operation_location:
            get_result_url = operation_location
        else:
            result = response.json()
            analysis_id = result.get("id")
            print("Analysis ID:", analysis_id)
            get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version={API_VERSION}"
        
        print(f"Operation URL: {get_result_url}")

        # Poll for results
        analysis_status = "Running"
        while analysis_status in ["Running", "NotStarted"]:
            import time
            time.sleep(10)  # Wait before polling
            headers_poll = get_headers()
            if "Content-Type" in headers_poll:
                del headers_poll["Content-Type"]
            status_response = requests.get(get_result_url, headers=headers_poll)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)
            
        video_analysis_result = status_result
        print("Video Analysis Complete!")
        return video_analysis_result

    except requests.RequestException as e:
        print(f"Error occurred: {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"Response: {e.response.text}")


# Creating a Function to extract relevant information from the video analysis result and prepare it for RAG
def build_simple_video_dataset(
    analysis_result,
    video_url,
    title,
    include_full_transcript_row=True,
    include_words=False,          # set True to include word-level timings in each phrase
    ensure_ascii=False
):
    """
    Builds dataset rows with segments + their transcripts.
    Output rows: [{"document_title": ..., "content_text": <JSON string>}]

    content_text JSON schema (per segment):
    {
      "about": "...",
      "video_url": "...",
      "segment_id": "...",
      "time_window": {"start_ms": int, "end_ms": int},
      "segment_description": "...",
      "transcript": [
        {
          "speaker": "Speaker 1",
          "start_ms": 1360,
          "end_ms": 6640,
          "text": "...",
          "confidence": 0.937,
          "words": [ { "start_ms": ..., "end_ms": ..., "text": "..." }, ... ]   # only if include_words=True
        },
        ...
      ],
      "transcript_text": "Concatenated transcript text for this segment"
    }

    If include_full_transcript_row=True, an extra row is appended with the entire transcript.
    """
    result = (analysis_result or {}).get("result", {}) or {}
    contents = result.get("contents") or []

    # --- Collect segments from both shapes ---
    segments = []
    for c in contents:
        # Shape 1
        for s in c.get("segments") or []:
            segments.append({
                "segmentId": s.get("segmentId"),
                "startTimeMs": s.get("startTimeMs"),
                "endTimeMs": s.get("endTimeMs"),
                "description": s.get("description") or ""
            })
        # Shape 2
        fields = c.get("fields") or {}
        f_segments = ((fields.get("Segments") or {}).get("valueArray")) or []
        for item in f_segments:
            vo = (item or {}).get("valueObject") or {}
            segments.append({
                "segmentId": ((vo.get("SegmentId") or {}).get("valueString")),
                "startTimeMs": ((vo.get("StartTimeMs") or {}).get("valueInteger")),
                "endTimeMs": ((vo.get("EndTimeMs") or {}).get("valueInteger")),
                "description": ((vo.get("Description") or {}).get("valueString")) or ""
            })

    # --- Collect transcript phrases ---
    all_phrases = []
    for c in contents:
        for p in c.get("transcriptPhrases") or []:
            phrase = {
                "speaker": p.get("speaker"),
                "start_ms": p.get("startTimeMs"),
                "end_ms": p.get("endTimeMs"),
                "text": p.get("text"),
                "confidence": p.get("confidence")
            }
            if include_words:
                phrase["words"] = [
                    {
                        "start_ms": w.get("startTimeMs"),
                        "end_ms": w.get("endTimeMs"),
                        "text": w.get("text")
                    } for w in (p.get("words") or [])
                ]
            all_phrases.append(phrase)

    def overlaps(seg_start, seg_end, p_start, p_end):
        """True if [p_start, p_end] overlaps [seg_start, seg_end]."""
        if seg_start is None or seg_end is None or p_start is None or p_end is None:
            return False
        return not (p_end < seg_start or p_start > seg_end)

    rows = []

    # If no segments, create a single fallback with full transcript (if present)
    if not segments:
        # Build overall transcript text (if any)
        overall_text = " ".join([p.get("text") or "" for p in all_phrases]).strip() if all_phrases else None

        content_text_obj = {
            "about": "This is a JSON string representing the overall video summary.",
            "video_url": video_url,
            "note": "No segment details available."
        }
        if overall_text:
            content_text_obj["full_transcript_text"] = overall_text
            content_text_obj["full_transcript"] = all_phrases  # could be large

        rows.append({
            "document_title": f"{title} ‚Ä¢ Full Video",
            "content_text": json.dumps(content_text_obj, ensure_ascii=ensure_ascii)
        })
        return rows

    # Build rows per segment with attached transcript snippets
    for s in segments:
        st, et = s.get("startTimeMs"), s.get("endTimeMs")
        seg_id = s.get("segmentId") or "?"
        seg_desc = s.get("description") or ""

        # collect overlapping phrases
        seg_phrases = [
            p for p in all_phrases
            if overlaps(st, et, p.get("start_ms"), p.get("end_ms"))
        ]

        # Concatenate a readable segment transcript
        seg_transcript_text = " ".join([(p.get("text") or "").strip() for p in seg_phrases]).strip()

        content_text_obj = {
            "about": "This is a JSON string representing a slice of video analysis for RAG.",
            "video_url": video_url,
            "segment_id": seg_id,
            "time_window": {"start_ms": st, "end_ms": et},
            "segment_description": seg_desc,
            "transcript": seg_phrases,
            "transcript_text": seg_transcript_text
        }

        rows.append({
            "document_title": f"{title} ‚Ä¢ Segment {seg_id}",
            "content_text": json.dumps(content_text_obj, ensure_ascii=ensure_ascii)
        })

    # Optional final row with the full transcript (nice for global search)
    if include_full_transcript_row and all_phrases:
        full_text = " ".join([(p.get("text") or "").strip() for p in all_phrases]).strip()
        full_obj = {
            "about": "This is a JSON string representing the overall video transcript.",
            "video_url": video_url,
            "transcript": all_phrases,
            "transcript_text": full_text
        }
        rows.append({
            "document_title": f"{title} ‚Ä¢ Full Transcript",
            "content_text": json.dumps(full_obj, ensure_ascii=ensure_ascii)
        })

    return rows

In [6]:
# Constructing the Video URL
video_url = f"{storage_account_endpoint}/{storage_container_name}/BMW_circularity.mp4"

# Analyzing the video and building the dataset
video_analysis_result = analyze_video(video_url)
video_dataset = build_simple_video_dataset(video_analysis_result, video_url, title="BMW_circularity_video")

# print the dataset in a pretty format
print(json.dumps(video_dataset, indent=2))

Operation URL: https://demo-aisvcs-eus.services.ai.azure.com/contentunderstanding/analyzerResults/ced64248-aebe-4138-b290-23223a24bf34?api-version=2025-11-01
Current Analysis Status: Running
Current Analysis Status: Succeeded
Video Analysis Complete!
[
  {
    "document_title": "BMW_circularity_video \u2022 Full Video",
    "content_text": "{\"about\": \"This is a JSON string representing the overall video summary.\", \"video_url\": \"https://multimodalragsa2.blob.core.windows.net/ragdata/BMW_circularity.mp4\", \"note\": \"No segment details available.\"}"
  }
]


### Creating Functions for Performing Audio Analysis using Prebuilt Audio Analyzer

In [7]:
# Create a function to analyze audio using prebuilt audio analyzer - makes API calls to Content Understanding service
# GA API uses "prebuilt-audio" (not "prebuilt-audioAnalyzer")
def analyze_audio(audio_url):
    prebuilt_audio_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-audio:analyze?api-version={API_VERSION}"

    # GA API uses "inputs" array format
    body = {
        "inputs": [{"url": audio_url}]
    }

    audio_analysis_result = {}

    try:
        headers = get_headers()

        response = requests.post(prebuilt_audio_analyzer_url, headers=headers, json=body)
        response.raise_for_status()
        
        # GA API returns operation-location in header
        operation_location = response.headers.get("Operation-Location")
        if operation_location:
            get_result_url = operation_location
        else:
            result = response.json()
            analysis_id = result.get("id")
            print("Analysis ID:", analysis_id)
            get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version={API_VERSION}"
        
        print(f"Operation URL: {get_result_url}")

        # Poll for results
        analysis_status = "Running"
        while analysis_status in ["Running", "NotStarted"]:
            import time
            time.sleep(10)  # Wait before polling
            headers_poll = get_headers()
            if "Content-Type" in headers_poll:
                del headers_poll["Content-Type"]
            status_response = requests.get(get_result_url, headers=headers_poll)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)
            
        audio_analysis_result = status_result
        print("Audio Analysis Complete!")
        return audio_analysis_result

    except requests.RequestException as e:
        print(f"Error occurred: {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"Response: {e.response.text}")

# Creating a Function to extract relevant information from the audio analysis result and prepare it for RAG
def build_simple_audio_dataset(analysis_result, audio_url, title, include_full_transcript_row=True):
    """
    Create simple dataset rows with document_title + content_text from audio analysis.
    """
    result = (analysis_result or {}).get("result", {}) or {}
    contents = result.get("contents") or []

    rows = []
    all_phrases = []

    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        st, et = item.get("startTimeMs"), item.get("endTimeMs")
        md = item.get("markdown") or ""
        phrases = item.get("transcriptPhrases") or []
        all_phrases.extend(phrases)

        # Optional summary field (if present)
        fields = item.get("fields") or {}
        summary = ((fields.get("Summary") or {}).get("valueString")) if fields else None

        # Short transcript excerpt
        transcript_excerpt = ""
        if phrases:
            transcript_excerpt = " ".join((p.get("text") or "").strip() for p in phrases[:3]).strip()

        content_text = json.dumps({
            "about": "This is a JSON string representing a slice of audio analysis for RAG.",
            "audio_url": audio_url,
            "content_index": idx,
            "kind": kind,
            "time_window": {"start_ms": st, "end_ms": et},
            "analyzer_markdown_excerpt": md[:400],
            "transcript_excerpt": transcript_excerpt,
            **({"summary": summary} if summary else {})
        }, ensure_ascii=False)

        rows.append({
            "document_title": f"{title} ‚Ä¢ Segment {idx}",
            "content_text": content_text
        })

    # Optionally add a full transcript row for global search
    if include_full_transcript_row and all_phrases:
        full_transcript_text = " ".join((p.get("text") or "").strip() for p in all_phrases).strip()
        full_obj = {
            "about": "This is a JSON string representing the full audio transcript.",
            "audio_url": audio_url,
            "transcript_text": full_transcript_text,
            "transcript": [
                {
                    "speaker": p.get("speaker"),
                    "start_ms": p.get("startTimeMs"),
                    "end_ms": p.get("endTimeMs"),
                    "text": p.get("text"),
                    "confidence": p.get("confidence")
                } for p in all_phrases
            ]
        }
        rows.append({
            "document_title": f"{title} ‚Ä¢ Full Transcript",
            "content_text": json.dumps(full_obj, ensure_ascii=False)
        })

    # Fallback if nothing produced
    if not rows:
        content_text = json.dumps({
            "about": "This is a JSON string representing a generic audio analysis record.",
            "audio_url": audio_url,
            "note": "No content segments found in analysis."
        }, ensure_ascii=False)
        rows.append({
            "document_title": f"{title} ‚Ä¢ Full Audio",
            "content_text": content_text
        })

    return rows

In [8]:
# Constructing the Audio URL
audio_url = f"{storage_account_endpoint}/{storage_container_name}/BMW_forwardism.mp3"

# Analyzing the audio and building the dataset
audio_analysis_result = analyze_audio(audio_url)
audio_dataset = build_simple_audio_dataset(audio_analysis_result, audio_url, title="BMW_forwardism_audio")

# print the dataset in a pretty format
print(json.dumps(audio_dataset, indent=2))

Operation URL: https://demo-aisvcs-eus.services.ai.azure.com/contentunderstanding/analyzerResults/9e6466a4-ab7c-4298-9ff2-b8423c2c0dec?api-version=2025-11-01
Current Analysis Status: Running
Current Analysis Status: Running
Current Analysis Status: Running
Current Analysis Status: Succeeded
Audio Analysis Complete!
[
  {
    "document_title": "BMW_forwardism_audio \u2022 Segment 1",
    "content_text": "{\"about\": \"This is a JSON string representing a slice of audio analysis for RAG.\", \"audio_url\": \"https://multimodalragsa2.blob.core.windows.net/ragdata/BMW_forwardism.mp3\", \"content_index\": 1, \"kind\": \"audioVisual\", \"time_window\": {\"start_ms\": 0, \"end_ms\": 405513}, \"analyzer_markdown_excerpt\": \"# Audio: 00:00.000 => 06:45.513\\n\\nTranscript\\n```\\nWEBVTT\\n\\n00:02.480 --> 00:03.040\\n<v Speaker 1>Wow.\\n\\n00:04.000 --> 00:05.680\\n<v Speaker 1>Time flies when you're having fun.\\n\\n00:06.560 --> 00:16.960\\n<v Speaker 1>It was such an honor to be able to talk

### Creating Functions for Performing Image Analysis using Prebuilt Image Analyzer

In [9]:
# Create a function to analyze image using prebuilt image analyzer - makes API calls to Content Understanding service
# GA API uses "prebuilt-image" (not "prebuilt-imageAnalyzer")
def analyze_image(image_url):
    prebuilt_image_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-image:analyze?api-version={API_VERSION}"

    body = {
        "inputs": [{"url": image_url}]
    }

    image_analysis_result = {}

    try:
        headers = get_headers()

        response = requests.post(prebuilt_image_analyzer_url, headers=headers, json=body)
        response.raise_for_status()
        
        # For GA API, check if we get operation-location header for async processing
        if response.status_code == 202:
            # Async operation - get the result URL from header
            operation_location = response.headers.get("operation-location")
            if operation_location:
                get_result_url = operation_location
            else:
                result = response.json()
                analysis_id = result.get("id")
                print("Analysis ID:", analysis_id)
                get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version={API_VERSION}"
        else:
            result = response.json()
            analysis_id = result.get("id")
            print("Analysis ID:", analysis_id)
            get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version={API_VERSION}"

        # Using the result URL to get results; polling until the analysis is complete
        poll_headers = get_headers()
        analysis_status = "Running"
        while analysis_status == "Running":
            status_response = requests.get(get_result_url, headers=poll_headers)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)
            if analysis_status == "Running":
                import time
                time.sleep(1)  # Wait before polling again
        result_response = requests.get(get_result_url, headers=poll_headers)
        result_response.raise_for_status()
        image_analysis_result = result_response.json()
        print("Image Analysis Result:", image_analysis_result)
        return image_analysis_result

    except requests.RequestException as e:
        print(f"Error occurred: {e}")

# Creating a Function to extract relevant information from the image analysis result and prepare it for RAG
def build_simple_image_dataset(analysis_result, image_url, title):
    """
    Create simple dataset rows with document_title + content_text from image analysis.
    Pulls 'Summary' from fields if present.
    """
    result = (analysis_result or {}).get("result", {}) or {}
    contents = result.get("contents") or []

    rows = []

    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        md = item.get("markdown") or ""
        fields = item.get("fields") or {}

        # Extract a clean summary string if present
        summary_obj = fields.get("Summary") or {}
        summary_text = summary_obj.get("valueString")

        payload = {
            "about": "This is a JSON string representing a slice of image analysis for RAG.",
            "image_url": image_url,
            "content_index": idx,
            "kind": kind,
            "analyzer_markdown_excerpt": md[:400],
        }
        if summary_text:
            payload["summary"] = summary_text
        else:
            # If you prefer preserving all fields, keep them‚Äîbut they're verbose
            payload["fields_raw"] = fields

        rows.append({
            "document_title": f"{title} ‚Ä¢ Content {idx}",
            "content_text": json.dumps(payload, ensure_ascii=False)
        })

    if not rows:
        rows.append({
            "document_title": f"{title} ‚Ä¢ Full Image",
            "content_text": json.dumps({
                "about": "This is a JSON string representing a generic image analysis record.",
                "image_url": image_url,
                "note": "No content found in analysis."
            }, ensure_ascii=False)
        })

    return rows

In [10]:
# Constructing the Image URL
image_url = f"{storage_account_endpoint}/{storage_container_name}/image.png"

# Analyzing the image and building the dataset
image_analysis_result = analyze_image(image_url)
image_dataset = build_simple_image_dataset(image_analysis_result, image_url, title="Sample Image")

# print the dataset in a pretty format
print(json.dumps(image_dataset, indent=2))

Error occurred: 400 Client Error: Bad Request for url: https://demo-aisvcs-eus.services.ai.azure.com/contentunderstanding/analyzers/prebuilt-image:analyze?api-version=2025-11-01
[
  {
    "document_title": "Sample Image \u2022 Full Image",
    "content_text": "{\"about\": \"This is a JSON string representing a generic image analysis record.\", \"image_url\": \"https://multimodalragsa2.blob.core.windows.net/ragdata/image.png\", \"note\": \"No content found in analysis.\"}"
  }
]


### Extracting Textual Data from PDF Document and Preparing it for RAG

In [11]:
import io
import json
import re
import requests
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from azure.storage.blob import BlobServiceClient
from urllib.parse import urlparse


def _is_url(s: str) -> bool:
    return s.lower().startswith(("http://", "https://"))

def _download_bytes_with_entra(url: str, credential) -> bytes:
    """Download blob bytes using Entra ID authentication."""
    # Parse the blob URL to extract account, container, and blob name
    parsed = urlparse(url)
    account_url = f"{parsed.scheme}://{parsed.netloc}"
    path_parts = parsed.path.lstrip('/').split('/', 1)
    container_name = path_parts[0]
    blob_name = path_parts[1] if len(path_parts) > 1 else ""
    
    # Use BlobServiceClient with Entra ID credential
    blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    
    return blob_client.download_blob().readall()

def _clean_text(text: str) -> str:
    text = text.replace("\r", "")
    text = re.sub(r"[ \t]+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def extract_pdf_text(pdf_source: str, credential=None):
    """
    Extracts plain text from each page of a PDF.
    pdf_source can be a local file path or a URL.
    credential: Azure credential for accessing private blob storage (optional)
    Returns: list of {"pageNumber": int, "text": str}
    """
    if _is_url(pdf_source):
        if credential:
            data = _download_bytes_with_entra(pdf_source, credential)
        else:
            # Fallback to anonymous download for public blobs
            r = requests.get(pdf_source, timeout=30)
            r.raise_for_status()
            data = r.content
        fp = io.BytesIO(data)
    else:
        fp = open(pdf_source, "rb")

    pages = []
    try:
        output = io.StringIO()
        laparams = LAParams()
        extract_text_to_fp(fp, output, laparams=laparams, output_type="text")
        full_text = output.getvalue()
        raw_pages = full_text.split("\x0c")  # pdfminer page delimiter
        for i, txt in enumerate(raw_pages, start=1):
            cleaned = _clean_text(txt)
            if i == len(raw_pages) and not cleaned:
                continue
            pages.append({"pageNumber": i, "text": cleaned})
    finally:
        fp.close()
    return pages


def build_simple_pdf_dataset(pages, pdf_source: str, title: str):
    """
    Convert PDF pages into dataset rows like your audio/video/image functions.
    """
    rows = []
    for page in pages:
        pno = page["pageNumber"]
        text = page["text"]

        payload = {
            "about": "This is a JSON string representing a slice of PDF for RAG.",
            "pdf_source": pdf_source,
            "page_number": pno,
            "page_text": text
        }

        rows.append({
            "document_title": f"{title} ‚Ä¢ Page {pno}",
            "content_text": json.dumps(payload, ensure_ascii=False)
        })


    return rows

In [12]:
# Constructing the PDF URL
pdf_url = f"{storage_account_endpoint}/{storage_container_name}/BMW_sustainable_natural_rubber.pdf"

# Extracting text from the PDF and building the dataset
# Pass credential for Entra ID authenticated blob access
extracted_text = extract_pdf_text(pdf_url, credential=credential)
pdf_dataset = build_simple_pdf_dataset(extracted_text, pdf_url, title="BMW Sustainable Natural Rubber PDF")

print(json.dumps(pdf_dataset, indent=2))

[
  {
    "document_title": "BMW Sustainable Natural Rubber PDF \u2022 Page 1",
    "content_text": "{\"about\": \"This is a JSON string representing a slice of PDF for RAG.\", \"pdf_source\": \"https://multimodalragsa2.blob.core.windows.net/ragdata/BMW_sustainable_natural_rubber.pdf\", \"page_number\": 1, \"page_text\": \"High-Level Commitment of the BMW Group\\nfor Sustainable Natural Rubber\\n\\nProtection of forests and other natural ecosystems is critical for maintaining biodiversity, combating climate\\nchange, and sustaining livelihoods. As part of our overall sustainability goals, the BMW Group is committed to\\neliminating deforestation and ecosystem conversion from our supply chains and to safeguarding human rights\\nacross all our operations and suppliers. Given that natural rubber is a known driver of deforestation, this\\ndocument outlines our commitment to sourcing sustainable natural rubber and is aligned with the Policy\\nFramework that was adopted in a September 2020 r

### Combining Datasets for Multi-Modal RAG

In [13]:
final_dataset = video_dataset + audio_dataset + image_dataset + pdf_dataset
print(f"Total dataset rows: {len(final_dataset)}")

print(json.dumps(final_dataset, indent=2))  # print first 2 rows as a sample

Total dataset rows: 5
[
  {
    "document_title": "BMW_circularity_video \u2022 Full Video",
    "content_text": "{\"about\": \"This is a JSON string representing the overall video summary.\", \"video_url\": \"https://multimodalragsa2.blob.core.windows.net/ragdata/BMW_circularity.mp4\", \"note\": \"No segment details available.\"}"
  },
  {
    "document_title": "BMW_forwardism_audio \u2022 Segment 1",
    "content_text": "{\"about\": \"This is a JSON string representing a slice of audio analysis for RAG.\", \"audio_url\": \"https://multimodalragsa2.blob.core.windows.net/ragdata/BMW_forwardism.mp3\", \"content_index\": 1, \"kind\": \"audioVisual\", \"time_window\": {\"start_ms\": 0, \"end_ms\": 405513}, \"analyzer_markdown_excerpt\": \"# Audio: 00:00.000 => 06:45.513\\n\\nTranscript\\n```\\nWEBVTT\\n\\n00:02.480 --> 00:03.040\\n<v Speaker 1>Wow.\\n\\n00:04.000 --> 00:05.680\\n<v Speaker 1>Time flies when you're having fun.\\n\\n00:06.560 --> 00:16.960\\n<v Speaker 1>It was such an hono

### Creating an Azure OpenAI Client

In [14]:
from openai import AzureOpenAI

# Use Entra ID authentication if AUTH_MODE is set to "entra"
if AUTH_MODE == "entra":
    # Get token provider for Azure OpenAI
    from azure.identity import get_bearer_token_provider
    
    token_provider = get_bearer_token_provider(
        credential, 
        "https://cognitiveservices.azure.com/.default"
    )
    
    openai_client = AzureOpenAI(
        azure_ad_token_provider=token_provider,
        api_version="2024-02-15-preview",
        azure_endpoint=os.getenv("AZURE_AI_ENDPOINT") or os.getenv("CONTENT_UNDERSTANDING_ENDPOINT")
    )
else:
    openai_client = AzureOpenAI(
        api_key=os.getenv("AZURE_AI_API_KEY"),
        api_version="2024-02-15-preview",
        azure_endpoint=os.getenv("AZURE_AI_ENDPOINT")
    )

### Creating a Function for Generating Embeddings using Azure OpenAI

In [15]:
from openai import AzureOpenAI

def generate_embeddings(text, embedding_model_name):
    response = openai_client.embeddings.create(
        input=text,
        model=embedding_model_name
    )

    embeddings = response.model_dump()

    return embeddings["data"][0]["embedding"]

### Finalising RAG Dataset with Embeddings

In [16]:
import uuid
for data in final_dataset:
    embedding = generate_embeddings(data["content_text"], os.getenv("EMBEDDING_MODEL_NAME"))
    data["content_embedding"] = embedding
    data["content_id"] = uuid.uuid4().hex

# printing the final dataset with embeddings in a file 
with open("data.json", "w") as f:
    f.write(json.dumps(final_dataset, indent=2))

### Creating our Azure AI Search Index in Azure Search Service

In [22]:
search_service_endpoint = os.getenv("SEARCH_SERVICE_ENDPOINT").strip().rstrip('/')
search_service_api_key = os.getenv("SEARCH_SERVICE_API_KEY")

index_creation_url = search_service_endpoint + "/indexes?api-version=2025-08-01-preview"

# Set up headers based on auth mode
if AUTH_MODE == "entra":
    # Get token for Azure Search
    search_token = credential.get_token("https://search.azure.com/.default").token
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {search_token}"
    }
else:
    headers = {
        "Content-Type": "application/json",
        "api-key": search_service_api_key
    }

# reading the index schema from index.json file
with open("index.json", "r") as f:
    index_schema = json.load(f)

body = index_schema 

response = requests.post(index_creation_url, headers=headers, json=body)

print(f"Status Code: {response.status_code}")
print(f"Response Text: {response.text[:500] if response.text else '(empty)'}")

if response.status_code == 201:
    print("‚úÖ Index created successfully.")
elif response.status_code == 200:
    print("‚úÖ Index already exists or was updated.")
elif response.status_code == 204:
    print("‚úÖ Success (no content returned).")
else:
    print(f"‚ùå Error creating index: {response.status_code}")
    if response.text:
        try:
            print("Response body:", response.json())
        except:
            print("Response body (raw):", response.text)

Status Code: 400
Response Text: {"error":{"code":"ResourceNameAlreadyInUse","message":"Cannot create index 'multi-modal-rag-index' because it already exists.","details":[{"code":"CannotCreateExistingIndex","message":"Cannot create index 'multi-modal-rag-index' because it already exists."}]}}
‚ùå Error creating index: 400
Response body: {'error': {'code': 'ResourceNameAlreadyInUse', 'message': "Cannot create index 'multi-modal-rag-index' because it already exists.", 'details': [{'code': 'CannotCreateExistingIndex', 'message': "Cannot create index 'multi-modal-rag-index' because it already exists."}]}}


### Preparing the RAG Data for Bulk Upload to Azure AI Search Index

In [19]:
with open("data.json", "r") as f:
    dataset = json.load(f)

for object in dataset:
    object["@search.action"] = "upload" # append the action to each object

with open("data_with_actions.json", "w") as f:
    f.write(json.dumps(dataset, indent=2))

### Pushing RAG formatted data to Azure AI Search Index

In [None]:
with open("data_with_actions.json", "r") as f:
    rag_dataset = json.load(f)


bulk_upload_url = search_service_endpoint + "/indexes/multi-modal-rag-index/docs/index?api-version=2025-08-01-preview"

# Set up headers based on auth mode
if AUTH_MODE == "entra":
    # Get fresh token for Azure Search
    search_token = credential.get_token("https://search.azure.com/.default").token
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {search_token}"
    }
else:
    headers = {
        "Content-Type": "application/json",
        "api-key": search_service_api_key
    }

body = {
    "value": [*rag_dataset]
}

response = requests.post(bulk_upload_url, headers=headers, json=body)

print(f"Status Code: {response.status_code}")
print(f"Response Text: {response.text[:1000] if response.text else '(empty)'}")

if response.status_code == 200:
    print("‚úÖ Bulk upload successful.")
elif response.status_code == 207:
    print("‚ö†Ô∏è Partial success - some documents may have failed.")
else:
    print(f"‚ùå Error during bulk upload: {response.status_code}")

Error during bulk upload: 403 


JSONDecodeError: Expecting value: line 1 column 1 (char 0)