In [2]:
import requests
import json
import pandas as pd
import time
import os
import ast # Needed for safely evaluating string representations of lists/dicts
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    VectorSearch,
    VectorSearchProfile,
    HnswAlgorithmConfiguration,
    # Optional: Semantic Search Imports
    # SemanticSearch,
    # SemanticConfiguration,
    # SemanticPrioritizedFields,
    # SemanticField,
)
from math import ceil # For batching
from dotenv import load_dotenv

# --- Load Environment Variables ---
# Ensure you have a .env file in the parent directory (../.env)
# or adjust the path in load_dotenv()
load_dotenv("../.env")

# --- Configuration ---

# Data Source Config
FILE_PATH = os.getenv("SOURCE_FILE_PATH", "movies_metadata.csv") # Default filename if not set
if not FILE_PATH:
    print("Error: SOURCE_FILE_PATH environment variable not set.")
    exit()
# Define how many records to process from the start of the file
RECORDS_TO_PROCESS = None # <<< Adjust as needed. Start small for testing. >>>
# Define columns to read from CSV and use for combined text + ID
# Make sure 'id' column exists and is suitable as a unique key for Azure Search
COLUMNS_TO_READ = ['id', 'title', 'overview', 'genres', 'tagline']
# Explicitly list columns used to generate the combined text for embeddings
COLUMNS_TO_EMBED = ['title', 'overview', 'genres', 'tagline']

# Ollama Config
OLLAMA_ENDPOINT = os.getenv("OLLAMA_ENDPOINT", "http://localhost:11434/api/embeddings")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "bge-m3") # Ensure you have pulled this model: ollama pull bge-m3
VECTOR_DIMENSION = 1024 # <--- IMPORTANT: Dimension of bge-m3 embeddings

# Azure AI Search Config
SEARCH_SERVICE_ENDPOINT = os.environ.get("AZURE_SEARCH_SERVICE_ENDPOINT")
SEARCH_API_KEY = os.environ.get("AZURE_SEARCH_API_KEY")
SEARCH_INDEX_NAME = os.environ.get("AZURE_SEARCH_INDEX_NAME", "movies-ollama-bge-m3-index") # Default index name

# Output/Processing Config
OUTPUT_FILE = f"movie_embeddings_{OLLAMA_MODEL}_{RECORDS_TO_PROCESS}records.jsonl" # For backup
UPLOAD_BATCH_SIZE = 100 # Upload documents in batches

# --- Ollama API Call Function ---
def get_ollama_embedding(text: str, model: str = OLLAMA_MODEL):
    """Calls the local Ollama API to get embeddings for the given text."""
    if pd.isna(text):
        text = "" # Treat NaN as empty string
    if not isinstance(text, str):
        text = str(text) # Attempt to convert non-strings

    text = text.strip()
    if not text:
        # print("Warning: Text is empty or whitespace. Skipping embedding.") # Less verbose
        return None

    # Optional: Add text length check/truncation if Ollama has limits
    # MAX_TEXT_LENGTH = 8000
    # if len(text) > MAX_TEXT_LENGTH:
    #     print(f"Warning: Truncating text longer than {MAX_TEXT_LENGTH} chars.")
    #     text = text[:MAX_TEXT_LENGTH]

    try:
        response = requests.post(
            OLLAMA_ENDPOINT,
            json={"model": model, "prompt": text},
            timeout=180 # Generous timeout for potentially long text or slow model
        )
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        response_json = response.json()

        if "embedding" in response_json:
            embedding = response_json["embedding"]
            if isinstance(embedding, list) and len(embedding) == VECTOR_DIMENSION:
                return embedding
            else:
                print(f"Error: Invalid embedding format/dimension (Expected: {VECTOR_DIMENSION}, Got: {len(embedding) if isinstance(embedding, list) else type(embedding)}). Text: '{text[:100]}...'")
                return None
        else:
            print(f"Error: 'embedding' key missing in response for text '{text[:100]}...'. Response: {response_json}")
            return None
    except requests.exceptions.Timeout:
        print(f"Error: Ollama API request timed out for text: '{text[:100]}...'")
    except requests.exceptions.ConnectionError:
        print(f"Error: Could not connect to Ollama API at {OLLAMA_ENDPOINT}. Is Ollama running?")
    except requests.exceptions.RequestException as e:
        print(f"Error calling Ollama API for text '{text[:100]}...': {e}")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON response from Ollama API. Response Text: {response.text}")
    return None

# --- Azure Search: Create or Update Index Function ---
def create_or_update_index(endpoint: str, api_key: str, index_name: str, vector_dimension: int):
    """
    Creates a new Azure AI Search index or updates an existing one with the required schema.
    Returns True if successful, False otherwise.
    """
    print(f"Attempting to create or update search index '{index_name}'...")
    try:
        credential = AzureKeyCredential(api_key)
        index_client = SearchIndexClient(endpoint=endpoint, credential=credential)

        fields = [
            # Key field - must be string, unique, and filterable/sortable is good practice
            SimpleField(name="movie_id", type=SearchFieldDataType.String, key=True, sortable=True, filterable=True, facetable=False),
            # Searchable text fields
            SearchableField(name="title", type=SearchFieldDataType.String, sortable=True, filterable=True),
            SearchableField(name="overview", type=SearchFieldDataType.String),
            SearchableField(name="tagline", type=SearchFieldDataType.String),
            # Store genres as a single searchable/filterable/facetable string (e.g., "Action, Adventure, Science Fiction")
            SearchableField(name="genres", type=SearchFieldDataType.String, filterable=True, facetable=True),
            # Vector field for embeddings
            SearchField(
                name="embedding",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True, # Indicates it can be used in vector queries
                vector_search_dimensions=vector_dimension,
                vector_search_profile_name="my-hnsw-profile", # Links to the vector search profile
            ),
        ]

        # Vector Search Configuration
        vector_search = VectorSearch(
            profiles=[
                VectorSearchProfile(
                    name="my-hnsw-profile", # Must match profile name used in the 'embedding' field
                    algorithm_configuration_name="my-hnsw-config", # Links to the algorithm config
                )
            ],
            algorithms=[
                HnswAlgorithmConfiguration(
                    name="my-hnsw-config", # Must match algorithm name used in the profile
                    # Parameters like "metric" ("cosine", "euclidean", "dotProduct"), "m", "efConstruction", "efSearch" can be set here
                    # Default metric is "cosine", which is often suitable for text embeddings
                    parameters={"metric": "cosine"}
                )
            ],
        )

        # --- (Optional) Semantic Search Configuration ---
        # Requires Standard tier or higher Search Service and enabling Semantic Search on the service
        # Uncomment if needed:
        # semantic_search = SemanticSearch(
        #     configurations=[
        #         SemanticConfiguration(
        #             name="my-semantic-config", # Choose a name
        #             prioritized_fields=SemanticPrioritizedFields(
        #                 title_field=SemanticField(field_name="title"),
        #                 keywords_fields=[], # Optional keyword fields
        #                 content_fields=[
        #                     SemanticField(field_name="overview"),
        #                     SemanticField(field_name="tagline"),
        #                     SemanticField(field_name="genres"),
        #                 ]
        #             )
        #         )
        #     ]
        # )

        # --- Create the Index Object ---
        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=vector_search,
            # semantic_search=semantic_search # Uncomment if using semantic search
        )

        # --- Create or Update the Index in Azure ---
        # This command handles both creation and updates based on the index name
        result = index_client.create_or_update_index(index)
        print(f"Index '{result.name}' created or updated successfully.")
        return True

    except Exception as e:
        print(f"Error creating/updating index '{index_name}': {e}")
        # Consider more specific error handling if needed (e.g., authentication errors)
        return False


# --- Main Processing Logic ---
def process_movies_and_embed(dataframe_subset: pd.DataFrame):
    """
    Iterates through the provided DataFrame subset, combines text, generates embeddings,
    and returns documents formatted for Azure AI Search upload.
    """
    documents_for_upload = []
    processed_count = 0
    success_count = 0
    fail_count = 0

    if dataframe_subset.empty:
        print("Warning: Input DataFrame subset is empty. No records to process.")
        return documents_for_upload, success_count, fail_count

    # Verify required columns are present in the subset (should be guaranteed by read_csv if no errors)
    required_cols = ['id'] + COLUMNS_TO_EMBED
    if not all(col in dataframe_subset.columns for col in required_cols):
         print(f"Error: DataFrame subset is missing one or more required columns: {required_cols}")
         return [], 0, len(dataframe_subset) # Fail all

    print(f"\nStarting embedding generation and document preparation for {len(dataframe_subset)} movies...")
    print(f"Using combined text from columns: {', '.join(COLUMNS_TO_EMBED)}")

    for row in dataframe_subset.itertuples(index=False):
        processed_count += 1
        movie_id_raw = getattr(row, "id", None)
        movie_title = getattr(row, "title", "") # Default to empty string

        # --- Validate and Format ID (Critical for Azure Search Key) ---
        if pd.isna(movie_id_raw):
            print(f"  - Warning: Skipping record ~'{movie_title}' due to missing ID.")
            fail_count += 1
            continue
        # Ensure ID is a non-empty string
        movie_id = str(movie_id_raw).strip()
        if not movie_id:
            print(f"  - Warning: Skipping record ~'{movie_title}' due to empty ID after cleaning.")
            fail_count += 1
            continue

        # Ensure title is a string for consistent handling
        if pd.isna(movie_title): movie_title = ""
        if not isinstance(movie_title, str): movie_title = str(movie_title)
        movie_title = movie_title.strip()

        print(f"\nProcessing record {processed_count}: ID='{movie_id}', Title='{movie_title}'")

        # --- Combine Text from Specified Columns ---
        text_parts = []
        genre_names = [] # Store parsed genre names separately for the 'genres' field

        # Title
        if "title" in COLUMNS_TO_EMBED and movie_title:
            text_parts.append(movie_title)

        # Overview
        if "overview" in COLUMNS_TO_EMBED:
            overview = getattr(row, "overview", "")
            if pd.notna(overview) and isinstance(overview, str) and overview.strip():
                text_parts.append(overview.strip())

        # Tagline
        if "tagline" in COLUMNS_TO_EMBED:
            tagline = getattr(row, "tagline", "")
            if pd.notna(tagline) and isinstance(tagline, str) and tagline.strip():
                text_parts.append(tagline.strip())

        # Genres (Requires Parsing from string representation of list/dict)
        if "genres" in COLUMNS_TO_EMBED:
            genres_str = getattr(row, "genres", "[]") # Default to empty list string
            if pd.notna(genres_str) and isinstance(genres_str, str) and genres_str.strip() not in ('[]', '{}', ''):
                try:
                    # Safely evaluate the string: handles list of dicts like [{'id': 1, 'name': 'Action'}]
                    genres_list = ast.literal_eval(genres_str)
                    if isinstance(genres_list, list):
                        for genre_item in genres_list:
                            # Check if item is dict and has 'name' key with non-empty value
                            if isinstance(genre_item, dict) and 'name' in genre_item and genre_item['name']:
                                genre_name = str(genre_item['name']).strip()
                                if genre_name:
                                    genre_names.append(genre_name)
                except (SyntaxError, ValueError, TypeError) as e:
                    # Log parsing errors but continue processing the record
                    print(f"  - Warning: Could not parse genres for ID {movie_id}. Error: {e}. Genres string: '{genres_str[:100]}...'")

            if genre_names:
                 # Add the collected genre names as a single string part for embedding
                 text_parts.append(" ".join(genre_names)) # e.g., "Action Adventure Science Fiction"

        # --- Create the final combined text for embedding ---
        combined_text = ". ".join(filter(None, text_parts)) # Join non-empty parts with ". "

        if not combined_text.strip():
            print(f"  - Warning: Skipping ID '{movie_id}' ('{movie_title}') due to empty combined text after processing.")
            fail_count += 1
            continue

        # --- Generate the embedding ---
        print(f"  - Generating embedding for combined text (length: {len(combined_text)} chars).")
        embedding_vector = get_ollama_embedding(combined_text)

        # --- Prepare document for Azure Search ---
        if embedding_vector:
            print(f"  - Successfully generated embedding (dimension: {len(embedding_vector)}).")

            # --- Safely retrieve, convert, and clean text fields ---
            # Title is already cleaned earlier in the loop
            overview_raw = getattr(row, "overview", "")
            overview_cleaned = str(overview_raw).strip() # Convert to string THEN strip

            tagline_raw = getattr(row, "tagline", "")
            tagline_cleaned = str(tagline_raw).strip() # Convert to string THEN strip

            doc = {
                # Map DataFrame fields to Azure Search index fields
                "movie_id": movie_id, # Key field (string)
                "title": movie_title if movie_title else None, # Already cleaned
                "overview": overview_cleaned if overview_cleaned else None, # Assign None if empty after cleaning
                "tagline": tagline_cleaned if tagline_cleaned else None,   # Assign None if empty after cleaning
                "genres": ", ".join(genre_names) if genre_names else None, # Store as comma-separated string or None
                "embedding": embedding_vector, # The generated vector
                "@search.action": "mergeOrUpload" # Action for upload batch: updates if key exists, inserts if not
            }
            documents_for_upload.append(doc)
            success_count += 1
        else:
            print(f"  - Failed to generate embedding for ID '{movie_id}' ('{movie_title}'). Skipping document upload.")
            fail_count += 1 

    print(f"\nFinished processing {processed_count} records. Successful embeddings: {success_count}, Failed/Skipped: {fail_count}")
    return documents_for_upload, success_count, fail_count

# --- Function to save raw results to JSON Lines file (Backup) ---
def save_processed_docs_to_jsonl(documents: list, filename: str):
    """Saves the list of processed documents (formatted for upload) to a JSON Lines file."""
    if not documents:
        print("\nNo documents with successful embeddings to save to JSONL.")
        return
    print(f"\nAttempting to save {len(documents)} processed documents to {filename}...")
    saved_count = 0
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            for record in documents:
                # Make a copy and remove the Azure Search action field before saving
                record_to_save = record.copy()
                record_to_save.pop("@search.action", None)

                # Optional: Add check for valid embedding before saving
                if not isinstance(record_to_save.get('embedding'), list):
                    print(f"Warning: Skipping record ID '{record_to_save.get('movie_id')}' in JSONL save due to invalid embedding.")
                    continue
                try:
                    json_string = json.dumps(record_to_save, ensure_ascii=False) # ensure_ascii=False for non-latin chars
                    f.write(json_string + '\n')
                    saved_count += 1
                except TypeError as e:
                    print(f"Error: Could not serialize record ID '{record_to_save.get('movie_id')}' to JSON. Error: {e}. Record: {record_to_save}")
        print(f"Successfully saved {saved_count} documents to {filename}")
    except IOError as e:
        print(f"Error: Could not write to file {filename}. Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during file saving: {e}")

#--- Azure Search: Upload Documents Function ---
def upload_documents_to_index(endpoint: str, api_key: str, index_name: str, documents: list, batch_size: int = UPLOAD_BATCH_SIZE):
    """Uploads documents to the specified Azure AI Search index in batches."""
    if not documents:
        print("\nNo documents to upload.")
        return 0 # Return 0 uploaded

    print(f"\nUploading {len(documents)} documents to index '{index_name}' in batches of {batch_size}...")
    credential = AzureKeyCredential(api_key)
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)

    num_batches = ceil(len(documents) / batch_size)
    total_uploaded_successfully = 0
    total_failed = 0

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = start_idx + batch_size
        batch = documents[start_idx:end_idx]
        print(f"  Uploading batch {i+1}/{num_batches} ({len(batch)} documents)...")
        try:
            # upload_documents automatically handles batching internally, but we do it manually
            # for better progress reporting and potential error isolation per batch.
            # The `results` object details success/failure per document in the batch.
            results = search_client.upload_documents(documents=batch)

            batch_success_count = 0
            batch_fail_count = 0
            for result in results:
                if result.succeeded:
                    batch_success_count += 1
                else:
                    batch_fail_count += 1
                    # Log detailed error per document failure
                    print(f"    Failed to upload document ID '{result.key}': {result.error_message} (Status Code: {result.status_code})")

            total_uploaded_successfully += batch_success_count
            total_failed += batch_fail_count
            print(f"  Batch {i+1} result: {batch_success_count} succeeded, {batch_fail_count} failed.")

        except Exception as e:
            # Catch broader exceptions during the batch upload call itself
            print(f"  Error uploading batch {i+1}: {e}")
            # Assume all documents in this batch failed if the upload call itself threw an exception
            total_failed += len(batch)

    print(f"\nUpload finished. Total documents uploaded successfully: {total_uploaded_successfully}, Total failed: {total_failed}")
    return total_uploaded_successfully


# --- Main Execution Block ---
if __name__ == "__main__":
    print(f"Script started at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    script_start_time = time.time()

    # --- Validate Azure Search Configuration ---
    if not SEARCH_SERVICE_ENDPOINT or "<" in SEARCH_SERVICE_ENDPOINT:
        print("Error: Azure Search Service Endpoint is not configured correctly. Set AZURE_SEARCH_SERVICE_ENDPOINT.")
        exit(1)
    if not SEARCH_API_KEY or "<" in SEARCH_API_KEY:
        print("Error: Azure Search Admin API Key is not configured correctly. Set AZURE_SEARCH_API_KEY.")
        exit(1)
    if not SEARCH_INDEX_NAME:
        print("Error: Search Index Name is not configured. Set SEARCH_INDEX_NAME.")
        exit(1)

    # --- 1. Create or Update Azure Search Index ---
    # This step ensures the index exists with the correct schema before processing data.
    # The Azure SDK handles the create/update logic.
    index_ready = create_or_update_index(
        endpoint=SEARCH_SERVICE_ENDPOINT,
        api_key=SEARCH_API_KEY,
        index_name=SEARCH_INDEX_NAME,
        vector_dimension=VECTOR_DIMENSION
    )

    if not index_ready:
        print("Exiting script because index creation/update failed.")
        exit(1) # Exit if index setup fails

    # --- 2. Read Data Subset ---
    df_processed = pd.DataFrame() # Initialize empty DataFrame
    try:
        print(f"\nReading columns {COLUMNS_TO_READ} for the first {RECORDS_TO_PROCESS} rows from '{FILE_PATH}'...")
        # Read only necessary columns and rows, crucial for large files.
        # Ensure 'id' is treated as a string, as it's the Azure Search key.
        df_processed = pd.read_csv(
            FILE_PATH,
            usecols=COLUMNS_TO_READ,
            nrows=RECORDS_TO_PROCESS,
            dtype={'id': str}, # Read 'id' column specifically as string
            low_memory=False # May help with mixed types, less critical with nrows/usecols
        )
        print(f"Successfully read {len(df_processed)} rows.")
        if len(df_processed) < RECORDS_TO_PROCESS:
             print(f"Info: Requested {RECORDS_TO_PROCESS} records, but the file (or section read) only contains {len(df_processed)}.")

    except FileNotFoundError:
        print(f"Error: File not found at '{FILE_PATH}'. Please check the path and filename.")
        exit(1)
    except ValueError as e:
        # Catch errors related to missing columns specified in 'usecols'
        print(f"Error reading CSV. Does the file '{FILE_PATH}' contain all required columns: {COLUMNS_TO_READ}? Error details: {e}")
        exit(1)
    except Exception as e:
        print(f"An unexpected error occurred reading the CSV file: {e}")
        exit(1)

    # --- 3. Process Movies and Generate Embeddings ---
    # Pass the already loaded subset DataFrame to the processing function
    documents_to_upload, successes, failures = process_movies_and_embed(df_processed)

    # --- 4. Save Processed Docs to JSONL (Backup) ---
    # Only save if there are documents that were successfully processed
    if documents_to_upload:
        save_processed_docs_to_jsonl(documents_to_upload, OUTPUT_FILE)
    else:
        print("\nNo documents with successful embeddings were generated, skipping JSONL backup.")

    # --- 5. Upload to Azure Search ---
    # Only upload if there are documents to upload
    if documents_to_upload:
        upload_documents_to_index(
            endpoint=SEARCH_SERVICE_ENDPOINT,
            api_key=SEARCH_API_KEY,
            index_name=SEARCH_INDEX_NAME,
            documents=documents_to_upload,
            batch_size=UPLOAD_BATCH_SIZE
        )
    else:
        print("\nSkipping upload to Azure Search as no documents were successfully processed.")

    # --- Finish ---
    script_end_time = time.time()
    print(f"\nScript finished at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total execution time: {script_end_time - script_start_time:.2f} seconds")

Script started at 2025-04-09 20:13:06
Attempting to create or update search index 'contosomoviesnomicembedtextidx'...
Error creating/updating index 'contosomoviesnomicembedtextidx': (OperationNotAllowed) Existing field 'embedding' cannot be changed.
Code: OperationNotAllowed
Message: Existing field 'embedding' cannot be changed.
Exception Details:	(CannotChangeExistingField) Existing field 'embedding' cannot be changed.
	Code: CannotChangeExistingField
	Message: Existing field 'embedding' cannot be changed.
Exiting script because index creation/update failed.

Reading columns ['id', 'title', 'overview', 'genres', 'tagline'] for the first None rows from '../data/raw/kaggle_movie_dataset/movies_metadata.csv'...
Successfully read 45466 rows.
An unexpected error occurred reading the CSV file: '<' not supported between instances of 'int' and 'NoneType'

Starting embedding generation and document preparation for 45466 movies...
Using combined text from columns: title, overview, genres, tagli

KeyboardInterrupt: 

In [None]:
import requests
import json
import pandas as pd
import time
import os
import ast # Needed for safely evaluating string representations of lists/dicts
# Removed ThreadPoolExecutor imports as we'll use request batching
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    VectorSearch,
    VectorSearchProfile,
    HnswAlgorithmConfiguration,
)
from math import ceil
from dotenv import load_dotenv

# --- Load Environment Variables ---
# Assuming .env is in the parent directory
dotenv_path = "../.env"
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path)
else:
    print("Warning: .env file not found in parent directory.")
    # Attempt loading from current directory if needed
    load_dotenv()


# --- Configuration ---

# Data Source Config
FILE_PATH = os.getenv("SOURCE_FILE_PATH", "movies_metadata.csv")
if not FILE_PATH:
    print("Error: SOURCE_FILE_PATH environment variable not set.")
    exit(1)
# For 45k rows, let's set this appropriately or remove the limit if you want all
RECORDS_TO_PROCESS = 46000 # Adjust as needed, or remove 'nrows' in pd.read_csv to process all
COLUMNS_TO_READ = ['id', 'title', 'overview', 'genres', 'tagline']
COLUMNS_TO_EMBED = ['title', 'overview', 'genres', 'tagline']

# Ollama Config
OLLAMA_ENDPOINT = os.getenv("OLLAMA_ENDPOINT", "http://localhost:11434/api/embed")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "nomic-embed-text") # Ensure pulled: ollama pull nomic-embed-text
VECTOR_DIMENSION = 768 # Correct for nomic-embed-text
OLLAMA_BATCH_SIZE = int(os.getenv("OLLAMA_BATCH_SIZE", 32))

# Azure AI Search Config
SEARCH_SERVICE_ENDPOINT = os.environ.get("AZURE_SEARCH_SERVICE_ENDPOINT")
SEARCH_API_KEY = os.environ.get("AZURE_SEARCH_API_KEY")
SEARCH_INDEX_NAME = os.environ.get("AZURE_SEARCH_INDEX_NAME", f"movies-ollama-{OLLAMA_MODEL.replace('/','-')}-index")

# Processing & Output Config
# Ensure OUTPUT_PATH exists or handle its creation
output_dir = os.getenv("OUTPUT_PATH", ".") # Default to current directory if not set
os.makedirs(output_dir, exist_ok=True) # Create output directory if it doesn't exist
# Construct full output file path
OUTPUT_FILE = os.path.join(output_dir, f"movie_embeddings_{OLLAMA_MODEL.replace('/','-')}_{RECORDS_TO_PROCESS}records.jsonl")
UPLOAD_BATCH_SIZE = 100 # Batch size for uploading docs TO AZURE SEARCH

# --- Ollama API Call Function for Batch Embeddings ---
# (Keep the function as it is in your original code)
def get_ollama_embeddings_batch(texts: list[str], model: str):
    """Calls the local Ollama API (/api/embed) to get embeddings for a batch of texts."""
    global VECTOR_DIMENSION # Access global dimension for validation

    if not texts: # Handle empty input list
        return []

    try:
        # Adjust endpoint if needed based on your Ollama setup
        response = requests.post(
            OLLAMA_ENDPOINT,
            json={"model": model, "prompt": "", "input": texts}, # Key is "input" for batch
            timeout=300 # Increased timeout
        )
        response.raise_for_status()
        response_json = response.json()

        # Validate response structure from /api/embed
        if "embeddings" in response_json and isinstance(response_json["embeddings"], list):
            embeddings_list = response_json["embeddings"]
            if len(embeddings_list) == len(texts):
                 # Minimal validation for speed - check first embedding's dimension if needed
                 # if embeddings_list and (not isinstance(embeddings_list[0], list) or len(embeddings_list[0]) != VECTOR_DIMENSION):
                 #      print(f"Error: Unexpected vector dimension. Expected {VECTOR_DIMENSION}, Got: {len(embeddings_list[0])}")
                 #      return None
                 return embeddings_list
            else:
                print(f"Error: Mismatch between input texts ({len(texts)}) and returned embeddings ({len(embeddings_list)}) from model '{model}'.")
                return None
        else:
             print(f"Error: 'embeddings' key missing or invalid in batch response for model '{model}'. Response: {response_json}")
             return None

    except requests.exceptions.Timeout:
        print(f"Error: Ollama API request timed out for batch request. Model: '{model}', Batch Size: {len(texts)}")
    except requests.exceptions.ConnectionError:
        print(f"Error: Could not connect to Ollama API at {OLLAMA_ENDPOINT}. Is Ollama running?")
    except requests.exceptions.RequestException as e:
        print(f"Error calling Ollama API for batch request. Model: '{model}', Batch Size: {len(texts)}: {e}")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON response from Ollama API. Response Text: {response.text}")
    return None


# --- Azure Search: Create or Update Index Function ---
# (Keep the function as it is in your original code)
def create_or_update_index(endpoint: str, api_key: str, index_name: str, vector_dimension: int):
    """Creates/updates Azure AI Search index. Returns True if successful."""
    print(f"Attempting to create or update search index '{index_name}' with vector dimension {vector_dimension}...")
    try:
        credential = AzureKeyCredential(api_key)
        index_client = SearchIndexClient(endpoint=endpoint, credential=credential)
        fields = [
            SimpleField(name="movie_id", type=SearchFieldDataType.String, key=True, sortable=True, filterable=True, facetable=False),
            SearchableField(name="title", type=SearchFieldDataType.String, sortable=True, filterable=True),
            SearchableField(name="overview", type=SearchFieldDataType.String),
            SearchableField(name="tagline", type=SearchFieldDataType.String),
            SearchableField(name="genres", type=SearchFieldDataType.String, filterable=True, facetable=True),
            SearchField(
                name="embedding",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=vector_dimension,
                vector_search_profile_name="my-hnsw-profile",
            ),
        ]
        vector_search = VectorSearch(
            profiles=[VectorSearchProfile(name="my-hnsw-profile", algorithm_configuration_name="my-hnsw-config")],
            algorithms=[HnswAlgorithmConfiguration(name="my-hnsw-config", parameters={"metric": "cosine"})],
        )
        index = SearchIndex(name=index_name, fields=fields, vector_search=vector_search)
        result = index_client.create_or_update_index(index)
        print(f"Index '{result.name}' created or updated successfully.")
        return True
    except Exception as e:
        print(f"Error creating/updating index '{index_name}': {e}")
        return False


# --- Text Preparation Helper ---
# (Keep the function as it is in your original code)
def prepare_text_and_metadata(row_tuple):
    """Prepares combined text and metadata from a row tuple."""
    global COLUMNS_TO_EMBED
    movie_id = None
    try:
        movie_id_raw = getattr(row_tuple, "id", None)
        movie_title = getattr(row_tuple, "title", "")

        # Added stricter ID check
        if pd.isna(movie_id_raw) or str(movie_id_raw).strip() == '': return None, None
        movie_id = str(movie_id_raw).strip()

        # Handle potential NaN/missing titles robustly
        if pd.isna(movie_title): movie_title = ""
        if not isinstance(movie_title, str): movie_title = str(movie_title)
        movie_title = movie_title.strip()

        text_parts = []
        genre_names = []

        if "title" in COLUMNS_TO_EMBED and movie_title:
            text_parts.append(f"Title: {movie_title}") # Adding labels might help

        overview_raw = getattr(row_tuple, "overview", "")
        overview_cleaned = ""
        if pd.notna(overview_raw) and str(overview_raw).strip():
             overview_cleaned = str(overview_raw).strip()
             if "overview" in COLUMNS_TO_EMBED:
                text_parts.append(f"Overview: {overview_cleaned}")

        tagline_raw = getattr(row_tuple, "tagline", "")
        tagline_cleaned = ""
        if pd.notna(tagline_raw) and str(tagline_raw).strip():
             tagline_cleaned = str(tagline_raw).strip()
             if "tagline" in COLUMNS_TO_EMBED:
                 text_parts.append(f"Tagline: {tagline_cleaned}")

        genres_cleaned_str = None
        if "genres" in COLUMNS_TO_EMBED:
            genres_str = getattr(row_tuple, "genres", "[]")
            if pd.notna(genres_str) and isinstance(genres_str, str) and genres_str.strip() not in ('[]', '{}', ''):
                try:
                    # Robust parsing for various possible string formats
                    potential_list = ast.literal_eval(genres_str)
                    if isinstance(potential_list, list):
                        for item in potential_list:
                            if isinstance(item, dict) and 'name' in item and item['name'] and str(item['name']).strip():
                                genre_names.append(str(item['name']).strip())
                            elif isinstance(item, str) and item.strip(): # Handle case where it's just a list of strings
                                genre_names.append(item.strip())
                except (ValueError, SyntaxError, TypeError):
                     # Fallback: Treat as simple comma-separated string if literal_eval fails
                     if ',' in genres_str:
                         genre_names.extend([g.strip() for g in genres_str.split(',') if g.strip()])
                     elif genres_str.strip(): # Treat as a single genre if no commas
                         genre_names.append(genres_str.strip())
            if genre_names:
                text_parts.append(f"Genres: {', '.join(genre_names)}")
                genres_cleaned_str = ", ".join(genre_names) # For metadata field

        combined_text = " ".join(filter(None, text_parts)).strip()
        # Ensure we have at least an ID and some text to proceed
        if not combined_text: return None, None

        metadata = {
            "movie_id": movie_id,
            # Use cleaned versions or None if empty/NaN originally
            "title": movie_title if movie_title else None,
            "overview": overview_cleaned if overview_cleaned else None,
            "tagline": tagline_cleaned if tagline_cleaned else None,
            "genres": genres_cleaned_str, # Already joined string or None
        }
        return combined_text, metadata

    except Exception as e:
        print(f"Error preparing text/metadata for potential ID '{getattr(row_tuple, 'id', 'UNKNOWN')}': {e}")
        return None, None


# --- Function to save raw results to JSON Lines file (Backup) ---
# (Keep the function as it is - it will be called at the end with all *successfully* processed docs)
def save_processed_docs_to_jsonl(documents: list, filename: str):
    """Saves the final list of successfully processed documents (with embeddings) to a JSON Lines file."""
    if not documents:
        print("\nNo documents with successful embeddings to save to JSONL.")
        return
    # Filter out documents that might be missing embeddings before saving
    docs_to_save = [doc for doc in documents if isinstance(doc.get('embedding'), list)]
    if not docs_to_save:
        print("\nNo documents with valid embeddings found to save to JSONL.")
        return

    print(f"\nAttempting to save {len(docs_to_save)} processed documents to {filename}...")
    saved_count = 0
    try:
        # Use 'w' mode to overwrite any previous incomplete file from prior runs
        with open(filename, 'w', encoding='utf-8') as f:
            for record in docs_to_save:
                record_to_save = record.copy()
                # Remove the search action field before saving, if present
                record_to_save.pop("@search.action", None)
                try:
                    json_string = json.dumps(record_to_save, ensure_ascii=False)
                    f.write(json_string + '\n')
                    saved_count += 1
                except TypeError as e:
                    print(f"Error: Could not serialize record ID '{record_to_save.get('movie_id')}' to JSON. Error: {e}. Record: {record_to_save}")
        print(f"Successfully saved {saved_count} documents to {filename}")
    except IOError as e:
        print(f"Error: Could not write to file {filename}. Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during file saving: {e}")


#--- Azure Search: Upload Documents Function ---
# (Keep the function as it is in your original code - it handles batching internally)
def upload_documents_to_index(endpoint: str, api_key: str, index_name: str, documents: list, batch_size: int = UPLOAD_BATCH_SIZE):
    """Uploads a list of documents to the specified Azure Search index."""
    if not documents:
        print("\n(upload_documents_to_index) No documents provided for upload.")
        return 0
    # Ensure documents have the @search.action field
    for doc in documents:
        if "@search.action" not in doc:
            doc["@search.action"] = "mergeOrUpload" # Default action

    print(f"\nUploading {len(documents)} documents to index '{index_name}' (Azure client handles internal batching)...")
    credential = AzureKeyCredential(api_key)
    search_client = SearchClient(endpoint=endpoint, index_name=index_name, credential=credential)
    try:
        results = search_client.upload_documents(documents=documents)
        # Check results
        success_count = sum(1 for result in results if result.succeeded)
        fail_count = len(documents) - success_count
        total_uploaded_successfully = success_count

        print(f"  Upload attempt finished. Succeeded: {success_count}, Failed: {fail_count}")
        if fail_count > 0:
             for result in results:
                 if not result.succeeded:
                      # Limit error message length if it's very long
                      error_message = str(result.error_message)
                      if len(error_message) > 500:
                          error_message = error_message[:500] + "... (truncated)"
                      print(f"    Failed doc ID '{result.key}': {error_message}")
        return total_uploaded_successfully
    except Exception as e:
        print(f"  Error during bulk upload: {e}")
        # You might want more sophisticated error handling here,
        # like attempting smaller batches or individual uploads on failure.
        return 0


# --- Main Execution Block ---
if __name__ == "__main__":
    print(f"Script started at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Using Ollama model: {OLLAMA_MODEL} (Vector Dim: {VECTOR_DIMENSION}) at {OLLAMA_ENDPOINT}")
    print(f"Processing up to {RECORDS_TO_PROCESS} records.")
    print(f"Ollama embedding batch size: {OLLAMA_BATCH_SIZE}")
    print(f"Azure Search upload batch size: {UPLOAD_BATCH_SIZE}")
    script_start_time = time.time()

    # --- Validate Config ---
    if not SEARCH_SERVICE_ENDPOINT or "<" in SEARCH_SERVICE_ENDPOINT or not SEARCH_SERVICE_ENDPOINT.startswith("https"):
        print("Error: Azure Search Service Endpoint not configured correctly (should start with https://)."); exit(1)
    if not SEARCH_API_KEY or "<" in SEARCH_API_KEY:
        print("Error: Azure Search Admin API Key not configured."); exit(1)
    if not SEARCH_INDEX_NAME:
        print("Error: Search Index Name not configured."); exit(1)
    if not os.path.exists(FILE_PATH):
        print(f"Error: Source file not found at '{FILE_PATH}'."); exit(1)

    # --- 1. Create or Update Azure Search Index ---
    index_ready = create_or_update_index(
        endpoint=SEARCH_SERVICE_ENDPOINT,
        api_key=SEARCH_API_KEY,
        index_name=SEARCH_INDEX_NAME,
        vector_dimension=VECTOR_DIMENSION
    )
    if not index_ready: print("Exiting script because index creation/update failed."); exit(1)

    # --- 2. Read Data Subset ---
    df_processed = pd.DataFrame()
    actual_records_read = 0
    try:
        print(f"\nReading columns {COLUMNS_TO_READ} for first {RECORDS_TO_PROCESS} rows from '{FILE_PATH}'...")
        # Consider using chunking for very large files if memory is an issue
        df_processed = pd.read_csv(
            FILE_PATH,
            usecols=COLUMNS_TO_READ,
            nrows=RECORDS_TO_PROCESS,
            dtype={'id': str}, # Keep ID as string
            low_memory=False,
            on_bad_lines='warn' # or 'skip' if you prefer
        )
        # Drop rows where essential columns for embedding are missing entirely AFTER reading
        df_processed.dropna(subset=['id', 'title', 'overview'], how='any', inplace=True)
        # Reset index after dropping rows if needed, though itertuples handles it
        df_processed.reset_index(drop=True, inplace=True)

        actual_records_read = len(df_processed)
        print(f"Successfully read and initially filtered {actual_records_read} rows.")
        if actual_records_read < RECORDS_TO_PROCESS:
             print(f"Info: Requested {RECORDS_TO_PROCESS}, file/section contains {actual_records_read} after initial read/filtering.")
    except FileNotFoundError: print(f"Error: File not found at '{FILE_PATH}'."); exit(1)
    except ValueError as e: print(f"Error reading CSV. Check columns {COLUMNS_TO_READ} are present in '{FILE_PATH}'. Details: {e}"); exit(1)
    except Exception as e: print(f"An unexpected error occurred reading CSV: {e}"); exit(1)


    # --- 3. Process Movies, Generate Embeddings, and Upload Incrementally ---
    # Use one list to accumulate docs for Azure upload batching
    documents_for_azure_batch = []
    # Use a separate list to keep track of ALL successfully processed docs for final JSONL backup
    all_successful_documents_for_backup = []
    total_processed_count = 0
    success_embedding_count = 0
    fail_count = 0
    total_uploaded_count = 0 # Track total docs successfully sent to Azure

    if df_processed.empty:
        print("DataFrame is empty after initial filtering, skipping embedding generation.")
    else:
        num_ollama_batches = ceil(actual_records_read / OLLAMA_BATCH_SIZE)
        print(f"\nStarting batch embedding generation for {actual_records_read} movies in {num_ollama_batches} Ollama batches...")
        processing_start_time = time.time()

        for i in range(num_ollama_batches):
            batch_start_time = time.time()
            start_idx = i * OLLAMA_BATCH_SIZE
            end_idx = min(start_idx + OLLAMA_BATCH_SIZE, actual_records_read) # Ensure end_idx doesn't exceed actual rows
            df_batch = df_processed.iloc[start_idx:end_idx] # Use iloc for positional slicing

            print(f"\nProcessing Ollama Batch {i+1}/{num_ollama_batches} (Rows {start_idx+1}-{end_idx})...")

            texts_to_embed = []
            batch_metadata = [] # Store corresponding metadata for successful text preps
            batch_prep_fail_count = 0 # Count failures within this batch's prep stage

            # Prepare texts and metadata for the current Ollama batch
            for row_tuple in df_batch.itertuples(index=False, name='MovieRow'): # Use iloc, so index=False is safe
                total_processed_count += 1
                combined_text, metadata = prepare_text_and_metadata(row_tuple)
                if combined_text and metadata and metadata.get("movie_id"): # Ensure ID is valid
                    texts_to_embed.append(combined_text)
                    batch_metadata.append(metadata)
                else:
                    batch_prep_fail_count += 1 # Count prep failures (missing ID, no text, etc.)

            fail_count += batch_prep_fail_count # Add prep failures to total fail count

            if not texts_to_embed:
                print(f"  Ollama Batch {i+1}: No valid text generated from rows. Skipping API call.")
                continue # Skip to next Ollama batch if no texts were prepared

            # Call Ollama API for the batch of prepared texts
            print(f"  Ollama Batch {i+1}: Sending {len(texts_to_embed)} texts to Ollama...")
            embeddings_batch = get_ollama_embeddings_batch(texts_to_embed, OLLAMA_MODEL)

            # Process the batch response
            if embeddings_batch and len(embeddings_batch) == len(batch_metadata):
                print(f"  Ollama Batch {i+1}: Received {len(embeddings_batch)} embeddings successfully.")
                # Combine embeddings with metadata and add to the upload queue
                batch_docs_processed_count = 0
                for embedding, meta in zip(embeddings_batch, batch_metadata):
                     # Basic check on received embedding before adding
                     if isinstance(embedding, list) and len(embedding) == VECTOR_DIMENSION:
                         doc = meta.copy() # Start with prepared metadata
                         doc["embedding"] = embedding
                         # doc["@search.action"] = "mergeOrUpload" # Action added in upload func if needed
                         documents_for_azure_batch.append(doc)
                         all_successful_documents_for_backup.append(doc) # Also add to backup list
                         success_embedding_count += 1
                         batch_docs_processed_count += 1
                     else:
                         print(f"  Warning: Skipping record ID '{meta.get('movie_id')}' due to invalid embedding vector received in batch.")
                         fail_count += 1

                print(f"  Ollama Batch {i+1}: Added {batch_docs_processed_count} docs to upload queue (Current queue size: {len(documents_for_azure_batch)}).")

                # --- >>> MODIFICATION: Trigger Azure upload if batch size reached <<< ---
                if len(documents_for_azure_batch) >= UPLOAD_BATCH_SIZE:
                    print(f"\n--- Triggering intermediate Azure upload: Queue size ({len(documents_for_azure_batch)}) >= Upload batch size ({UPLOAD_BATCH_SIZE}) ---")
                    uploaded_in_batch = upload_documents_to_index(
                        endpoint=SEARCH_SERVICE_ENDPOINT,
                        api_key=SEARCH_API_KEY,
                        index_name=SEARCH_INDEX_NAME,
                        documents=documents_for_azure_batch, # Upload the accumulated batch
                        batch_size=UPLOAD_BATCH_SIZE # Pass Azure batch size for info (client handles actual batching)
                    )
                    if uploaded_in_batch > 0:
                        total_uploaded_count += uploaded_in_batch
                        documents_for_azure_batch = [] # IMPORTANT: Clear the list after successful upload attempt
                        print(f"--- Intermediate upload finished. Cleared queue. Total uploaded so far: {total_uploaded_count} ---")
                    else:
                        print(f"--- WARNING: Intermediate upload attempt failed for {len(documents_for_azure_batch)} documents. They remain in the queue. ---")
                        # Decide on error handling: stop? retry later? skip these docs?
                        # For now, they stay in the queue and will be retried with the next batch trigger or final upload.

            else:
                # API call failed or returned incorrect number of embeddings for the batch
                print(f"  Ollama Batch {i+1}: Failed to get embeddings for {len(batch_metadata)} items.")
                fail_count += len(batch_metadata) # Mark all items in this API call as failed

            batch_duration = time.time() - batch_start_time
            print(f"  Ollama Batch {i+1} processing time: {batch_duration:.2f} seconds")
            # Estimate remaining time
            if success_embedding_count > 0:
                elapsed_time = time.time() - processing_start_time
                avg_time_per_doc = elapsed_time / success_embedding_count
                remaining_docs = actual_records_read - total_processed_count
                estimated_remaining_time = remaining_docs * avg_time_per_doc
                print(f"  Estimated time remaining: {estimated_remaining_time / 60:.1f} minutes")


        total_processing_duration = time.time() - processing_start_time
        print(f"\nFinished embedding generation loop.")
        print(f"Total items processed (read rows attempted): {total_processed_count}")
        print(f"Successful embeddings generated: {success_embedding_count}, Failed/Skipped: {fail_count}")
        if success_embedding_count > 0:
             print(f"Average time per successful embedding (processing phase): {total_processing_duration / success_embedding_count:.3f} seconds")


    # --- 4. Upload any remaining documents ---
    if documents_for_azure_batch:
        print(f"\n--- Triggering final upload for remaining {len(documents_for_azure_batch)} documents ---")
        uploaded_in_final_batch = upload_documents_to_index(
            endpoint=SEARCH_SERVICE_ENDPOINT,
            api_key=SEARCH_API_KEY,
            index_name=SEARCH_INDEX_NAME,
            documents=documents_for_azure_batch,
            batch_size=UPLOAD_BATCH_SIZE
        )
        if uploaded_in_final_batch > 0:
            total_uploaded_count += uploaded_in_final_batch
            print(f"--- Final upload finished. Total documents uploaded overall: {total_uploaded_count} ---")
        else:
             print(f"--- WARNING: Final upload attempt failed for {len(documents_for_azure_batch)} documents. ---")
        documents_for_azure_batch = [] # Clear list
    else:
        print("\nNo remaining documents needed final upload.")


    # --- 5. Save ALL Processed Docs (with embeddings) to JSONL (Backup) ---
    # This now saves everything that was successfully embedded, regardless of when it was uploaded
    if all_successful_documents_for_backup:
        save_processed_docs_to_jsonl(all_successful_documents_for_backup, OUTPUT_FILE)
    else:
        print("\nNo documents successfully embedded, skipping JSONL backup.")


    # --- Finish ---
    script_end_time = time.time()
    print(f"\nScript finished at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total execution time: {(script_end_time - script_start_time):.2f} seconds")
    print(f"Total documents successfully uploaded to Azure Search: {total_uploaded_count}")
    print(f"Total documents successfully embedded (and saved to backup): {success_embedding_count}")
    print(f"Total documents failed or skipped during processing: {fail_count}")

NameError: name '__file__' is not defined