In [1]:
#!/usr/bin/env python3
"""
SmartSPIM Assets Analysis Script for PATCHSEQ Subjects

This script queries the AIND metadata database to find all SmartSPIM assets
for the specified PATCHSEQ subjects and analyzes their procedures metadata.
"""

import pandas as pd
from typing import List, Dict, Any
import requests
import json
from datetime import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)

# Prevent text truncation in cells
pd.set_option('display.max_colwidth', None)

# Also useful for preventing row/column truncation
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

# If you want to see the full content but with some reasonable limits
pd.set_option('display.max_colwidth', 1000)  # Show up to 1000 characters per cell

In [2]:
# PATCHSEQ Subject IDs
PATCHSEQ_SUBJECT_IDS = [
    "716946", "716947", "716948", "716949", "716950", "716951",
    "725231", "725328", "725329", "728854", "731907", "737038",
    "737042", "744117", "744119", "746041", "746043", "746045",
    "746046", "751017", "751019", "751023", "751024", "751035",
    "755069", "755071", "755072", "755073", "755790", "762196", "762199"
]

In [3]:
def has_procedures(procedures: Dict[str, Any]) -> bool:
    """
    Check if procedures exists and is populated with more than just metadata.
    
    Args:
        procedures: The procedures dictionary from the asset
        
    Returns:
        bool: True if procedures contains meaningful content
    """
    if not procedures:
        return False
    
    # Check if procedures is more than just basic metadata
    metadata_keys = {'describedBy', 'schema_version', 'subject_id', 'notes'}
    content_keys = set(procedures.keys()) - metadata_keys
    
    if not content_keys:
        return False
    
    # Check if any content keys have non-empty values
    for key in content_keys:
        value = procedures[key]
        if isinstance(value, list) and len(value) > 0:
            return True
        if isinstance(value, dict) and len(value) > 0:
            return True
        if value and not isinstance(value, (list, dict)):
            return True
    
    return False

def has_specimen_procedures(procedures: Dict[str, Any]) -> bool:
    """
    Check if specimen_procedures exists and is populated.
    
    Args:
        procedures: The procedures dictionary from the asset
        
    Returns:
        bool: True if specimen_procedures is a non-empty list
    """
    if not procedures or 'specimen_procedures' not in procedures:
        return False
    
    specimen_procedures = procedures['specimen_procedures']
    return isinstance(specimen_procedures, list) and len(specimen_procedures) > 0

def get_specimen_procedures_count(procedures: Dict[str, Any]) -> int:
    """
    Get the number of specimen procedures.
    
    Args:
        procedures: The procedures dictionary from the asset
        
    Returns:
        int: Number of specimen procedures (0 if none or not present)
    """
    if not has_specimen_procedures(procedures):
        return 0
    
    return len(procedures['specimen_procedures'])

def query_smartspim_assets() -> List[Dict[str, Any]]:
    """
    Query the AIND metadata database for all SmartSPIM assets for PATCHSEQ subjects.
    
    Returns:
        List of asset dictionaries with metadata
    """
    
    # Use the find endpoint with filters and projection
    url = "https://api.allenneuraldynamics.org/v1/metadata_index/data_assets/find"
    
    # Query parameters
    params = {
        "filter": json.dumps({
            "subject.subject_id": {"$in": PATCHSEQ_SUBJECT_IDS},
            "data_description.modality": {
                "$elemMatch": {
                    "abbreviation": "SPIM"
                }
            }
        }),
        "projection": json.dumps({
            "_id": 0,
            "procedures": 1,
            "subject.subject_id": 1,
            "data_description.name": 1,
            "data_description.data_level": 1
        }),
        "limit": 100  # Increase if needed
    }
    
    try:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        assets = response.json()
        
        # Transform the response to match expected format
        transformed_assets = []
        for asset in assets:
            transformed_asset = {
                "subject_id": asset.get("subject", {}).get("subject_id"),
                "asset_name": asset.get("data_description", {}).get("name"),
                "data_level": asset.get("data_description", {}).get("data_level"),
                "procedures": asset.get("procedures", {})
            }
            transformed_assets.append(transformed_asset)
        
        return transformed_assets
        
    except requests.exceptions.RequestException as e:
        print(f"Error querying AIND API: {e}")
        print("You may need to use the AIND data access tools directly.")
        print("Consider using the aind-data-access-api package or MCP server.")
        return []

def extract_date_from_asset_name(asset_name: str) -> str:
    """
    Extract and simplify the creation date from the asset name.
    
    Args:
        asset_name: The asset name containing date information
        
    Returns:
        str: Simplified date in YYYY-MM-DD format
    """
    try:
        # Look for date pattern in asset name (e.g., "2024-08-14_17-13-33")
        import re
        date_pattern = r'(\d{4}-\d{2}-\d{2})'
        match = re.search(date_pattern, asset_name)
        
        if match:
            date_str = match.group(1)
            # Parse and return just the date part
            parsed_date = datetime.strptime(date_str, '%Y-%m-%d')
            return parsed_date.strftime('%Y-%m-%d')
        else:
            return 'Unknown'
    except Exception as e:
        print(f"Error extracting date from asset name '{asset_name}': {e}")
        return 'Unknown'

def query_metadata_service_for_specimen_procedures(subject_id: str, asset_name: str, timeout: int = 30) -> bool:
    """
    Query the metadata service to check if specimen procedures are available.
    Also caches the result to the local procedures folder.
    
    Args:
        subject_id: The subject ID
        asset_name: The asset name
        timeout: Request timeout in seconds
        
    Returns:
        bool: True if specimen procedures are found in the metadata service
    """
    try:
        # Query the metadata service for the specific subject
        url = f"http://aind-metadata-service/procedures/{subject_id}"
        
        print(f"    Querying: {url}")
        response = requests.get(url, timeout=timeout)
        
        # Handle 200 (success) and 406 (validation errors but data returned)
        if response.status_code in [200, 406]:
            try:
                response_data = response.json()
                
                # For 406 responses, extract just the 'data' field and promote it to top level
                if response.status_code == 406:
                    if 'data' in response_data:
                        procedures_data = response_data['data']
                        print(f"      Extracted data from 406 response (ignoring validation errors)")
                    else:
                        print(f"      HTTP 406 response missing 'data' field")
                        return False
                else:
                    # For 200 responses, use the response directly
                    procedures_data = response_data
                
                # Cache the clean procedures data
                cache_dir = f"procedures/{subject_id}"
                os.makedirs(cache_dir, exist_ok=True)
                cache_file = f"{cache_dir}/procedures.json.v1"
                
                with open(cache_file, 'w') as f:
                    json.dump(procedures_data, f, indent=2)
                
                if response.status_code == 406:
                    print(f"      Cached clean procedures data (removed validation error wrapper)")
                else:
                    print(f"      Cached fresh procedures data to {cache_file}")
                
                # Check if specimen_procedures exists and has content
                if 'specimen_procedures' in procedures_data:
                    specimen_procedures = procedures_data['specimen_procedures']
                    if isinstance(specimen_procedures, list) and len(specimen_procedures) > 0:
                        print(f"      Found {len(specimen_procedures)} specimen procedures")
                        return True
                    else:
                        print(f"      No specimen procedures found")
                        return False
                else:
                    print(f"      No specimen_procedures field in response")
                    return False
                    
            except json.JSONDecodeError as e:
                print(f"      Error parsing JSON response: {e}")
                return False
                
        elif response.status_code == 500:
            print(f"      HTTP 500: Internal Server Error - metadata service is having issues")
            print(f"      This suggests the service is not working properly for subject {subject_id}")
            return False
            
        else:
            print(f"      HTTP {response.status_code}: {response.text}")
            return False
            
    except requests.exceptions.RequestException as e:
        print(f"      Request error: {e}")
        return False
    except Exception as e:
        print(f"      Unexpected error: {e}")
        return False

def clear_procedures_cache(subject_ids: List[str]) -> int:
    """Clear cached procedures files for the given subject IDs."""
    print("Clearing cached procedures files for PATCHSEQ subjects...")
    cleared_count = 0
    
    for subject_id in subject_ids:
        cache_dir = f"procedures/{subject_id}"
        v1_file = f"{cache_dir}/procedures.json.v1"
        v2_file = f"{cache_dir}/procedures.json.v2"
        
        if os.path.exists(v1_file):
            os.remove(v1_file)
            cleared_count += 1
            print(f"  Cleared v1 cache for subject {subject_id}")
        
        if os.path.exists(v2_file):
            os.remove(v2_file)
            cleared_count += 1
            print(f"  Cleared v2 cache for subject {subject_id}")
    
    print(f"Cleared {cleared_count} cached procedures files.")
    return cleared_count

def get_cached_specimen_procedures_result(subject_id: str) -> bool:
    """Read cached procedures file and check if specimen procedures exist."""
    cache_file = f"procedures/{subject_id}/procedures.json.v1"
    try:
        with open(cache_file, 'r') as f:
            cached_data = json.load(f)
        
        # Check if specimen_procedures exists and has content
        if 'specimen_procedures' in cached_data:
            specimen_procedures = cached_data['specimen_procedures']
            return isinstance(specimen_procedures, list) and len(specimen_procedures) > 0
        else:
            return False
            
    except Exception as e:
        print(f"    Error reading cached file for {subject_id}: {e}")
        return False

def query_single_subject_parallel(record: pd.Series, timeout: int = 30) -> tuple:
    """Query a single subject and return the result. Used in parallel processing."""
    subject_id = record['subject_id']
    asset_name = record['asset_name']
    
    # Check if we already have a cached file
    cache_file = f"procedures/{subject_id}/procedures.json.v1"
    print(f"    Checking cache file: {cache_file}")
    print(f"    Cache file exists: {os.path.exists(cache_file)}")
    
    if os.path.exists(cache_file):
        print(f"  Subject {subject_id}: Using existing cache, skipping download")
        return subject_id, asset_name, "cached"
    
    print(f"  Subject {subject_id}: No cache found, querying metadata service...")
    
    # Always query and overwrite cache
    try:
        result = query_metadata_service_for_specimen_procedures(subject_id, asset_name, timeout)
        print(f"  Subject {subject_id}: {result}")
        return subject_id, asset_name, result
    except Exception as e:
        print(f"  Error querying metadata service for subject {subject_id}: {e}")
        return subject_id, asset_name, False

def process_parallel_query_results(future_results: list, df: pd.DataFrame) -> pd.DataFrame:
    """Process the results from parallel queries and update the dataframe."""
    for subject_id, asset_name, result in future_results:
        # Update the dataframe
        mask = (df['subject_id'] == subject_id) & (df['asset_name'] == asset_name)
        
        if result == "cached":
            # Read from existing cache to get the actual result
            cached_result = get_cached_specimen_procedures_result(subject_id)
            df.loc[mask, 'specimen_procedures_from_service'] = cached_result
            print(f"    Cached result for {subject_id}: {cached_result}")
        else:
            df.loc[mask, 'specimen_procedures_from_service'] = result
    
    return df

def query_metadata_service_parallel(df: pd.DataFrame, max_workers: int = 10, timeout: int = 30) -> pd.DataFrame:
    """Query metadata service in parallel for ALL records to get complete picture."""
    print(f"Querying metadata service for ALL {len(df)} records to check specimen procedures availability...")
    print("This will overwrite the old cached v1 files with fresh data from the fixed metadata service.")
    
    # Use ThreadPoolExecutor for parallel processing
    max_workers = min(max_workers, len(df))
    print(f"Using {max_workers} parallel workers for metadata service queries...")
    print(f"Request timeout set to {timeout} seconds")
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all queries
        future_to_record = {
            executor.submit(query_single_subject_parallel, record, timeout): record 
            for _, record in df.iterrows()
        }
        
        # Collect results
        future_results = []
        error_count = 0
        for future in as_completed(future_to_record):
            try:
                result = future.result()
                future_results.append(result)
                if result[2] is False:  # Check if result is False (error)
                    error_count += 1
            except Exception as e:
                print(f"  Error in parallel query: {e}")
                error_count += 1
    
    # Process results and update dataframe
    df = process_parallel_query_results(future_results, df)
    
    print("Metadata service queries completed.")
    print(f"Total queries: {len(df)}")
    print(f"Successful queries: {len(df) - error_count}")
    print(f"Failed queries: {error_count}")
    
    if error_count > 0:
        print("\nWARNING: Many queries failed with HTTP 500 errors.")
        print("This suggests the metadata service is having issues.")
        print("The missing specimen procedures may be due to service problems, not data availability.")
    
    return df

def create_initial_dataframe(assets: List[Dict[str, Any]]) -> pd.DataFrame:
    """Create the initial dataframe from assets."""
    results = []
    for asset in assets:
        procedures = asset.get('procedures', {})
        
        result = {
            'subject_id': asset['subject_id'],
            'asset_name': asset['asset_name'],
            'data_level': asset['data_level'],
            'has_procedures': has_procedures(procedures),
            'has_specimen_procedures': has_specimen_procedures(procedures),
            'number_of_specimen_procedures': get_specimen_procedures_count(procedures)
        }
        results.append(result)
    
    df = pd.DataFrame(results)
    
    print(f"Analysis complete. Found {len(df)} total assets.")
    print(f"Subjects with SmartSPIM data: {df['subject_id'].nunique()}")
    print(f"Total subjects in PATCHSEQ list: {len(PATCHSEQ_SUBJECT_IDS)}")
    
    return df

def analyze_smartspim_assets(clear_cache: bool = False, max_workers: int = 10, timeout: int = 30) -> pd.DataFrame:
    """
    Main function to analyze SmartSPIM assets and return a pandas DataFrame.
    
    Args:
        clear_cache: If True, clear all cached procedures files for PATCHSEQ subjects
        max_workers: Maximum number of parallel workers for metadata service queries
        timeout: Request timeout in seconds for metadata service queries
        
    Returns:
        pd.DataFrame: Analysis results with columns:
            - subject_id
            - asset_name  
            - data_level
            - has_procedures
            - has_specimen_procedures
            - number_of_specimen_procedures
            - creation_date
            - specimen_procedures_from_service
    """
    
    # Clear cache if requested
    if clear_cache:
        clear_procedures_cache(PATCHSEQ_SUBJECT_IDS)
    
    # Query and analyze assets
    print("Querying AIND metadata database for SmartSPIM assets...")
    assets = query_smartspim_assets()
    
    if not assets:
        print("No assets found or query failed.")
        return pd.DataFrame()
    
    print(f"Found {len(assets)} SmartSPIM assets. Analyzing procedures...")
    
    # Create initial dataframe
    df = create_initial_dataframe(assets)
    
    # Add creation dates
    print("Adding creation dates...")
    df['creation_date'] = df['asset_name'].apply(lambda x: extract_date_from_asset_name(x))
    
    # Add specimen procedures from metadata service
    df['specimen_procedures_from_service'] = df['has_specimen_procedures'].copy()
    
    # Query metadata service in parallel for ALL records
    df = query_metadata_service_parallel(df, max_workers=max_workers, timeout=timeout)
    
    return df

In [4]:
df = analyze_smartspim_assets(clear_cache=False, max_workers=1, timeout=90)

Querying AIND metadata database for SmartSPIM assets...
Found 66 SmartSPIM assets. Analyzing procedures...
Analysis complete. Found 66 total assets.
Subjects with SmartSPIM data: 31
Total subjects in PATCHSEQ list: 31
Adding creation dates...
Querying metadata service for ALL 66 records to check specimen procedures availability...
This will overwrite the old cached v1 files with fresh data from the fixed metadata service.
Using 1 parallel workers for metadata service queries...
Request timeout set to 90 seconds
    Checking cache file: procedures/737042/procedures.json.v1
    Cache file exists: False
  Subject 737042: No cache found, querying metadata service...
    Querying: http://aind-metadata-service/procedures/737042
      Extracted data from 406 response (ignoring validation errors)
      Cached clean procedures data (removed validation error wrapper)
      No specimen procedures found
  Subject 737042: False
    Checking cache file: procedures/751023/procedures.json.v1
    Cache

In [5]:
df.sort_values(by='creation_date', ascending=True)

Unnamed: 0,subject_id,asset_name,data_level,has_procedures,has_specimen_procedures,number_of_specimen_procedures,creation_date,specimen_procedures_from_service
55,716949,SmartSPIM_716949_2024-04-23_12-53-35_stitched_2024-04-24_15-46-00,derived,True,True,6,2024-04-23,False
54,716950,SmartSPIM_716950_2024-04-23_17-36-59,raw,True,True,6,2024-04-23,False
7,716950,SmartSPIM_716950_2024-04-23_17-36-59_stitched_2024-04-24_22-47-42,derived,True,True,6,2024-04-23,False
4,716949,SmartSPIM_716949_2024-04-23_12-53-35,raw,True,True,6,2024-04-23,False
53,716951,SmartSPIM_716951_2024-05-24_20-26-43,raw,True,True,6,2024-05-24,False
8,716951,SmartSPIM_716951_2024-05-24_20-26-43_stitched_2024-05-26_13-48-44,derived,True,True,6,2024-05-24,False
61,716948,SmartSPIM_716948_2024-05-24_15-47-29,raw,True,True,6,2024-05-24,False
62,716948,SmartSPIM_716948_2024-05-24_15-47-29_stitched_2024-05-26_09-48-20,derived,True,True,6,2024-05-24,False
60,716947,SmartSPIM_716947_2024-06-17_16-49-13,raw,True,True,6,2024-06-17,False
59,716947,SmartSPIM_716947_2024-06-17_16-49-13_stitched_2024-06-19_11-21-35,derived,True,True,6,2024-06-17,False


In [None]:
ok, i would like to create a new notebook to answer a different but related question. 

You previously helped me create some functions for reading specimen procedures from this excel document: /Users/doug.ollerenshaw/code/aind-workbench/projects/patchseq_metadata/DT_HM_TissueClearingTracking_.xlsx. Can you see those functions inside this folder?

For each of the 31 mice in our list, I want to create a table with the following information:
* subject_id
* collection_date (just date, not time)
* specimen_procedures_in_db - boolean that tells us if the specimen_procedures is populated (True) or an empty list (False),
* specimen_id_correct: boolean that tells us if the specimen_id in specimen_procedures matches the subject_id. Each individual procedure in specimen_procedures has a specimen_id field, so all must match for this to be True. It's False by default if specimen_procedures_in_db is False.
* lot_numbers_correct - a boolean that tells us if the lot numbers in all of the procedures match what would have been pulled from the spreadsheet if this was being populated from scratch. False if specimen_procedures_in_db is False. Otherwise, iterate through all procedures and all reagents in those procedures and compare with what exists in the DB.

And for more context, someone previously performed a 

In [None]:
df = analyze_smartspim_assets()

Querying AIND metadata database for SmartSPIM assets...
Found 66 SmartSPIM assets. Analyzing procedures...
Analysis complete. Found 66 total assets.
Subjects with SmartSPIM data: 31
Total subjects in PATCHSEQ list: 31
