In [None]:
import os
import polars as pl
import sys
from dotenv import load_dotenv
from IPython.display import display

load_dotenv()

sys.path.append(os.path.dirname(os.getcwd()))

# Import Culture Extractor client
import libraries.client_culture_extractor as client_culture_extractor

# Import StashApp client
from libraries.client_stashapp import StashAppClient, get_stashapp_client

# Initialize Culture Extractor client.
# Connection settings are read from the environment (load_dotenv() was called above).
user = os.environ.get("CE_DB_USERNAME")
pw = os.environ.get("CE_DB_PASSWORD")
host = os.environ.get("CE_DB_HOST")
port = os.environ.get("CE_DB_PORT")
db = os.environ.get("CE_DB_NAME")

# Fail fast on unset variables: formatting None into the connection string would
# silently yield fields such as "host=None" and only surface later as a
# confusing connection error.
_missing_ce_env = [
    env_name
    for env_name, env_value in (
        ("CE_DB_USERNAME", user),
        ("CE_DB_HOST", host),
        ("CE_DB_PORT", port),
        ("CE_DB_NAME", db),
    )
    if env_value is None
]
if _missing_ce_env:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_ce_env)
    )

# The password field is omitted when CE_DB_PASSWORD is unset so that the literal
# text "None" is never passed as a password. With all variables set, the string
# is identical to the previous format.
_password_part = f" password={pw}" if pw is not None else ""
connection_string = f"dbname={db} user={user}{_password_part} host={host} port={port}"
culture_extractor_client = client_culture_extractor.ClientCultureExtractor(connection_string)

# Initialize Stashapp clients
stash_client = StashAppClient()
stash_raw_client = get_stashapp_client()

# Endpoint value used to tag Culture Extractor entries in a scene's stash_ids
CULTURE_EXTRACTOR_ENDPOINT = "https://culture.extractor/graphql"

# DEBUG FLAG: Set to True to write intermediate outputs to JSON files
DEBUG_WRITE_JSON = True
# Debug files go to ".debug_outputs" in the parent of the current working directory
DEBUG_OUTPUT_DIR = os.path.join(os.path.dirname(os.getcwd()), ".debug_outputs")


def write_debug_json(df: pl.DataFrame, name: str):
    """Dump ``df`` as JSON into the debug output directory.

    Does nothing unless the module-level ``DEBUG_WRITE_JSON`` flag is truthy.
    Otherwise the frame is written to ``set_id_from_db__<name>.json`` inside
    ``DEBUG_OUTPUT_DIR`` (created on demand) and a one-line summary is printed.

    Args:
        df: DataFrame to serialize.
        name: Label embedded in the output file name.
    """
    if not DEBUG_WRITE_JSON:
        return

    # Create the target directory lazily so nothing touches disk when debugging is off
    os.makedirs(DEBUG_OUTPUT_DIR, exist_ok=True)

    target_path = os.path.join(DEBUG_OUTPUT_DIR, f"set_id_from_db__{name}.json")
    df.write_json(target_path)
    print(f"[DEBUG] Wrote {len(df)} rows to {target_path}")


def get_existing_ce_stash_id(existing_stash_ids: list) -> str | None:
    """Get existing Culture Extractor stash_id if it exists."""
    for sid in existing_stash_ids:
        if sid.get("endpoint") == CULTURE_EXTRACTOR_ENDPOINT:
            return sid.get("stash_id")
    return None


def merge_stash_ids(
    existing_stash_ids: list, new_endpoint: str, new_stash_id: str
) -> list:
    """Build a stash_id list in which ``new_endpoint`` maps to ``new_stash_id``.

    Entries for other endpoints are carried over unchanged and in their
    original order; every existing entry for ``new_endpoint`` is dropped and a
    single fresh entry is appended at the end. The input list is not modified.

    Args:
        existing_stash_ids: Mappings with ``endpoint`` and ``stash_id`` keys.
        new_endpoint: Endpoint whose entry is being set or replaced.
        new_stash_id: Identifier to record for ``new_endpoint``.

    Returns:
        A new list containing the kept entries followed by the new one.
    """
    replacement = {"endpoint": new_endpoint, "stash_id": new_stash_id}

    kept = []
    for entry in existing_stash_ids:
        if entry.get("endpoint") == new_endpoint:
            # Superseded by the replacement entry
            continue
        kept.append(entry)

    return kept + [replacement]

In [None]:
# Load the full site list from the Culture Extractor client
all_ce_sites = culture_extractor_client.get_sites()
print("Found", len(all_ce_sites), "sites in Culture Extractor")

# Persist the site list for debugging (no-op unless DEBUG_WRITE_JSON is enabled)
write_debug_json(all_ce_sites, name="all_ce_sites")

# Last expression of the cell: rendered as the cell output
all_ce_sites

In [None]:
# Get all scenes from Stashapp with their file fingerprints and existing stash_ids
scenes = stash_raw_client.find_scenes(
    fragment="""
    id
    title
    stash_ids {
        endpoint
        stash_id
    }
    files {
        id
        basename
        path
        fingerprints {
            type
            value
        }
    }
    """
)

# Build one record per scene that has an oshash on its primary file
results = []

for scene in scenes:
    files = scene.get("files", [])
    if not files:
        # No file means there is no fingerprint to match on
        continue

    # Only the first file of a scene is treated as the primary file
    primary_file = files[0]
    fingerprints = primary_file.get("fingerprints", [])

    # First fingerprint of type "oshash", or None when the file has none
    oshash = next(
        (fp["value"] for fp in fingerprints if fp["type"] == "oshash"),
        None,
    )
    if not oshash:
        continue

    results.append(
        {
            "scene_id": scene.get("id"),
            "scene_title": scene.get("title"),
            "file_basename": primary_file.get("basename"),
            "file_path": primary_file.get("path"),
            "oshash": oshash,
            "existing_stash_ids": scene.get("stash_ids", []),
        }
    )

# Create DataFrame.
# Scan every row when inferring the schema: by default polars only inspects the
# first 100 rows, so if those all had empty stash_ids lists or null titles the
# inferred column types would not fit later rows that hold real values.
scenes_df = pl.DataFrame(results, infer_schema_length=None)

print(f"Total scenes: {len(scenes)}")
print(f"Scenes with oshash: {len(scenes_df)}")

# Write debug output
write_debug_json(scenes_df, "scenes_df")

scenes_df

In [None]:
# Get all downloads from Culture Extractor database
# You can filter by specific site_uuid if needed, or get all sites

all_downloads = []

# Default to a single site for testing - change site name or set to None to process all sites
SELECTED_SITE_NAME = "Meana Wolf"  # Set to None to process all sites

if SELECTED_SITE_NAME:
    selected_sites = all_ce_sites.filter(pl.col("ce_sites_name") == SELECTED_SITE_NAME)
    print(f"Processing single site: {SELECTED_SITE_NAME}")
    if len(selected_sites) == 0:
        # A typo in the site name would otherwise surface further down as an
        # unrelated "column not found" error on an empty frame.
        raise ValueError(
            f"No Culture Extractor site named {SELECTED_SITE_NAME!r}; "
            "check SELECTED_SITE_NAME against all_ce_sites['ce_sites_name']."
        )
else:
    selected_sites = all_ce_sites
    print(f"Processing all {len(all_ce_sites)} sites")

for site_row in selected_sites.iter_rows(named=True):
    site_uuid = site_row["ce_sites_uuid"]
    site_name = site_row["ce_sites_name"]

    print(f"Fetching downloads for {site_name}...")
    site_downloads = culture_extractor_client.get_downloads(site_uuid)

    # Sites without downloads are skipped and contribute nothing to the concat
    if len(site_downloads) > 0:
        all_downloads.append(site_downloads)
        print(f"  Found {len(site_downloads)} downloads")

# Concatenate all downloads.
# An empty pl.DataFrame() has no columns, so the column filter below (and the
# later join on ce_downloads_hash_oshash) cannot work on it; stop here with an
# explicit message instead.
if not all_downloads:
    raise ValueError(
        "No downloads found for the selected site(s); "
        "nothing to match against Stashapp scenes."
    )
downloads_df = pl.concat(all_downloads)

# Filter to video files with content_type "scene" and oshash (for exact matching)
downloads_with_oshash = downloads_df.filter(
    (pl.col("ce_downloads_file_type") == "video")
    & (pl.col("ce_downloads_content_type") == "scene")
    & (pl.col("ce_downloads_hash_oshash").is_not_null())
)

print(f"\nTotal downloads: {len(downloads_df)}")
print(f"Video scene downloads with oshash: {len(downloads_with_oshash)}")

# Write debug output
write_debug_json(downloads_with_oshash, "downloads_with_oshash")

downloads_with_oshash

In [None]:
# Join Culture Extractor downloads with Stashapp scenes on oshash (exact matching only)
joined_df = scenes_df.join(
    downloads_with_oshash,
    left_on="oshash",
    right_on="ce_downloads_hash_oshash",
    how="inner",
)

print(f"Matched scenes: {len(joined_df)}")

# Write debug output for joined data
write_debug_json(joined_df, "joined_df")

# Split matches into scenes that already carry the matching Culture Extractor
# stash ID and scenes that still need it.
scenes_to_update = []
scenes_already_set = []
already_set_flags = []

for row in joined_df.iter_rows(named=True):
    existing_stash_ids = row["existing_stash_ids"] or []
    existing_ce_stash_id = get_existing_ce_stash_id(existing_stash_ids)
    ce_release_uuid = row["ce_downloads_release_uuid"]

    # Already set only when the stored ID equals the release UUID of this match
    is_already_set = bool(existing_ce_stash_id == ce_release_uuid)
    already_set_flags.append(is_already_set)
    if is_already_set:
        scenes_already_set.append(row)
    else:
        scenes_to_update.append(row)

# Derive both frames from joined_df with a boolean mask rather than rebuilding
# them from row dicts: the column schema is preserved even when a frame has no
# rows, so the selects below keep working when nothing needs updating.
already_set_mask = pl.Series("already_set", already_set_flags, dtype=pl.Boolean)
scenes_to_update_df = joined_df.filter(~already_set_mask)
scenes_already_set_df = joined_df.filter(already_set_mask)

print(f"\nScenes that need updating: {len(scenes_to_update_df)}")
print(f"Scenes already set (skipped): {len(scenes_already_set_df)}")

# Write debug outputs
if len(scenes_to_update_df) > 0:
    write_debug_json(scenes_to_update_df, "scenes_to_update_df")
if len(scenes_already_set_df) > 0:
    write_debug_json(scenes_already_set_df, "scenes_already_set_df")

if len(scenes_already_set_df) > 0:
    print("\nScenes already set with matching Culture Extractor stash ID:")
    display(
        scenes_already_set_df.select(
            [
                "scene_id",
                "scene_title",
                "file_basename",
                "ce_downloads_release_name",
                "ce_downloads_release_uuid",
            ]
        )
    )

print("\nScenes to be updated:")

# Last expression of the cell: rendered as the cell output (empty when nothing is pending)
scenes_to_update_df.select(
    [
        "scene_id",
        "scene_title",
        "file_basename",
        "ce_downloads_release_name",
        "ce_downloads_release_uuid",
        "ce_downloads_site_name",
    ]
)


In [None]:
# Apply step: write each matched Culture Extractor release UUID onto its scene as
# a stash_id, keeping the stash_ids the scene already has for other endpoints.

update_results = []

for row in scenes_to_update_df.iter_rows(named=True):
    scene_id = row["scene_id"]
    scene_title = row["scene_title"]
    file_basename = row["file_basename"]
    ce_site_name = row["ce_downloads_site_name"]
    ce_release_name = row["ce_downloads_release_name"]
    ce_release_uuid = row["ce_downloads_release_uuid"]
    existing_stash_ids = row["existing_stash_ids"] or []

    # Fields common to the success and the failure record for this scene
    outcome = {
        "scene_id": scene_id,
        "scene_title": scene_title,
        "file_basename": file_basename,
        "ce_site_name": ce_site_name,
        "ce_release_name": ce_release_name,
        "ce_release_uuid": ce_release_uuid,
    }

    try:
        # Replace any existing Culture Extractor entry, keep everything else
        merged_stash_ids = merge_stash_ids(
            existing_stash_ids, CULTURE_EXTRACTOR_ENDPOINT, ce_release_uuid
        )

        # Send the merged stash_ids list for this scene to Stashapp
        scene_update = {"id": scene_id, "stash_ids": merged_stash_ids}
        result = stash_raw_client.update_scene(scene_update)

        update_results.append({**outcome, "status": "success", "error": None})

        success_message = (
            f"✓ Updated scene {scene_id} ({scene_title}) with CE UUID {ce_release_uuid} "
            f"from {ce_site_name} - {ce_release_name} (preserved {len(existing_stash_ids)} existing stash_ids)"
        )
        print(success_message)

    except Exception as e:
        # Record the failure and continue with the remaining scenes
        update_results.append({**outcome, "status": "error", "error": str(e)})

        print(f"✗ Failed to update scene {scene_id} ({scene_title}): {e}")

In [None]:
# Verification of apply step results
if update_results:
    # Scan every row for schema inference: the "error" field is None for each
    # successful update, so a long run of successes at the start would otherwise
    # leave that column with a type that cannot hold a later error message.
    update_results_df = pl.DataFrame(update_results, infer_schema_length=None)

    # Split once and reuse for both the counts and the displays
    errors_df = update_results_df.filter(pl.col("status") == "error")
    success_df = update_results_df.filter(pl.col("status") == "success")

    print(f"Total scenes processed: {len(update_results_df)}")
    print(f"Successful updates: {len(success_df)}")
    print(f"Failed updates: {len(errors_df)}")

    # Write debug output
    write_debug_json(update_results_df, "update_results_df")

    # Show any errors
    if len(errors_df) > 0:
        print("\nErrors encountered:")
        display(errors_df)

    # Show successful updates. display() is required here: a bare expression
    # nested inside an if-block is not rendered as cell output.
    print("\nSuccessful updates:")
    display(success_df)
else:
    print("No scenes needed updating.")