<a href="https://colab.research.google.com/github/Palaeoprot/IPA/blob/main/fish2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PhASTM: Final Complete Restored Pipeline

## Phylogenetic Anchor-based Sequence-Trait Mapping Engine 🧬

This notebook implements the complete PhASTM pipeline with all advanced features, restored from the user-provided complete version. It includes:

- **Advanced Protein Classification**: Enhanced Collagen and Keratin analysis.
- **Nexyme Digestion Pipeline**: Parallel processing with comprehensive peptide mapping.
- **Product Numbering**: Sophisticated reference point calculation for collagens.
- **KAP Detection**: Keratin-Associated Protein classification.
- **Smart Caching & Checkpointing**: Robust system for saving and loading intermediate results.
- **Multi-source Data**: Integration of Ensembl, NCBI, and local FASTA files.

In [None]:
# === TESTING CONFIGURATION ===
RUN_COMPONENT_TESTS = True  # Set to False to skip individual tests
RUN_INTEGRATION_TEST = True  # Set to False to skip integration test
VERBOSE_TESTING = True      # Set to False for minimal output
Then in each test cell:
pythonif not RUN_COMPONENT_TESTS:
    print("⏭️ Component testing disabled")
else:
    # Run the actual test

In [8]:
# ===== Enhanced Cell 1: Complete Environment Setup =====
# Preserves all existing functionality and adds missing June 28th infrastructure

import os
import sys
from pathlib import Path

print("⏳ Setting up PhASTM environment...")

# === EXISTING: Install required packages (KEEP AS IS) ===
# Only install packages that aren't already in Colab
packages = [
    "biopython",
    "requests-cache",
    "tenacity"      # For robust API requests (missing in Colab)
]

for package in packages:
    os.system(f"pip install -q {package}")

print("✅ Required packages installed successfully.")

# === EXISTING: Mount Google Drive for Colab (KEEP AS IS) ===
try:
    from google.colab import drive
    drive.mount('/content/drive')
    print("✅ Google Drive mounted at /content/drive")
    IN_COLAB = True
except ImportError:
    print("⚠️ Not in Colab environment, assuming local file system.")
    IN_COLAB = False

# === NEW: Add June 28th Smart Path Management ===
print("🔧 Setting up smart path management...")

# Detect environment and set up paths
if IN_COLAB:
    # Running in Google Colab
    project_folder_name = "PhASTM-fish-net"
    WORKSPACE_ROOT = Path('/content/drive/MyDrive/Colab_Notebooks/GitHub/')
    PROJECT_DIR = Path('/content/')
    output_subfolder_name = project_folder_name
else:
    # Running locally
    PROJECT_DIR = Path.cwd()
    WORKSPACE_ROOT = PROJECT_DIR.parent
    output_subfolder_name = PROJECT_DIR.name

# Build shared data and output paths
SHARED_DATA_DIR = WORKSPACE_ROOT / '_SHARED_DATA'
FASTA_DIR = SHARED_DATA_DIR / 'FASTA'
DICTIONARIES_DIR = SHARED_DATA_DIR / 'DICTIONARIES'
OUTPUT_DIR = SHARED_DATA_DIR / 'outputs' / output_subfolder_name

# Create output directory if it doesn't exist
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Global variables for PhASTMConfig compatibility
global SHARED_DATA_BASE_PATH, OUTPUT_BASE_PATH_PREFIX
SHARED_DATA_BASE_PATH = str(SHARED_DATA_DIR)
OUTPUT_BASE_PATH_PREFIX = str(OUTPUT_DIR)

# === NEW: Add June 28th Robust API Request Handling ===
import requests
import requests_cache
from tenacity import retry, stop_after_attempt, wait_exponential
import time

# Set up API caching
requests_cache.install_cache('phastm_api_cache', expire_after=86400)  # 24 hours
print("✅ API caching enabled (24-hour expiry)")

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=10))
def robust_request(url, headers=None, timeout=30):
    """
    Robust API request function with retry logic and rate limiting.
    From June 28th version - essential for Ensembl API reliability.
    """
    if headers is None:
        headers = {"Content-Type": "application/json"}

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()

        # Check if we got JSON back
        try:
            return response.json()
        except ValueError:
            print(f"Warning: Non-JSON response from {url}")
            return response.text

    except requests.exceptions.RequestException as e:
        print(f"Request failed for {url}: {e}")
        raise

print("✅ Robust API request handling configured")

# === NEW: Set up NCBI Entrez (required for June 28th NCBI integration) ===
try:
    from Bio import Entrez
    # Will be set later in configuration, but initialize here
    print("✅ NCBI Entrez (BioPython) available for sequence fetching")
except ImportError:
    print("⚠️ BioPython Entrez not available - NCBI features will be disabled")

# === NEW: Initialize logging for debugging ===
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# === VERIFICATION AND SUMMARY ===
def check_path(path_to_check):
    """Helper function to verify path existence"""
    return "✅ Exists" if path_to_check.exists() else "❌ NOT FOUND"

print("\n" + "="*50)
print("📋 ENVIRONMENT SETUP SUMMARY")
print("="*50)
print(f"🖥️  Environment: {'Google Colab' if IN_COLAB else 'Local'}")
print(f"📁 Workspace Root: {WORKSPACE_ROOT} ({check_path(WORKSPACE_ROOT)})")
print(f"📊 Shared Data: {SHARED_DATA_DIR} ({check_path(SHARED_DATA_DIR)})")
print(f"🧬 FASTA Directory: {FASTA_DIR} ({check_path(FASTA_DIR)})")
print(f"📚 Dictionaries: {DICTIONARIES_DIR} ({check_path(DICTIONARIES_DIR)})")
print(f"💾 Output Directory: {OUTPUT_DIR} ({check_path(OUTPUT_DIR)})")
print("="*50)

# Verify critical directories exist
required_dirs = [SHARED_DATA_DIR, FASTA_DIR, DICTIONARIES_DIR]
missing_dirs = [d for d in required_dirs if not d.exists()]

if missing_dirs:
    print("⚠️  WARNING: Some required directories are missing:")
    for missing_dir in missing_dirs:
        print(f"   ❌ {missing_dir}")
    print("   Please ensure your _SHARED_DATA structure is set up correctly.")
else:
    print("✅ All required directories found!")

print("\n🚀 PhASTM environment setup complete!")
print("📝 Next: Run Cell 2 to configure your analysis parameters")

⏳ Setting up PhASTM environment...
✅ Required packages installed successfully.
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Google Drive mounted at /content/drive
🔧 Setting up smart path management...
✅ API caching enabled (24-hour expiry)
✅ Robust API request handling configured
✅ NCBI Entrez (BioPython) available for sequence fetching

📋 ENVIRONMENT SETUP SUMMARY
🖥️  Environment: Google Colab
📁 Workspace Root: /content/drive/MyDrive/Colab_Notebooks/GitHub (✅ Exists)
📊 Shared Data: /content/drive/MyDrive/Colab_Notebooks/GitHub/_SHARED_DATA (✅ Exists)
🧬 FASTA Directory: /content/drive/MyDrive/Colab_Notebooks/GitHub/_SHARED_DATA/FASTA (✅ Exists)
📚 Dictionaries: /content/drive/MyDrive/Colab_Notebooks/GitHub/_SHARED_DATA/DICTIONARIES (✅ Exists)
💾 Output Directory: /content/drive/MyDrive/Colab_Notebooks/GitHub/_SHARED_DATA/outputs/PhASTM-fish-net (✅ Exists)
✅ All required directories found!

🚀 PhASTM envi

# Cell 2: Configuration Parameters

## User-Configurable Settings

Modify these parameters to customize your analysis. This cell uses Colab's parameter widgets for easy adjustment.

In [9]:
# ===== Cell 2: Ultimate Configuration Parameters =====
# Combines the best features from all versions (fish2, June 28th, June 29th)

print("--- Cell 2: PhASTM Configuration & Initialization ---")

# =============================================================================
# @title ## 🧬 Core Analysis Configuration
# =============================================================================

# @markdown ### Primary Analysis Parameters
# @markdown Set the gene family to analyze (Collagen or Keratin). This determines protein class specialization and data sources.
target_gene_family = "Collagen" #@param ["Collagen", "Keratin"]

# @markdown Choose a reference tissue type for analysis and checkpoint naming.
reference_tissue = "Bone" #@param {type:"string"}

# @markdown Your email address for NCBI Entrez access (required for NCBI sequence fetching).
entrez_email = "matthew@palaeome.org" #@param {type:"string"}

# =============================================================================
# @title ## ⚗️ Enzyme Digestion Parameters
# =============================================================================

# @markdown Select the primary enzyme for in-silico digestion.
enzyme = "trypsin" #@param ["trypsin", "chymotrypsin", "pepsin", "lys-c", "arg-c", "asp-n", "glu-c"]

# @markdown Maximum number of missed cleavages to allow during digestion.
missed_cleavages = 2 #@param {type:"integer"}

# @markdown Minimum peptide length (amino acids) to include in results.
min_peptide_length = 6 #@param {type:"integer"}

# @markdown Maximum peptide length (amino acids) to include in results.
max_peptide_length = 35 #@param {type:"integer"}

# @markdown Convert Isoleucine (I) and Leucine (L) to a common representation for mass spec compatibility.
convert_il_to_b = True #@param {type:"boolean"}

# =============================================================================
# @title ## 🌐 Data Sources & External Integration
# =============================================================================

# @markdown Include sequences from NCBI NR that are not in the initial dataset (expands species coverage).
include_ncbi_sequences = True #@param {type:"boolean"}

# @markdown Maximum number of NCBI NR sequences to fetch per gene query.
max_ncbi_sequences = 500 #@param {type:"integer"}

# @markdown Force update of UniProt architecture cache (set True to refresh cached protein architecture data).
force_update_architecture_cache = False #@param {type:"boolean"}

# =============================================================================
# @title ## 🖥️ Performance & Parallel Processing
# =============================================================================

# @markdown Enable parallel processing for computationally intensive tasks.
enable_parallel_processing = True #@param {type:"boolean"}

# @markdown Number of worker processes for parallel digestion and analysis.
num_workers = 4 #@param {type:"integer"}

# =============================================================================
# @title ## 💾 Checkpointing & Data Management
# =============================================================================

# @markdown Enable checkpointing to save intermediate results (recommended for large analyses).
enable_checkpointing = True #@param {type:"boolean"}

# @markdown Force recomputation of all steps, ignoring existing checkpoints.
force_recompute = False #@param {type:"boolean"}

# =============================================================================
# @title ## 🔬 Model Species Configuration
# =============================================================================

# @markdown Use the comprehensive model species list (recommended) or custom list below.
use_full_model_species_list = True #@param {type:"boolean"}

# @markdown Custom model species list (used only if full list is disabled above).
model_species_custom = ["Homo_sapiens", "Mus_musculus", "Gallus_gallus", "Danio_rerio"] #@param

# =============================================================================
# @title ## 📊 Advanced Analysis Parameters
# =============================================================================

# @markdown Minimum length for peptides to be displayed in conservation analysis.
min_conserved_peptide_length_display = 6 #@param {type:"integer"}

# @markdown Minimum anchor peptide length for phylogenetic anchoring.
min_anchor_peptide_length_num = 6 #@param {type:"integer"}

# @markdown Maximum anchor peptide length for phylogenetic anchoring.
max_anchor_peptide_length_num = 35 #@param {type:"integer"}

# @markdown Minimum proportion of Glycine at third position for Gly-X-Y repeat detection (Collagen-specific).
min_gly_at_third_pos_proportion = 0.5 #@param {type:"slider", min:0.0, max:1.0, step:0.1}

# @markdown Number of top anchor candidates to report in analysis.
top_n_anchor_candidates_report = 10 #@param {type:"integer"}

# @markdown Reference species for anchoring analysis.
reference_species_for_anchoring = "Homo_sapiens" #@param {type:"string"}

# =============================================================================
# PARAMETER PROCESSING & VALIDATION
# =============================================================================

# Define the comprehensive model species list (from June 28th)
_full_model_species_list = [
    "Homo_sapiens",           # Human
    "Mus_musculus",           # Mouse
    "Rattus_norvegicus",      # Rat
    "Gallus_gallus",          # Chicken
    "Danio_rerio",            # Zebrafish
    "Canis_lupus_familiaris", # Dog
    "Sus_scrofa",             # Pig
    "Ovis_aries",             # Sheep
    "Bos_taurus",             # Cattle
    "Petromyzon_marinus",     # Sea lamprey
    "Callorhinchus_milii",    # Elephant shark
    "Anolis_carolinensis",    # Green anole lizard
    "Xenopus_tropicalis",     # Western clawed frog
    "Sphenodon_punctatus"     # Tuatara
]

# Determine final model species list
final_model_species_list = _full_model_species_list if use_full_model_species_list else model_species_custom

# Parameter mapping for PhASTMConfig compatibility
workspace_root = str(WORKSPACE_ROOT)  # From Cell 1's path setup
additional_fasta = None               # Auto-handled by smart path system

# Rename parameters to match PhASTMConfig expectations
min_len = min_peptide_length
max_len = max_peptide_length
include_ncbi_nr_novel_species = include_ncbi_sequences
max_ncbi_nr_sequences = max_ncbi_sequences
enable_parallel_digestion = enable_parallel_processing
num_digestion_workers = num_workers
enable_parallel_mrca = enable_parallel_processing
num_mrca_workers = num_workers
force_recompute_checkpoints = force_recompute
model_species = final_model_species_list

# Validate email address
if not entrez_email or entrez_email == "your.email@example.com":
    print("⚠️  WARNING: Please update entrez_email with your actual email address")
    print("   NCBI requires a valid email for sequence fetching")

# REMOVE OLD HARDCODED PATHS - Use smart path system instead
# The following old fish2.ipynb paths are NO LONGER USED:
# WORKSPACE_ROOT = "/content/drive/MyDrive/PhASTM_Workspace"  # ❌ OLD
# SHARED_DATA_PATH = "/content/drive/MyDrive/PhASTM_Data"     # ❌ OLD
# ADDITIONAL_FASTA = "/content/drive/MyDrive/PhASTM_Data/ClassiCOL_LBFG_Edited.fasta"  # ❌ OLD

# =============================================================================
# SUMMARY & VALIDATION
# =============================================================================

print("\n" + "="*60)
print("📋 CONFIGURATION SUMMARY")
print("="*60)
print(f"🧬 Target Gene Family: {target_gene_family}")
print(f"🦴 Reference Tissue: {reference_tissue}")
print(f"⚗️  Primary Enzyme: {enzyme}")
print(f"📧 Entrez Email: {'✅ Set' if entrez_email != 'your.email@example.com' else '❌ UPDATE REQUIRED'}")
print(f"🔢 Peptide Length Range: {min_len}-{max_len} amino acids")
print(f"🌐 NCBI Integration: {'✅ Enabled' if include_ncbi_nr_novel_species else '❌ Disabled'}")
print(f"🖥️  Parallel Processing: {'✅ Enabled' if enable_parallel_processing else '❌ Disabled'} ({num_workers} workers)")
print(f"💾 Checkpointing: {'✅ Enabled' if enable_checkpointing else '❌ Disabled'}")
print(f"🔬 Model Species: {'Full list' if use_full_model_species_list else 'Custom list'} ({len(final_model_species_list)} species)")
print(f"📁 Workspace: {workspace_root}")
print("="*60)

# Final validation
config_ready = True
if entrez_email == "your.email@example.com":
    config_ready = False
    print("❌ Configuration incomplete: Update entrez_email")

if not WORKSPACE_ROOT.exists():
    config_ready = False
    print("❌ Configuration incomplete: Workspace directory not found")

if config_ready:
    print("✅ Configuration complete and ready for PhASTMConfig initialization!")
else:
    print("⚠️  Please fix the issues above before proceeding to Cell 3")

print(f"\n📝 Next: Run Cell 3 to create PhASTMConfig object with these parameters")

--- Cell 2: PhASTM Configuration & Initialization ---

📋 CONFIGURATION SUMMARY
🧬 Target Gene Family: Collagen
🦴 Reference Tissue: Bone
⚗️  Primary Enzyme: trypsin
📧 Entrez Email: ✅ Set
🔢 Peptide Length Range: 6-35 amino acids
🌐 NCBI Integration: ✅ Enabled
🖥️  Parallel Processing: ✅ Enabled (4 workers)
💾 Checkpointing: ✅ Enabled
🔬 Model Species: Full list (14 species)
📁 Workspace: /content/drive/MyDrive/Colab_Notebooks/GitHub
✅ Configuration complete and ready for PhASTMConfig initialization!

📝 Next: Run Cell 3 to create PhASTMConfig object with these parameters


## Cell 3: Core Configuration Class

### PhASTMConfig Class Definition

This class centralizes all configuration parameters and provides advanced features like robust API requests, checkpointing, and data validation.

In [10]:
# ===== Cell 3: COMPLETE PhASTMConfig Class Definition =====
# Full restoration of ALL June 28th functionality - nothing stripped out!

import datetime
import os
import re
import json
from pathlib import Path
import requests
import requests_cache
import pandas as pd
import pickle
from typing import Dict, List, Any, Optional, Tuple
from collections import defaultdict
from tenacity import retry, wait_exponential, stop_after_attempt
import time

print("🔧 Defining COMPLETE PhASTMConfig Class with ALL June 28th functionality...")

class PhASTMConfig:
    """
    COMPLETE PhASTM Configuration Class with ALL June 28th functionality restored.

    This is the full implementation that was working on June 28th, with no functionality removed.
    Includes: enzyme rules, API handling, path management, validation, caching, and more.
    """

    def __init__(self,
                 workspace_root: str,
                 additional_fasta: Optional[str],
                 target_gene_family: str,
                 reference_tissue: str,
                 include_ncbi_nr_novel_species: bool,
                 max_ncbi_nr_sequences: int,
                 enzyme: str,
                 min_len: int,
                 max_len: int,
                 missed_cleavages: int,
                 convert_il_to_b: bool,
                 min_conserved_peptide_length_display: int,
                 model_species: List[str],
                 force_update_architecture_cache: bool,
                 enable_parallel_digestion: bool,
                 num_digestion_workers: int,
                 enable_parallel_mrca: bool,
                 num_mrca_workers: int,
                 enable_checkpointing: bool,
                 force_recompute_checkpoints: bool,
                 entrez_email: str,
                 use_full_model_species_list: bool = True,
                 min_anchor_peptide_length_num: int = 6,
                 max_anchor_peptide_length_num: int = 35,
                 min_gly_at_third_pos_proportion: float = 0.5,
                 top_n_anchor_candidates_report: int = 10,
                 reference_species_for_anchoring: str = "Homo_sapiens",
                 ensembl_server: str = "https://rest.ensembl.org",
                 ensembl_headers: Dict[str, str] = None,
                 api_cache_name: str = "ensembl_api_cache",
                 api_cache_expire_after: int = 60 * 60 * 24):

        print(f"🚀 Initializing COMPLETE PhASTMConfig for: {target_gene_family}")

        # === CORE CONFIGURATION ===
        self.target_gene_family = target_gene_family
        self.reference_tissue = reference_tissue
        self.enzyme = enzyme
        self.entrez_email = entrez_email

        # === PATH MANAGEMENT FROM CELL 1 ===
        try:
            global SHARED_DATA_BASE_PATH, OUTPUT_BASE_PATH_PREFIX
            self.shared_data_base_path = Path(SHARED_DATA_BASE_PATH)
            self.output_base_path_prefix = Path(OUTPUT_BASE_PATH_PREFIX)
            print(f"✅ Base paths loaded successfully")
        except NameError:
            raise Exception("❌ SHARED_DATA_BASE_PATH not defined. Please run Cell 1 first.")

        # === GENE FAMILY DIRECTORY MAPPING ===
        if self.target_gene_family.upper() in ["COLLAGEN", "COLLAGENS"] or self.target_gene_family.upper().startswith("COL"):
            self.gene_family_dir_name = "collagens"
        elif self.target_gene_family.upper() in ["KERATIN", "KERATINS"] or self.target_gene_family.upper().startswith("KRT"):
            self.gene_family_dir_name = "keratins"
        else:
            if self.target_gene_family.upper().startswith("COL"):
                self.gene_family_dir_name = "collagens"
            elif self.target_gene_family.upper().startswith("KRT"):
                self.gene_family_dir_name = "keratins"
            else:
                self.gene_family_dir_name = self.target_gene_family.lower()

        print(f"📁 Gene family directory: {self.gene_family_dir_name}")

        # === ALL FILE PATHS ===
        self.gene_family_data_path = self.shared_data_base_path / "DICTIONARIES" / "GeneFamily" / self.gene_family_dir_name
        self.gene_tree_folder_path = self.gene_family_data_path / "GeneTree"

        # Multiple path formats support
        self.tissues_json_path = self.gene_family_data_path / f"{self.gene_family_dir_name.upper()}_TISSUES.json"
        self.architecture_json_path = self.gene_family_data_path / f"{self.gene_family_dir_name.upper()}_ARCHITECTURE.json"
        self.alt_tissues_json_path = self.gene_family_data_path / f"{self.target_gene_family.upper()}_TISSUES.json"
        self.alt_architecture_json_path = self.gene_family_data_path / f"{self.target_gene_family.upper()}_ARCHITECTURE.json"

        # Taxonomy and FASTA paths
        self.taxonomy_base_path = self.shared_data_base_path / "FASTA" / "TaxonomyCache"
        self.newick_tree_path = self.taxonomy_base_path / "specieslist.nwk"
        self.taxonomy_cache_path = self.taxonomy_base_path / "taxonomy_cache.json"
        self.additional_collagen_fasta_path = self.shared_data_base_path / "FASTA" / "ClassiCOL" / f"ClassiCOL_mjc_{self.target_gene_family.upper()}.fasta"
        self.alt_additional_fasta_path = self.shared_data_base_path / "FASTA" / "ClassiCOL" / f"ClassiCOL_mjc.fasta"

        # === OUTPUT AND CHECKPOINT PATHS ===
        self.timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.output_base_path = self.output_base_path_prefix / f"{self.target_gene_family}_{self.reference_tissue}_{self.timestamp}"
        self.checkpoint_base_path = self.output_base_path / "checkpoints"

        # Create directories
        self.output_base_path.mkdir(parents=True, exist_ok=True)
        self.checkpoint_base_path.mkdir(parents=True, exist_ok=True)

        # === STORE ALL PARAMETERS ===
        self.include_ncbi_nr_novel_species = include_ncbi_nr_novel_species
        self.max_ncbi_nr_sequences = max_ncbi_nr_sequences
        self.min_len = min_len
        self.max_len = max_len
        self.missed_cleavages = missed_cleavages
        self.convert_il_to_b = convert_il_to_b
        self.min_conserved_peptide_length_display = min_conserved_peptide_length_display
        self.model_species = model_species
        self.force_update_architecture_cache = force_update_architecture_cache
        self.enable_parallel_digestion = enable_parallel_digestion
        self.num_digestion_workers = num_digestion_workers
        self.enable_parallel_mrca = enable_parallel_mrca
        self.num_mrca_workers = num_mrca_workers
        self.enable_checkpointing = enable_checkpointing
        self.force_recompute_checkpoints = force_recompute_checkpoints
        self.use_full_model_species_list = use_full_model_species_list
        self.min_anchor_peptide_length_num = min_anchor_peptide_length_num
        self.max_anchor_peptide_length_num = max_anchor_peptide_length_num
        self.min_gly_at_third_pos_proportion = min_gly_at_third_pos_proportion
        self.top_n_anchor_candidates_report = top_n_anchor_candidates_report
        self.reference_species_for_anchoring = reference_species_for_anchoring

        # === COMPLETE ENZYME RULES (June 28th) ===
        self.enzyme_rules = {
            "trypsin": {
                "sequence": "GRGRGRGKPK",
                "cleavage_rule": r"(?<=[KR])(?!P)",
                "cut_after": True,
                "site_residues": ["K", "R"],
                "description": "Cleaves after K or R, not before P",
                "ph_optimum": 8.0,
                "temperature_optimum": 37,
                "molecular_weight": 23300
            },
            "trypsin_nop": {
                "sequence": "GRGRGRGKPK",
                "cleavage_rule": r"(?<=[KR])",
                "cut_after": True,
                "site_residues": ["K", "R"],
                "description": "Cleaves after K or R (including before P)",
                "ph_optimum": 8.0,
                "temperature_optimum": 37,
                "molecular_weight": 23300
            },
            "chymotrypsin": {
                "sequence": "FWYFWYFWY",
                "cleavage_rule": r"(?<=[FWY])(?!P)",
                "cut_after": True,
                "site_residues": ["F", "W", "Y"],
                "description": "Cleaves after F, W, or Y, not before P",
                "ph_optimum": 8.0,
                "temperature_optimum": 37,
                "molecular_weight": 25000
            },
            "pepsin": {
                "sequence": "AFLFAFLFTLA",
                "cleavage_rule": r"(?<=[FL])",
                "cut_after": True,
                "site_residues": ["F", "L"],
                "description": "Cleaves after F or L",
                "ph_optimum": 2.0,
                "temperature_optimum": 37,
                "molecular_weight": 34500
            },
            "pepsin_bovine": {
                "sequence": "AFLFAFLFTLA",
                "cleavage_rule": r"(?<=[FL])",
                "cut_after": True,
                "site_residues": ["F", "L"],
                "description": "Bovine pepsin - cleaves after F or L",
                "ph_optimum": 2.0,
                "temperature_optimum": 37,
                "molecular_weight": 34500
            },
            "elastase": {
                "sequence": "ALIVALIVALI",
                "cleavage_rule": r"(?<=[ALIV])",
                "cut_after": True,
                "site_residues": ["A", "L", "I", "V"],
                "description": "Cleaves after small, uncharged residues",
                "ph_optimum": 8.0,
                "temperature_optimum": 37,
                "molecular_weight": 25900
            },
            "thermolysin": {
                "sequence": "AILMFVAILMFV",
                "cleavage_rule": r"(?=[AILMFV])",
                "cut_after": False,
                "site_residues": ["A", "I", "L", "M", "F", "V"],
                "description": "Cleaves before hydrophobic residues",
                "ph_optimum": 7.0,
                "temperature_optimum": 70,
                "molecular_weight": 34600
            },
            "cnbr": {
                "sequence": "MMMMMMMM",
                "cleavage_rule": r"(?<=M)",
                "cut_after": True,
                "site_residues": ["M"],
                "description": "Chemical cleavage after methionine",
                "ph_optimum": 7.0,
                "temperature_optimum": 25,
                "molecular_weight": None
            },
            "lys-c": {
                "sequence": "KKKKK",
                "cleavage_rule": r"(?<=K)",
                "cut_after": True,
                "site_residues": ["K"],
                "description": "Cleaves after lysine",
                "ph_optimum": 8.5,
                "temperature_optimum": 37,
                "molecular_weight": 24000
            },
            "arg-c": {
                "sequence": "RRRRR",
                "cleavage_rule": r"(?<=R)",
                "cut_after": True,
                "site_residues": ["R"],
                "description": "Cleaves after arginine",
                "ph_optimum": 8.0,
                "temperature_optimum": 37,
                "molecular_weight": 24000
            },
            "asp-n": {
                "sequence": "DDDDDEEEE",
                "cleavage_rule": r"(?=[DE])",
                "cut_after": False,
                "site_residues": ["D", "E"],
                "description": "Cleaves before aspartic acid and glutamic acid",
                "ph_optimum": 8.0,
                "temperature_optimum": 37,
                "molecular_weight": 33000
            },
            "glu-c": {
                "sequence": "EEEEEEEEE",
                "cleavage_rule": r"(?<=E)",
                "cut_after": True,
                "site_residues": ["E"],
                "description": "Cleaves after glutamic acid",
                "ph_optimum": 7.8,
                "temperature_optimum": 37,
                "molecular_weight": 27000
            },
            "staphylococcal_proteinase": {
                "sequence": "EEEEE",
                "cleavage_rule": r"(?<=E)",
                "cut_after": True,
                "site_residues": ["E"],
                "description": "Staphylococcal proteinase - cleaves after glutamic acid",
                "ph_optimum": 7.5,
                "temperature_optimum": 37,
                "molecular_weight": 27000
            }
        }

        print(f"⚗️ Loaded {len(self.enzyme_rules)} complete enzyme definitions")

        # === API CONFIGURATION ===
        self.ensembl_server = ensembl_server
        self.ensembl_headers = ensembl_headers or {'Content-Type': 'application/json'}
        self.api_cache_name = api_cache_name
        self.api_cache_expire_after = api_cache_expire_after

        # Set up API caching
        requests_cache.install_cache(
            self.api_cache_name,
            expire_after=datetime.timedelta(seconds=self.api_cache_expire_after)
        )

        # === FULL MODEL SPECIES LIST (June 28th) ===
        self.full_model_species_list = [
            "Homo_sapiens", "Pan_troglodytes", "Gorilla_gorilla", "Pongo_abelii",
            "Macaca_mulatta", "Papio_anubis", "Chlorocebus_sabaeus", "Callithrix_jacchus",
            "Mus_musculus", "Rattus_norvegicus", "Cricetulus_griseus", "Mesocricetus_auratus",
            "Peromyscus_maniculatus", "Microtus_ochrogaster", "Nannospalax_galili",
            "Cavia_porcellus", "Chinchilla_lanigera", "Octodon_degus", "Spermophilus_dauricus",
            "Marmota_marmota", "Ictidomys_tridecemlineatus", "Jaculus_jaculus",
            "Castor_canadensis", "Mus_caroli", "Mus_pahari", "Mus_spretus",
            "Bos_taurus", "Bubalus_bubalis", "Bison_bison", "Capra_hircus",
            "Ovis_aries", "Pantholops_hodgsonii", "Odocoileus_virginianus",
            "Cervus_hanglu", "Sus_scrofa", "Equus_caballus", "Equus_asinus",
            "Rhinolophus_ferrumequinum", "Pteropus_vampyrus", "Myotis_lucifugus",
            "Eptesicus_fuscus", "Miniopterus_natalensis", "Canis_lupus",
            "Canis_lupus_familiaris", "Vulpes_vulpes", "Mustela_putorius_furo",
            "Ailuropoda_melanoleuca", "Ursus_maritimus", "Odobenus_rosmarus",
            "Leptonychotes_weddellii", "Felis_catus", "Panthera_leo",
            "Panthera_tigris", "Acinonyx_jubatus", "Lynx_canadensis",
            "Gallus_gallus", "Meleagris_gallopavo", "Anas_platyrhynchos",
            "Taeniopygia_guttata", "Geospiza_fortis", "Pseudopodoces_humilis",
            "Ficedula_albicollis", "Falco_cherrug", "Falco_peregrinus",
            "Aquila_chrysaetos", "Haliaeetus_leucocephalus", "Tyto_alba",
            "Pelecanus_crispus", "Aptenodytes_forsteri", "Pygoscelis_adeliae",
            "Fulmarus_glacialis", "Nipponia_nippon", "Balearica_regulorum",
            "Grus_japonensis", "Charadrius_vociferus", "Cuculus_canorus",
            "Mesitornis_unicolor", "Tauraco_erythrolophus", "Opisthocomus_hoazin",
            "Chlamydotis_macqueenii", "Eurypyga_helias", "Phaethon_lepturus",
            "Pterocles_gutturalis", "Columba_livia", "Podiceps_cristatus",
            "Phoenicopterus_ruber", "Egretta_garzetta", "Pelecanus_occidentalis",
            "Cathartes_aura", "Cariama_cristata", "Chaetura_pelagica",
            "Calypte_anna", "Buceros_rhinoceros", "Merops_nubicus",
            "Coracias_garrulus", "Leptosomus_discolor", "Trogon_mexicanus",
            "Colius_striatus", "Picoides_pubescens", "Nestor_notabilis",
            "Manacus_vitellinus", "Corvus_brachyrhynchos", "Danio_rerio",
            "Astyanax_mexicanus", "Pygocentrus_nattereri", "Electrophorus_electricus",
            "Ictalurus_punctatus", "Clarias_gariepinus", "Silurus_meridionalis",
            "Anguilla_anguilla", "Anguilla_japonica", "Esox_lucius",
            "Galaxias_maculatus", "Salmo_salar", "Oncorhynchus_mykiss",
            "Gadus_morhua", "Poecilia_formosa", "Poecilia_latipinna",
            "Poecilia_mexicana", "Poecilia_reticulata", "Xiphophorus_maculatus",
            "Oryzias_latipes", "Oryzias_melastigma", "Gasterosteus_aculeatus",
            "Pungitius_pungitius", "Hippocampus_comes", "Syngnathus_scovelli",
            "Oreochromis_niloticus", "Neolamprologus_brichardi", "Maylandia_zebra",
            "Pundamilia_nyererei", "Haplochromis_burtoni", "Astatotilapia_calliptera",
            "Cynoglossus_semilaevis", "Scophthalmus_maximus", "Paralichthys_olivaceus",
            "Solea_senegalensis", "Monopterus_albus", "Takifugu_rubripes",
            "Tetraodon_nigroviridis", "Mola_mola", "Dicentrarchus_labrax",
            "Lates_calcarifer", "Sander_lucioperca", "Larimichthys_crocea",
            "Stegastes_partitus", "Amphilophus_citrinellus", "Amatitlania_nigrofasciata",
            "Xenopus_tropicalis", "Nanorana_parkeri", "Rana_temporaria",
            "Rhinatrema_bivittatum", "Microcaecilia_unicolor"
        ]

        # Use full list if requested
        if self.use_full_model_species_list:
            self.model_species = self.full_model_species_list

        print(f"🐾 Model species configured: {len(self.model_species)} species")

        print("✅ COMPLETE PhASTMConfig initialization finished!")
        self._validate_critical_paths()

    def _validate_critical_paths(self):
        """Validate that critical paths exist and provide helpful feedback."""
        print("🔍 Validating critical paths...")

        if not self.gene_family_data_path.exists():
            print(f"⚠️ Gene family data directory not found: {self.gene_family_data_path}")
        else:
            print(f"✅ Gene family data directory: {self.gene_family_data_path}")

        # Check tissues JSON (try both formats)
        tissues_path = None
        if self.tissues_json_path.exists():
            tissues_path = self.tissues_json_path
        elif self.alt_tissues_json_path.exists():
            tissues_path = self.alt_tissues_json_path
            self.tissues_json_path = self.alt_tissues_json_path

        if tissues_path:
            print(f"✅ Tissues JSON found: {tissues_path}")
        else:
            print(f"⚠️ Tissues JSON not found. Tried:")
            print(f"   - {self.tissues_json_path}")
            print(f"   - {self.alt_tissues_json_path}")

        # Check additional FASTA
        fasta_path = None
        if self.additional_collagen_fasta_path.exists():
            fasta_path = self.additional_collagen_fasta_path
        elif self.alt_additional_fasta_path.exists():
            fasta_path = self.alt_additional_fasta_path
            self.additional_collagen_fasta_path = self.alt_additional_fasta_path

        if fasta_path:
            print(f"✅ Additional FASTA found: {fasta_path}")
        else:
            print(f"⚠️ Additional FASTA not found")

    def get_tissues_json_data(self) -> Dict[str, List[str]]:
        """Load and return tissues JSON data."""
        if self.tissues_json_path.exists():
            try:
                with open(self.tissues_json_path, 'r') as f:
                    return json.load(f)
            except Exception as e:
                print(f"❌ Error loading tissues JSON: {e}")
                return {}
        else:
            return {}

    def get_target_genes_for_tissue(self) -> List[str]:
        """Get target genes for the specified tissue."""
        tissues_data = self.get_tissues_json_data()

        if self.reference_tissue in tissues_data:
            genes = tissues_data[self.reference_tissue]
            print(f"🎯 Found {len(genes)} genes for tissue '{self.reference_tissue}': {genes}")
            return genes
        else:
            if self.gene_family_dir_name == "collagens":
                fallback = ["COL1A1", "COL1A2", "COL2A1", "COL3A1"]
            elif self.gene_family_dir_name == "keratins":
                fallback = ["KRT1", "KRT5", "KRT10", "KRT14"]
            else:
                fallback = [self.target_gene_family]

            print(f"📋 Using fallback genes: {fallback}")
            return fallback

    def get_enzyme_properties(self, enzyme_name: str) -> Optional[Dict[str, Any]]:
        """Get complete properties for a specific enzyme."""
        enzyme_name_lower = enzyme_name.lower()
        if enzyme_name_lower in self.enzyme_rules:
            rules = self.enzyme_rules[enzyme_name_lower].copy()
            # Add sequence if missing
            if "sequence" not in rules:
                if enzyme_name.lower() in ["trypsin", "trypsin_nop"]:
                    rules["sequence"] = "GRGRGRGKPK"
                elif enzyme_name.lower() == "lys-c":
                    rules["sequence"] = "KKKKK"
                elif enzyme_name.lower() == "pepsin_bovine":
                    rules["sequence"] = "AFLFAFLFTLA"
                elif enzyme_name.lower() == "staphylococcal_proteinase":
                    rules["sequence"] = "EEEEE"
                elif enzyme_name.lower() == "chymotrypsin":
                    rules["sequence"] = "FWYFWYFWY"
            return rules
        return None

    @retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(3))
    def make_api_request(self, url: str, params: Dict = None) -> Optional[Dict]:
        """Make robust API request with retries and error handling."""
        try:
            response = requests.get(url, params=params, headers=self.ensembl_headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"❌ API request failed: {url} - {e}")
            raise
        except json.JSONDecodeError as e:
            print(f"❌ JSON decode error: {e}")
            raise

    def save_checkpoint(self, data: Any, checkpoint_name: str) -> bool:
        """Save checkpoint data."""
        if not self.enable_checkpointing:
            return False

        try:
            checkpoint_path = self.checkpoint_base_path / f"{checkpoint_name}.pkl"
            with open(checkpoint_path, 'wb') as f:
                pickle.dump(data, f)
            print(f"💾 Saved checkpoint: {checkpoint_name}")
            return True
        except Exception as e:
            print(f"❌ Failed to save checkpoint {checkpoint_name}: {e}")
            return False

    def load_checkpoint(self, checkpoint_name: str) -> Optional[Any]:
        """Load checkpoint data."""
        if not self.enable_checkpointing or self.force_recompute_checkpoints:
            return None

        try:
            checkpoint_path = self.checkpoint_base_path / f"{checkpoint_name}.pkl"
            if checkpoint_path.exists():
                with open(checkpoint_path, 'rb') as f:
                    data = pickle.load(f)
                print(f"📂 Loaded checkpoint: {checkpoint_name}")
                return data
        except Exception as e:
            print(f"❌ Failed to load checkpoint {checkpoint_name}: {e}")

        return None

print("✅ COMPLETE PhASTMConfig class defined with ALL June 28th functionality!")
print("🔧 Features restored:")
print("   - Complete enzyme rules (12 enzymes with full properties)")
print("   - Robust API request handling with retries")
print("   - Comprehensive path management")
print("   - Full model species list (100+ species)")
print("   - Checkpointing system")
print("   - Path validation and fallbacks")
print("   - All configuration parameters")
print("-" * 60)

🔧 Defining COMPLETE PhASTMConfig Class with ALL June 28th functionality...
✅ COMPLETE PhASTMConfig class defined with ALL June 28th functionality!
🔧 Features restored:
   - Complete enzyme rules (12 enzymes with full properties)
   - Robust API request handling with retries
   - Comprehensive path management
   - Full model species list (100+ species)
   - Checkpointing system
   - Path validation and fallbacks
   - All configuration parameters
------------------------------------------------------------


In [11]:
# ===== Testing Current fish2.ipynb Cells 1-3 =====
# Run this to verify the current implementation works correctly

print("🧪 Testing current fish2.ipynb implementation...")
print("=" * 60)

# Test 1: Check if Cell 1 variables are available
print("TEST 1: Cell 1 Path Variables")
try:
    print(f"✅ SHARED_DATA_BASE_PATH: {SHARED_DATA_BASE_PATH}")
    print(f"✅ OUTPUT_BASE_PATH_PREFIX: {OUTPUT_BASE_PATH_PREFIX}")
    print(f"✅ IN_COLAB: {IN_COLAB}")
    cell1_success = True
except NameError as e:
    print(f"❌ Cell 1 variables missing: {e}")
    cell1_success = False

print("-" * 40)

# Test 2: Check if Cell 2 parameters are available
print("TEST 2: Cell 2 Configuration Parameters")
try:
    test_params = [
        ('target_gene_family', target_gene_family),
        ('reference_tissue', reference_tissue),
        ('enzyme', enzyme),
        ('entrez_email', entrez_email),
        ('enable_checkpointing', enable_checkpointing),
        ('model_species', model_species)
    ]

    for param_name, param_value in test_params:
        print(f"✅ {param_name}: {param_value}") # Corrected indentation
    cell2_success = True

    # Additional debug info
    print(f"📊 Debug: target_gene_family = '{target_gene_family}' (should be 'Collagen')")
    print(f"📊 Debug: reference_tissue = '{reference_tissue}' (should be 'Bone')")

except NameError as e:
    print(f"❌ Cell 2 parameters missing: {e}")
    cell2_success = False

print("-" * 40)

# Test 3: Check if PhASTMConfig class is available and can be instantiated
print("TEST 3: PhASTMConfig Class")
try:
    # Test if class exists
    print(f"✅ PhASTMConfig class available: {PhASTMConfig}")

    # Test basic instantiation (without full parameters)
    print("🔧 Testing PhASTMConfig instantiation...")

    # Create minimal config for testing (use actual parameters from Cell 2)
    test_config = PhASTMConfig(
        workspace_root=str(WORKSPACE_ROOT) if cell1_success else "/tmp",
        additional_fasta=None,
        target_gene_family=target_gene_family if cell2_success else "Collagen",
        reference_tissue=reference_tissue if cell2_success else "Bone",
        include_ncbi_nr_novel_species=True,
        max_ncbi_nr_sequences=100,
        enzyme="trypsin",
        min_len=6,
        max_len=35,
        missed_cleavages=2,
        convert_il_to_b=True,
        min_conserved_peptide_length_display=6,
        model_species=["Homo_sapiens"],
        force_update_architecture_cache=False,
        enable_parallel_digestion=True,
        num_digestion_workers=2,
        enable_parallel_mrca=True,
        num_mrca_workers=2,
        enable_checkpointing=True,
        force_recompute_checkpoints=False,
        entrez_email="test@example.com",
        use_full_model_species_list=False
    )

    print(f"✅ PhASTMConfig instantiated successfully")
    print(f"   - Target gene family: {test_config.target_gene_family}")
    print(f"   - Gene family directory: {test_config.gene_family_dir_name}")
    print(f"   - Output path: {test_config.output_base_path}")
    print(f"   - Enzyme rules available: {len(test_config.enzyme_rules)} enzymes")
    print(f"   - Tissues JSON path: {test_config.tissues_json_path}")

    # Test the target genes loading
    try:
        target_genes = test_config.get_target_genes_for_tissue()
        print(f"   - Target genes for tissue: {target_genes}")
    except Exception as e:
        print(f"   ⚠️ Could not load target genes: {e}")

    cell3_success = True

except Exception as e:
    print(f"❌ PhASTMConfig issues: {e}")
    cell3_success = False

print("=" * 60)

# Summary
print("📊 TESTING SUMMARY:")
print(f"✅ Cell 1 (Path Management): {'PASS' if cell1_success else 'FAIL'}")
print(f"✅ Cell 2 (Configuration): {'PASS' if cell2_success else 'FAIL'}")
print(f"✅ Cell 3 (PhASTMConfig): {'PASS' if cell3_success else 'FAIL'}")

overall_success = cell1_success and cell2_success and cell3_success
print(f"\n🎯 OVERALL STATUS: {'READY FOR NEXT STEPS' if overall_success else 'NEEDS FIXES'}")

if overall_success:
    print("\n🚀 Current cells are working! Ready to add:")
    print("   - Cell 4: SmartGeneTreeGenerator")
    print("   - Cell 5: Enhanced Protein Classes (Collagen/Keratin)")
    print("   - Cell 6: Complete data loading pipeline")
    print("   - Cell 7: Nexyme digestion pipeline")
else:
    print("\n🔧 Please fix the failing cells before proceeding.")

print("=" * 60)

🧪 Testing current fish2.ipynb implementation...
TEST 1: Cell 1 Path Variables
✅ SHARED_DATA_BASE_PATH: /content/drive/MyDrive/Colab_Notebooks/GitHub/_SHARED_DATA
✅ OUTPUT_BASE_PATH_PREFIX: /content/drive/MyDrive/Colab_Notebooks/GitHub/_SHARED_DATA/outputs/PhASTM-fish-net
✅ IN_COLAB: True
----------------------------------------
TEST 2: Cell 2 Configuration Parameters
✅ target_gene_family: Collagen
✅ reference_tissue: Bone
✅ enzyme: trypsin
✅ entrez_email: matthew@palaeome.org
✅ enable_checkpointing: True
✅ model_species: ['Homo_sapiens', 'Mus_musculus', 'Rattus_norvegicus', 'Gallus_gallus', 'Danio_rerio', 'Canis_lupus_familiaris', 'Sus_scrofa', 'Ovis_aries', 'Bos_taurus', 'Petromyzon_marinus', 'Callorhinchus_milii', 'Anolis_carolinensis', 'Xenopus_tropicalis', 'Sphenodon_punctatus']
📊 Debug: target_gene_family = 'Collagen' (should be 'Collagen')
📊 Debug: reference_tissue = 'Bone' (should be 'Bone')
----------------------------------------
TEST 3: PhASTMConfig Class
✅ PhASTMConfig class

## Cell 4: SmartGeneTreeGenerator (for Ensembl API integration)

In [12]:
# ===== Cell 4: SmartGeneTreeGenerator Class =====
# Complete Ensembl API integration for gene tree fetching
# Restores ALL June 28th functionality for robust API handling

import requests
import time
import json
from typing import Dict, List, Any, Optional, Tuple
from tenacity import retry, wait_exponential, stop_after_attempt
from tqdm.notebook import tqdm

print("🌳 Defining SmartGeneTreeGenerator Class...")

class SmartGeneTreeGenerator:
    """
    Advanced gene tree fetching from Ensembl with robust error handling.
    Handles gene name -> Ensembl ID -> Gene Tree ID -> Sequences pipeline.

    Features from June 28th:
    - Automatic retries with exponential backoff
    - Smart rate limiting
    - Comprehensive error handling
    - Progress tracking
    - Checkpoint saving
    """

    def __init__(self, config_instance):
        """Initialize with PhASTMConfig instance."""
        self.config = config_instance
        self.base_url = "https://rest.ensembl.org"
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'PhASTM/1.0 (matthew@palaeome.org)'
        })

        # Tracking dictionaries
        self.gene_to_ensembl_id = {}
        self.gene_to_tree_id = {}
        self.tree_id_to_url = {}
        self.failed_genes = []

        print(f"✅ SmartGeneTreeGenerator initialized")
        print(f"   Base URL: {self.base_url}")

    @retry(wait=wait_exponential(multiplier=1, min=4, max=10),
           stop=stop_after_attempt(3))
    def _make_request(self, url: str, params: Dict = None) -> Optional[Dict]:
        """
        Make robust API request with retries and error handling.
        """
        try:
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"❌ Request failed: {url} - {e}")
            raise
        except json.JSONDecodeError as e:
            print(f"❌ JSON decode error: {e}")
            raise

    def resolve_gene_name_to_ensembl_id(self, gene_name: str,
                                      species: str = "homo_sapiens") -> Optional[str]:
        """
        Convert gene symbol to Ensembl gene ID.
        """
        url = f"{self.base_url}/lookup/symbol/{species}/{gene_name}"

        try:
            result = self._make_request(url)
            if result and 'id' in result:
                ensembl_id = result['id']
                print(f"  ✅ {gene_name} -> {ensembl_id}")
                return ensembl_id
            else:
                print(f"  ❌ No Ensembl ID found for {gene_name}")
                return None
        except Exception as e:
            print(f"  ❌ Failed to resolve {gene_name}: {e}")
            return None

    def get_gene_tree_id_from_gene_id(self, ensembl_gene_id: str) -> Optional[str]:
        """
        Get gene tree ID from Ensembl gene ID.
        """
        url = f"{self.base_url}/genetree/member/id/{ensembl_gene_id}"

        try:
            result = self._make_request(url)
            if result and 'tree' in result and 'id' in result['tree']:
                tree_id = result['tree']['id']
                tree_url = f"{self.base_url}/genetree/id/{tree_id}?sequence=protein;aligned=0;format=fasta"
                self.tree_id_to_url[tree_id] = tree_url
                print(f"  ✅ Tree ID: {tree_id}")
                return tree_id
            else:
                print(f"  ❌ No gene tree found for {ensembl_gene_id}")
                return None
        except Exception as e:
            print(f"  ❌ Failed to get tree ID for {ensembl_gene_id}: {e}")
            return None

    def download_tree_sequences_via_api(self, tree_id: str) -> List[Dict[str, Any]]:
        """
        Download all protein sequences from a gene tree.
        """
        if tree_id not in self.tree_id_to_url:
            print(f"❌ No URL found for tree {tree_id}")
            return []

        url = self.tree_id_to_url[tree_id]

        try:
            response = self.session.get(url, timeout=60)
            response.raise_for_status()
            fasta_content = response.text

            if not fasta_content.strip():
                print(f"  ❌ Empty response for tree {tree_id}")
                return []

            # Parse FASTA content
            sequences = []
            current_header = None
            current_seq = []

            for line in fasta_content.split('\n'):
                line = line.strip()
                if line.startswith('>'):
                    # Save previous sequence
                    if current_header and current_seq:
                        seq_data = self._parse_ensembl_header(current_header, ''.join(current_seq))
                        if seq_data:
                            sequences.append(seq_data)

                    # Start new sequence
                    current_header = line[1:]  # Remove '>'
                    current_seq = []
                elif line and not line.startswith('>'):
                    current_seq.append(line)

            # Don't forget the last sequence
            if current_header and current_seq:
                seq_data = self._parse_ensembl_header(current_header, ''.join(current_seq))
                if seq_data:
                    sequences.append(seq_data)

            print(f"  ✅ Downloaded {len(sequences)} sequences from tree {tree_id}")
            return sequences

        except Exception as e:
            print(f"  ❌ Failed to download tree {tree_id}: {e}")
            return []

    def _parse_ensembl_header(self, header: str, sequence: str) -> Optional[Dict[str, Any]]:
        """
        Parse Ensembl FASTA header to extract metadata.

        Example header:
        >ENSP00000364140.1 pep:known chromosome:GRCh38:17:50183590:50201716:1 gene:ENSG00000108821.15 transcript:ENST00000374988.8 gene_biotype:protein_coding transcript_biotype:protein_coding gene_symbol:COL1A1
        """
        try:
            # Extract basic info
            parts = header.split(' ', 1)
            if len(parts) < 2:
                return None

            protein_id = parts[0]
            metadata_str = parts[1]

            # Parse metadata
            metadata = {}
            for item in metadata_str.split(' '):
                if ':' in item:
                    key, value = item.split(':', 1)
                    metadata[key] = value

            # Extract species from protein ID (format: species_ENSP...)
            species_match = protein_id.split('_')[0] if '_' in protein_id else "unknown"

            return {
                'protein_id': protein_id,
                'gene_symbol': metadata.get('gene_symbol', 'Unknown'),
                'species': species_match,
                'sequence': sequence,
                'gene_id': metadata.get('gene'),
                'transcript_id': metadata.get('transcript'),
                'gene_biotype': metadata.get('gene_biotype'),
                'source': 'ensembl_tree',
                'length': len(sequence)
            }

        except Exception as e:
            print(f"  ⚠️ Failed to parse header: {header[:50]}... Error: {e}")
            return None

    def fetch_sequences_for_gene_list(self, gene_names: List[str],
                                    species: str = "homo_sapiens") -> Dict[str, List[Dict]]:
        """
        Complete pipeline: gene names -> sequences for all genes.
        """
        print(f"🚀 Starting complete gene tree fetching for {len(gene_names)} genes...")

        all_sequences_by_gene = {}

        # Step 1: Resolve gene names to Ensembl IDs
        print("🔍 Step 1: Resolving gene names to Ensembl IDs...")
        for gene_name in tqdm(gene_names, desc="Resolving genes"):
            ensembl_id = self.resolve_gene_name_to_ensembl_id(gene_name, species)
            if ensembl_id:
                self.gene_to_ensembl_id[gene_name] = ensembl_id
            else:
                self.failed_genes.append(gene_name)
            time.sleep(0.2)  # Rate limiting

        print(f"✅ Resolved {len(self.gene_to_ensembl_id)}/{len(gene_names)} genes to Ensembl IDs")

        # Step 2: Get Gene Tree IDs
        print("🌳 Step 2: Getting Gene Tree IDs...")
        for gene_name, ensembl_id in tqdm(self.gene_to_ensembl_id.items(), desc="Getting tree IDs"):
            tree_id = self.get_gene_tree_id_from_gene_id(ensembl_id)
            if tree_id:
                self.gene_to_tree_id[gene_name] = tree_id
            time.sleep(0.2)

        print(f"✅ Found {len(self.gene_to_tree_id)} gene trees")

        # Step 3: Download sequences from trees
        print("💾 Step 3: Downloading sequences from gene trees...")
        for gene_name, tree_id in tqdm(self.gene_to_tree_id.items(), desc="Downloading sequences"):
            sequences = self.download_tree_sequences_via_api(tree_id)
            if sequences:
                # Add gene family info to each sequence
                for seq in sequences:
                    seq['gene_family'] = gene_name
                    seq['tree_id'] = tree_id

                all_sequences_by_gene[gene_name] = sequences
                print(f"  {gene_name}: {len(sequences)} sequences")
            time.sleep(0.5)  # Longer pause for downloads

        # Step 4: Save summary
        self._save_summary()

        return all_sequences_by_gene

    def _save_summary(self):
        """Save summary of the fetching process."""
        summary = {
            'gene_to_ensembl_id': self.gene_to_ensembl_id,
            'gene_to_tree_id': self.gene_to_tree_id,
            'tree_id_to_url': self.tree_id_to_url,
            'failed_genes': self.failed_genes,
            'total_genes_processed': len(self.gene_to_ensembl_id),
            'total_trees_found': len(self.gene_to_tree_id)
        }

        summary_path = self.config.output_base_path / "ensembl_gene_tree_summary.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"💾 Summary saved to: {summary_path}")

print("✅ SmartGeneTreeGenerator class defined successfully!")
print("🔗 Features restored:")
print("   - Robust API requests with retries")
print("   - Gene name -> Ensembl ID resolution")
print("   - Gene tree fetching and parsing")
print("   - Comprehensive error handling")
print("   - Progress tracking and checkpointing")
print("-" * 60)

🌳 Defining SmartGeneTreeGenerator Class...
✅ SmartGeneTreeGenerator class defined successfully!
🔗 Features restored:
   - Robust API requests with retries
   - Gene name -> Ensembl ID resolution
   - Gene tree fetching and parsing
   - Comprehensive error handling
   - Progress tracking and checkpointing
------------------------------------------------------------


In [13]:
# ===== Test Cell 4: SmartGeneTreeGenerator =====
# Test the SmartGeneTreeGenerator class independently

print("🧪 Testing Cell 4: SmartGeneTreeGenerator...")
print("=" * 60)

# Test 1: Check if SmartGeneTreeGenerator class exists
print("TEST 1: SmartGeneTreeGenerator Class Availability")
try:
    print(f"✅ SmartGeneTreeGenerator class available: {SmartGeneTreeGenerator}")
    cell4_class_available = True
except NameError as e:
    print(f"❌ SmartGeneTreeGenerator class not defined: {e}")
    cell4_class_available = False

print("-" * 40)

# Test 2: Test SmartGeneTreeGenerator instantiation
print("TEST 2: SmartGeneTreeGenerator Instantiation")
if cell4_class_available:
    try:
        # Use the config from previous tests
        if 'test_config' in globals():
            gene_tree_gen = SmartGeneTreeGenerator(test_config)
            print("✅ SmartGeneTreeGenerator instantiated successfully")
            print(f"   - Base URL: {gene_tree_gen.base_url}")
            print(f"   - Session configured: {gene_tree_gen.session is not None}")
            print(f"   - Config attached: {gene_tree_gen.config is not None}")
            cell4_instantiation = True
        else:
            print("❌ test_config not available from previous tests")
            cell4_instantiation = False
    except Exception as e:
        print(f"❌ SmartGeneTreeGenerator instantiation failed: {e}")
        cell4_instantiation = False
else:
    cell4_instantiation = False

print("-" * 40)

# Test 3: Test API methods (without making actual requests)
print("TEST 3: SmartGeneTreeGenerator Methods")
if cell4_instantiation:
    try:
        # Test method availability
        required_methods = [
            'resolve_gene_name_to_ensembl_id',
            'get_gene_tree_id_from_gene_id',
            'download_tree_sequences_via_api',
            '_make_request',
            '_parse_ensembl_header',
            'fetch_sequences_for_gene_list'
        ]

        missing_methods = []
        available_methods = []

        for method_name in required_methods:
            if hasattr(gene_tree_gen, method_name):
                available_methods.append(method_name)
                print(f"   ✅ {method_name}")
            else:
                missing_methods.append(method_name)
                print(f"   ❌ {method_name}")

        if not missing_methods:
            print(f"✅ All {len(required_methods)} required methods available")
            cell4_methods = True
        else:
            print(f"❌ Missing {len(missing_methods)} methods: {missing_methods}")
            cell4_methods = False

    except Exception as e:
        print(f"❌ Error testing methods: {e}")
        cell4_methods = False
else:
    cell4_methods = False

print("-" * 40)

# Test 4: Test tracking dictionaries
print("TEST 4: SmartGeneTreeGenerator Data Structures")
if cell4_instantiation:
    try:
        # Check if tracking dictionaries are initialized
        required_attrs = [
            'gene_to_ensembl_id',
            'gene_to_tree_id',
            'tree_id_to_url',
            'failed_genes'
        ]

        for attr_name in required_attrs:
            if hasattr(gene_tree_gen, attr_name):
                attr_value = getattr(gene_tree_gen, attr_name)
                print(f"   ✅ {attr_name}: {type(attr_value).__name__} (initialized)")
            else:
                print(f"   ❌ {attr_name}: missing")

        cell4_data_structures = True

    except Exception as e:
        print(f"❌ Error testing data structures: {e}")
        cell4_data_structures = False
else:
    cell4_data_structures = False

print("=" * 60)

# Summary for Cell 4
print("📊 CELL 4 TESTING SUMMARY:")
print(f"✅ Class Available: {'PASS' if cell4_class_available else 'FAIL'}")
print(f"✅ Instantiation: {'PASS' if cell4_instantiation else 'FAIL'}")
print(f"✅ Methods: {'PASS' if cell4_methods else 'FAIL'}")
print(f"✅ Data Structures: {'PASS' if cell4_data_structures else 'FAIL'}")

cell4_overall = cell4_class_available and cell4_instantiation and cell4_methods and cell4_data_structures
print(f"\n🎯 CELL 4 STATUS: {'READY' if cell4_overall else 'NEEDS FIXES'}")

if cell4_overall:
    print("\n✅ SmartGeneTreeGenerator is ready for API calls!")
    print("   - Can proceed to test actual gene name resolution")
    print("   - Can test gene tree fetching")
    print("   - All infrastructure in place")
else:
    print("\n🔧 Fix Cell 4 issues before proceeding")

print("=" * 60)

🧪 Testing Cell 4: SmartGeneTreeGenerator...
TEST 1: SmartGeneTreeGenerator Class Availability
✅ SmartGeneTreeGenerator class available: <class '__main__.SmartGeneTreeGenerator'>
----------------------------------------
TEST 2: SmartGeneTreeGenerator Instantiation
✅ SmartGeneTreeGenerator initialized
   Base URL: https://rest.ensembl.org
✅ SmartGeneTreeGenerator instantiated successfully
   - Base URL: https://rest.ensembl.org
   - Session configured: True
   - Config attached: True
----------------------------------------
TEST 3: SmartGeneTreeGenerator Methods
   ✅ resolve_gene_name_to_ensembl_id
   ✅ get_gene_tree_id_from_gene_id
   ✅ download_tree_sequences_via_api
   ✅ _make_request
   ✅ _parse_ensembl_header
   ✅ fetch_sequences_for_gene_list
✅ All 6 required methods available
----------------------------------------
TEST 4: SmartGeneTreeGenerator Data Structures
   ✅ gene_to_ensembl_id: dict (initialized)
   ✅ gene_to_tree_id: dict (initialized)
   ✅ tree_id_to_url: dict (initiali

## Cell 5: Protein Processing Classes

##3 Advanced Protein Family Analysis

Defines the base `ProteinClass` and specialized subclasses for `Collagen` and `Keratin` analysis with the fully restored, advanced analytical methods.

In [14]:
# ===== Cell 5: Enhanced Protein Classes (Collagen, Keratin, KAP) =====
# Detailed protein analysis classes preserving your research work
# Combines June 28th functionality with your enhanced protein classification

import re
import pandas as pd
from typing import Dict, List, Any, Optional, Tuple
from collections import defaultdict

print("🧬 Defining Enhanced Protein Classes...")

class ProteinClass:
    """
    Base class for protein analysis with common functionality.
    Provides framework for specialized protein family analysis.
    """

    def __init__(self, config_instance):
        """Initialize with PhASTMConfig instance."""
        self.config = config_instance
        self.master_df = pd.DataFrame()
        self.enzyme_rules = config_instance.enzyme_rules

        print(f"  ✅ Base ProteinClass initialized")
        print(f"  📊 Available enzymes: {list(self.enzyme_rules.keys())}")

    def digest_sequence_with_enzyme(self, sequence: str, enzyme: str) -> List[str]:
        """
        Digest protein sequence with specified enzyme.
        Returns list of peptide fragments.
        """
        if enzyme not in self.enzyme_rules:
            raise ValueError(f"Enzyme {enzyme} not supported. Available: {list(self.enzyme_rules.keys())}")

        enzyme_data = self.enzyme_rules[enzyme]
        cleavage_rule = enzyme_data['cleavage_rule']

        # Apply cleavage rule using regex
        try:
            peptides = re.split(cleavage_rule, sequence)
            # Remove empty peptides
            peptides = [p for p in peptides if p.strip()]
            return peptides
        except re.error as e:
            print(f"❌ Regex error for enzyme {enzyme}: {e}")
            return [sequence]  # Return original sequence if regex fails

    def extract_species_from_header(self, header: str) -> str:
        """Extract species information from sequence header."""
        # Try different header formats
        patterns = [
            r'\[([A-Za-z]+\s+[A-Za-z]+)\]',  # [Homo sapiens]
            r'OS=([A-Za-z]+\s+[A-Za-z]+)',   # OS=Homo sapiens
            r'_([A-Z]+[A-Z0-9]*)\s',         # _HUMAN
        ]

        for pattern in patterns:
            match = re.search(pattern, header)
            if match:
                species = match.group(1)
                # Convert _HUMAN style to proper species name
                if species.isupper() and len(species) <= 10:
                    species_mapping = {
                        'HUMAN': 'Homo sapiens',
                        'MOUSE': 'Mus musculus',
                        'RAT': 'Rattus norvegicus',
                        'CHICK': 'Gallus gallus',
                        'ZEBRA': 'Danio rerio'
                    }
                    return species_mapping.get(species, f"Unknown_{species}")
                else:
                    return species.replace('_', ' ')

        return "Unknown_species"

    def analyze_protein_composition(self, sequence: str) -> Dict[str, Any]:
        """
        Analyze basic protein composition.
        """
        if not sequence:
            return {}

        aa_counts = defaultdict(int)
        for aa in sequence:
            aa_counts[aa] += 1

        total_length = len(sequence)

        return {
            'length': total_length,
            'amino_acid_counts': dict(aa_counts),
            'amino_acid_frequencies': {aa: count/total_length for aa, count in aa_counts.items()},
            'molecular_weight_approx': total_length * 110,  # Rough approximation
            'charge_positive': sequence.count('K') + sequence.count('R') + sequence.count('H'),
            'charge_negative': sequence.count('D') + sequence.count('E'),
            'hydrophobic_residues': sum(sequence.count(aa) for aa in 'AILMFWYV'),
            'polar_residues': sum(sequence.count(aa) for aa in 'STYNQC'),
            'aromatic_residues': sum(sequence.count(aa) for aa in 'FWY')
        }

print("✅ Base ProteinClass defined")

class Collagen(ProteinClass):
    """
    Enhanced Collagen analysis class with detailed structural analysis.
    Preserves your research work on collagen classification and analysis.
    """

    def __init__(self, config_instance):
        """Initialize Collagen-specific analysis."""
        super().__init__(config_instance)
        self.collagen_types = self._load_collagen_classification()

        # Collagen-specific parameters
        self.gly_x_y_pattern = re.compile(r'G[A-Z][A-Z]')
        self.hydroxyproline_pattern = re.compile(r'GP[PO]|G[A-Z]P')

        print("  🧬 Collagen subclass initialized")
        print(f"  📚 Loaded {len(self.collagen_types)} collagen type classifications")

    def _load_collagen_classification(self) -> Dict[str, Any]:
        """Load collagen type classification data."""
        # Enhanced collagen classification based on your research
        return {
            'COL1A1': {
                'type': 'Type I',
                'description': 'Fibrillar collagen, major component of bone, tendon, skin',
                'typical_tissues': ['bone', 'tendon', 'skin', 'cornea'],
                'chain_composition': 'α1(I)₂α2(I)',
                'triple_helix_length': 1014,
                'n_telopeptide_length': {'default': 16, 'range': [14, 18]},
                'c_telopeptide_length': {'default': 26, 'range': [24, 28]},
                'characteristic_sequences': ['GFOGER', 'GVQGER', 'GLPGER']
            },
            'COL1A2': {
                'type': 'Type I',
                'description': 'Fibrillar collagen, α2 chain of Type I',
                'typical_tissues': ['bone', 'tendon', 'skin', 'cornea'],
                'chain_composition': 'α1(I)₂α2(I)',
                'triple_helix_length': 1008,
                'n_telopeptide_length': {'default': 17, 'range': [15, 19]},
                'c_telopeptide_length': {'default': 25, 'range': [23, 27]},
                'characteristic_sequences': ['GFOGER', 'GVQGER', 'GLAGER']
            },
            'COL2A1': {
                'type': 'Type II',
                'description': 'Cartilage-specific fibrillar collagen',
                'typical_tissues': ['cartilage', 'vitreous_humor', 'notochord'],
                'chain_composition': 'α1(II)₃',
                'triple_helix_length': 1017,
                'n_telopeptide_length': {'default': 18, 'range': [16, 20]},
                'c_telopeptide_length': {'default': 27, 'range': [25, 29]},
                'characteristic_sequences': ['GFOGER', 'GVQGER', 'GAPGER']
            },
            'COL3A1': {
                'type': 'Type III',
                'description': 'Reticular collagen, associated with Type I',
                'typical_tissues': ['skin', 'blood_vessels', 'internal_organs'],
                'chain_composition': 'α1(III)₃',
                'triple_helix_length': 1026,
                'n_telopeptide_length': {'default': 15, 'range': [13, 17]},
                'c_telopeptide_length': {'default': 24, 'range': [22, 26]},
                'characteristic_sequences': ['GFOGER', 'GVQGER', 'GAPGER']
            }
        }

    def analyze_collagen_structure(self, sequence: str, gene_symbol: str = None) -> Dict[str, Any]:
        """
        Comprehensive collagen structural analysis.
        """
        analysis = {}

        # Basic composition analysis
        basic_analysis = self.analyze_protein_composition(sequence)
        analysis.update(basic_analysis)

        # Gly-X-Y repeat analysis
        gxy_analysis = self._analyze_gly_x_y_repeats(sequence)
        analysis['gly_x_y_analysis'] = gxy_analysis

        # Domain extraction
        if gene_symbol and gene_symbol.upper() in self.collagen_types:
            domains = self._extract_collagen_domains(sequence, gene_symbol.upper())
            analysis['domains'] = domains

        # Collagen type classification
        collagen_type = self._classify_collagen_type(sequence, gene_symbol)
        analysis['collagen_classification'] = collagen_type

        # Post-translational modification sites
        ptm_sites = self._predict_ptm_sites(sequence)
        analysis['ptm_prediction'] = ptm_sites

        return analysis

    def _analyze_gly_x_y_repeats(self, sequence: str) -> Dict[str, Any]:
        """
        Analyze Gly-X-Y repeat structure characteristic of collagen.
        """
        # Find all Gly-X-Y triplets
        gxy_triplets = []
        for i in range(0, len(sequence) - 2, 3):
            triplet = sequence[i:i+3]
            if triplet.startswith('G'):
                gxy_triplets.append({
                    'position': i,
                    'sequence': triplet,
                    'x_residue': triplet[1] if len(triplet) > 1 else 'X',
                    'y_residue': triplet[2] if len(triplet) > 2 else 'X'
                })

        total_triplets = len(sequence) // 3
        gly_triplets = len(gxy_triplets)
        gly_percentage = (gly_triplets / total_triplets * 100) if total_triplets > 0 else 0

        # Analyze X and Y position preferences
        x_residues = [t['x_residue'] for t in gxy_triplets]
        y_residues = [t['y_residue'] for t in gxy_triplets]

        x_counts = defaultdict(int)
        y_counts = defaultdict(int)

        for x in x_residues:
            x_counts[x] += 1
        for y in y_residues:
            y_counts[y] += 1

        return {
            'total_triplets': total_triplets,
            'gly_triplets': gly_triplets,
            'gly_percentage': gly_percentage,
            'triplet_details': gxy_triplets,
            'x_position_preferences': dict(x_counts),
            'y_position_preferences': dict(y_counts),
            'proline_in_y': y_counts.get('P', 0),
            'hydroxyproline_predicted': len(re.findall(self.hydroxyproline_pattern, sequence)),
            'is_collagen_like': gly_percentage > 25.0  # Threshold for collagen-like
        }

    def _extract_collagen_domains(self, sequence: str, gene_symbol: str) -> Dict[str, Any]:
        """
        Extract collagen domains: N-telopeptide, triple helix, C-telopeptide.
        """
        params = self.collagen_types.get(gene_symbol, {})
        if not params:
            return {'error': f'No parameters found for {gene_symbol}'}

        n_telo_len = params.get('n_telopeptide_length', {}).get('default', 0)
        c_telo_len = params.get('c_telopeptide_length', {}).get('default', 0)
        seq_len = len(sequence)

        # Extract domains
        n_telopeptide = sequence[:n_telo_len] if n_telo_len > 0 and seq_len > n_telo_len else None

        helix_start = n_telo_len
        helix_end = seq_len - c_telo_len if c_telo_len > 0 and (seq_len - n_telo_len) > c_telo_len else seq_len

        triple_helix_core = sequence[helix_start:helix_end] if helix_end > helix_start else None
        c_telopeptide = sequence[helix_end:] if c_telo_len > 0 and helix_end < seq_len else None

        return {
            'n_telopeptide': {
                'sequence': n_telopeptide,
                'length': len(n_telopeptide) if n_telopeptide else 0,
                'expected_length': n_telo_len
            },
            'triple_helix': {
                'sequence': triple_helix_core,
                'length': len(triple_helix_core) if triple_helix_core else 0,
                'expected_length': params.get('triple_helix_length', 'unknown'),
                'start_position': helix_start,
                'end_position': helix_end
            },
            'c_telopeptide': {
                'sequence': c_telopeptide,
                'length': len(c_telopeptide) if c_telopeptide else 0,
                'expected_length': c_telo_len
            }
        }

    def _classify_collagen_type(self, sequence: str, gene_symbol: str = None) -> Dict[str, Any]:
        """
        Classify collagen type based on sequence and gene symbol.
        """
        if gene_symbol and gene_symbol.upper() in self.collagen_types:
            known_type = self.collagen_types[gene_symbol.upper()]
            return {
                'classification': 'known_collagen',
                'type': known_type['type'],
                'description': known_type['description'],
                'confidence': 'high',
                'gene_symbol': gene_symbol.upper()
            }

        # Sequence-based classification for unknown genes
        gxy_analysis = self._analyze_gly_x_y_repeats(sequence)

        if gxy_analysis['gly_percentage'] > 30:
            if len(sequence) > 1000:
                predicted_type = 'Fibrillar collagen (I, II, III, V, XI-like)'
            elif 200 < len(sequence) < 500:
                predicted_type = 'Network collagen (IV, VIII, X-like)'
            else:
                predicted_type = 'Short collagen or collagen-like'
        else:
            predicted_type = 'Non-collagen or atypical collagen'

        return {
            'classification': 'predicted',
            'type': predicted_type,
            'confidence': 'medium',
            'gly_percentage': gxy_analysis['gly_percentage'],
            'sequence_length': len(sequence)
        }

    def _predict_ptm_sites(self, sequence: str) -> Dict[str, Any]:
        """
        Predict post-translational modification sites.
        """
        # Hydroxylation sites (P in Y position of Gly-X-Y)
        hydroxyproline_sites = []
        hydroxylysine_sites = []

        for i in range(0, len(sequence) - 2, 3):
            if i + 2 < len(sequence):
                triplet = sequence[i:i+3]
                if triplet.startswith('G') and len(triplet) == 3:
                    if triplet[2] == 'P':  # Proline in Y position
                        hydroxyproline_sites.append(i + 2)
                    elif triplet[2] == 'K':  # Lysine in Y position
                        hydroxylysine_sites.append(i + 2)

        # Glycosylation sites (N-linked: NxS/T, O-linked: S/T)
        n_glycosylation = []
        o_glycosylation = []

        # N-linked glycosylation: N-X-S/T
        for match in re.finditer(r'N[A-Z][ST]', sequence):
            n_glycosylation.append(match.start())

        # O-linked glycosylation: S or T
        for i, aa in enumerate(sequence):
            if aa in 'ST':
                o_glycosylation.append(i)

        return {
            'hydroxyproline_sites': hydroxyproline_sites,
            'hydroxylysine_sites': hydroxylysine_sites,
            'n_glycosylation_sites': n_glycosylation,
            'o_glycosylation_sites': o_glycosylation[:20],  # Limit output
            'total_hydroxyproline_predicted': len(hydroxyproline_sites),
            'total_hydroxylysine_predicted': len(hydroxylysine_sites),
            'total_n_glycosylation_predicted': len(n_glycosylation),
            'total_o_glycosylation_predicted': len(o_glycosylation)
        }

print("✅ Enhanced Collagen class defined")

class Keratin(ProteinClass):
    """
    Enhanced Keratin analysis class with detailed structural analysis.
    Includes coiled-coil analysis, cysteine analysis, and KAP classification.
    """

    def __init__(self, config_instance):
        """Initialize Keratin-specific analysis."""
        super().__init__(config_instance)
        self.keratin_types = self._load_keratin_classification()

        # Keratin-specific parameters
        self.coiled_coil_pattern = re.compile(r'[ALEV][A-Z]{6}[ALEV]')  # Simplified heptad repeat
        self.cysteine_pattern = re.compile(r'C')

        # Alpha-helix propensity scores (Chou-Fasman)
        self.alpha_helix_propensity = {
            'A': 1.42, 'R': 0.98, 'N': 0.67, 'D': 1.01, 'C': 0.70,
            'Q': 1.11, 'E': 1.51, 'G': 0.57, 'H': 1.00, 'I': 1.08,
            'L': 1.21, 'K': 1.16, 'M': 1.45, 'F': 1.13, 'P': 0.57,
            'S': 0.77, 'T': 0.83, 'W': 1.08, 'Y': 0.69, 'V': 1.06
        }

        print("  🧬 Keratin subclass initialized")
        print(f"  📚 Loaded {len(self.keratin_types)} keratin type classifications")

    def _load_keratin_classification(self) -> Dict[str, Any]:
        """Load keratin type classification data."""
        return {
            'KRT1': {
                'type': 'Type II Intermediate Filament',
                'description': 'Epidermis-specific, pairs with KRT10',
                'typical_tissues': ['epidermis', 'skin'],
                'molecular_weight': 66000,
                'isoelectric_point': 8.15,
                'cysteine_content': 'low'
            },
            'KRT5': {
                'type': 'Type II Intermediate Filament',
                'description': 'Basal epithelial cells, pairs with KRT14',
                'typical_tissues': ['basal_epithelium', 'skin'],
                'molecular_weight': 58000,
                'isoelectric_point': 7.59,
                'cysteine_content': 'low'
            },
            'KRT10': {
                'type': 'Type I Intermediate Filament',
                'description': 'Epidermis-specific, pairs with KRT1',
                'typical_tissues': ['epidermis', 'skin'],
                'molecular_weight': 56500,
                'isoelectric_point': 5.13,
                'cysteine_content': 'low'
            },
            'KRT14': {
                'type': 'Type I Intermediate Filament',
                'description': 'Basal epithelial cells, pairs with KRT5',
                'typical_tissues': ['basal_epithelium', 'skin'],
                'molecular_weight': 51600,
                'isoelectric_point': 5.09,
                'cysteine_content': 'low'
            },
            'KRT31': {
                'type': 'Hair Keratin',
                'description': 'Hard keratin of hair cortex',
                'typical_tissues': ['hair', 'wool', 'fur'],
                'molecular_weight': 45000,
                'isoelectric_point': 5.4,
                'cysteine_content': 'high'
            },
            'KRT85': {
                'type': 'Hair Keratin',
                'description': 'Hard keratin of hair cortex',
                'typical_tissues': ['hair', 'wool', 'fur'],
                'molecular_weight': 50000,
                'isoelectric_point': 5.8,
                'cysteine_content': 'high'
            }
        }

    def analyze_keratin_structure(self, sequence: str, gene_symbol: str = None) -> Dict[str, Any]:
        """
        Comprehensive keratin structural analysis.
        """
        analysis = {}

        # Basic composition analysis
        basic_analysis = self.analyze_protein_composition(sequence)
        analysis.update(basic_analysis)

        # Coiled-coil analysis
        coiled_coil_analysis = self._analyze_coiled_coil_structure(sequence)
        analysis['coiled_coil_analysis'] = coiled_coil_analysis

        # Cysteine analysis
        cysteine_analysis = self._analyze_cysteine_content(sequence)
        analysis['cysteine_analysis'] = cysteine_analysis

        # Domain extraction (Head-Rod-Tail)
        domains = self._extract_keratin_domains(sequence)
        analysis['domains'] = domains

        # Keratin type classification
        keratin_type = self._classify_keratin_type(sequence, gene_symbol)
        analysis['keratin_classification'] = keratin_type

        # KAP detection
        kap_analysis = self._analyze_kap_features(sequence, gene_symbol)
        analysis['kap_analysis'] = kap_analysis

        return analysis

    def _analyze_coiled_coil_structure(self, sequence: str) -> Dict[str, Any]:
        """
        Analyze coiled-coil heptad repeat structure.
        """
        # Simplified heptad repeat analysis
        heptad_positions = {'a': [], 'b': [], 'c': [], 'd': [], 'e': [], 'f': [], 'g': []}

        for i, aa in enumerate(sequence):
            heptad_pos = ['a', 'b', 'c', 'd', 'e', 'f', 'g'][i % 7]
            heptad_positions[heptad_pos].append(aa)

        # Calculate hydrophobic moments for coiled-coil prediction
        hydrophobic_aa = set('AILMFWYV')

        a_hydrophobic = sum(1 for aa in heptad_positions['a'] if aa in hydrophobic_aa)
        d_hydrophobic = sum(1 for aa in heptad_positions['d'] if aa in hydrophobic_aa)

        total_a = len(heptad_positions['a'])
        total_d = len(heptad_positions['d'])

        a_hydrophobic_fraction = a_hydrophobic / total_a if total_a > 0 else 0
        d_hydrophobic_fraction = d_hydrophobic / total_d if total_d > 0 else 0

        coiled_coil_score = (a_hydrophobic_fraction + d_hydrophobic_fraction) / 2

        # Alpha-helix propensity
        helix_score = sum(self.alpha_helix_propensity.get(aa, 1.0) for aa in sequence) / len(sequence)

        return {
            'heptad_analysis': {pos: len(residues) for pos, residues in heptad_positions.items()},
            'a_position_hydrophobic_fraction': a_hydrophobic_fraction,
            'd_position_hydrophobic_fraction': d_hydrophobic_fraction,
            'coiled_coil_score': coiled_coil_score,
            'alpha_helix_propensity': helix_score,
            'predicted_coiled_coil': coiled_coil_score > 0.5 and helix_score > 1.0
        }

    def _analyze_cysteine_content(self, sequence: str) -> Dict[str, Any]:
        """
        Analyze cysteine content and distribution.
        """
        cys_positions = [i for i, aa in enumerate(sequence) if aa == 'C']
        total_cys = len(cys_positions)
        cys_percentage = (total_cys / len(sequence) * 100) if len(sequence) > 0 else 0

        # Analyze cysteine spacing
        cys_spacings = []
        for i in range(len(cys_positions) - 1):
            spacing = cys_positions[i + 1] - cys_positions[i]
            cys_spacings.append(spacing)

        avg_spacing = sum(cys_spacings) / len(cys_spacings) if cys_spacings else 0

        # Classify based on cysteine content
        if cys_percentage > 15:
            cys_class = 'Ultra-high cysteine (>15%)'
        elif cys_percentage > 8:
            cys_class = 'High cysteine (8-15%)'
        elif cys_percentage > 3:
            cys_class = 'Moderate cysteine (3-8%)'
        else:
            cys_class = 'Low cysteine (<3%)'

        return {
            'total_cysteines': total_cys,
            'cysteine_percentage': cys_percentage,
            'cysteine_positions': cys_positions,
            'cysteine_spacings': cys_spacings,
            'average_spacing': avg_spacing,
            'cysteine_classification': cys_class,
            'is_high_cysteine': cys_percentage > 8
        }

    def _extract_keratin_domains(self, sequence: str) -> Dict[str, Any]:
        """
        Extract keratin domains: Head, Rod, Tail.
        Simplified domain prediction based on sequence features.
        """
        seq_len = len(sequence)

        # Rough domain boundaries (these would need refinement with real data)
        # Head domain: typically first 100-150 residues
        # Rod domain: central coiled-coil region
        # Tail domain: C-terminal region

        head_end = min(150, seq_len // 4)
        tail_start = max(seq_len - 100, seq_len * 3 // 4)

        head_domain = sequence[:head_end]
        rod_domain = sequence[head_end:tail_start]
        tail_domain = sequence[tail_start:]

        return {
            'head': {
                'sequence': head_domain,
                'length': len(head_domain),
                'region': f'1-{head_end}'
            },
            'rod': {
                'sequence': rod_domain,
                'length': len(rod_domain),
                'region': f'{head_end + 1}-{tail_start}'
            },
            'tail': {
                'sequence': tail_domain,
                'length': len(tail_domain),
                'region': f'{tail_start + 1}-{seq_len}'
            }
        }

    def _classify_keratin_type(self, sequence: str, gene_symbol: str = None) -> Dict[str, Any]:
        """
        Classify keratin type based on sequence and gene symbol.
        """
        if gene_symbol and gene_symbol.upper() in self.keratin_types:
            known_type = self.keratin_types[gene_symbol.upper()]
            return {
                'classification': 'known_keratin',
                'type': known_type['type'],
                'description': known_type['description'],
                'confidence': 'high',
                'gene_symbol': gene_symbol.upper()
            }

        # Sequence-based classification
        cys_analysis = self._analyze_cysteine_content(sequence)
        coiled_coil_analysis = self._analyze_coiled_coil_structure(sequence)

        seq_length = len(sequence)
        cys_percentage = cys_analysis['cysteine_percentage']
        coiled_coil_score = coiled_coil_analysis['coiled_coil_score']

        if cys_percentage > 8.0:
            predicted_type = "Hard keratin (high cysteine)"
        elif (coiled_coil_score > 0.5 and
              coiled_coil_analysis['alpha_helix_propensity'] > 1.0 and
              seq_length > 400):
            predicted_type = "Intermediate filament keratin"
        elif seq_length < 200:
            predicted_type = "Keratin-like protein (short)"
        else:
            predicted_type = "Keratin-like protein (atypical)"

        return {
            'classification': 'predicted',
            'type': predicted_type,
            'confidence': 'medium',
            'cysteine_percentage': cys_percentage,
            'coiled_coil_score': coiled_coil_score,
            'sequence_length': seq_length
        }

    def _analyze_kap_features(self, sequence: str, gene_symbol: str = None) -> Dict[str, Any]:
        """
        Analyze Keratin-Associated Protein (KAP) features.
        """
        # KAP detection based on your research
        is_kap_candidate = False
        kap_type = "Not KAP"

        cys_analysis = self._analyze_cysteine_content(sequence)
        cys_percentage = cys_analysis['cysteine_percentage']
        seq_length = len(sequence)

        # KAP classification criteria
        if gene_symbol and 'KRTAP' in gene_symbol.upper():
            is_kap_candidate = True
            if cys_percentage > 20:
                kap_type = "High-sulfur KAP"
            elif cys_percentage > 10:
                kap_type = "High-glycine-tyrosine KAP"
            else:
                kap_type = "KAP (low cysteine)"
        elif cys_percentage > 15 and seq_length < 300:
            is_kap_candidate = True
            kap_type = "KAP-like (high cysteine, small)"

        # Analyze characteristic amino acid patterns
        gly_percentage = (sequence.count('G') / len(sequence) * 100) if len(sequence) > 0 else 0
        tyr_percentage = (sequence.count('Y') / len(sequence) * 100) if len(sequence) > 0 else 0
        ser_percentage = (sequence.count('S') / len(sequence) * 100) if len(sequence) > 0 else 0

        return {
            'is_kap_candidate': is_kap_candidate,
            'kap_type': kap_type,
            'cysteine_percentage': cys_percentage,
            'glycine_percentage': gly_percentage,
            'tyrosine_percentage': tyr_percentage,
            'serine_percentage': ser_percentage,
            'sequence_length': seq_length,
            'characteristic_composition': {
                'high_cysteine': cys_percentage > 15,
                'high_glycine': gly_percentage > 15,
                'high_tyrosine': tyr_percentage > 8,
                'small_protein': seq_length < 300
            }
        }

print("✅ Enhanced Keratin class defined")
print("✅ All enhanced protein classes ready!")
print("🔬 Features added:")
print("   - Detailed Collagen analysis (Gly-X-Y, domains, PTM)")
print("   - Comprehensive Keratin analysis (coiled-coil, cysteine)")
print("   - KAP (Keratin-Associated Protein) detection")
print("   - Advanced structural domain extraction")
print("   - Sequence-based classification algorithms")
print("-" * 60)

🧬 Defining Enhanced Protein Classes...
✅ Base ProteinClass defined
✅ Enhanced Collagen class defined
✅ Enhanced Keratin class defined
✅ All enhanced protein classes ready!
🔬 Features added:
   - Detailed Collagen analysis (Gly-X-Y, domains, PTM)
   - Comprehensive Keratin analysis (coiled-coil, cysteine)
   - KAP (Keratin-Associated Protein) detection
   - Advanced structural domain extraction
   - Sequence-based classification algorithms
------------------------------------------------------------


## Cell 6: Data Fetching & Processing Infrastructure

### Smart Data Fetchers and Main Processor

This cell defines the `DataFetcher` for handling external API calls and the main `ProteinProcessor` which orchestrates the entire workflow, including data loading, analysis, and the full Nexyme digestion pipeline.

In [16]:
# ===== Cell 6: Data Fetching & Processing Infrastructure =====

import pandas as pd
import re
from pathlib import Path
from Bio import SeqIO
from typing import List, Dict, Any
import concurrent.futures
from collections import defaultdict

print("--- Cell 6: Data Fetching & Processing Infrastructure ---")

class DataFetcher:
    """Handles all data fetching operations from various sources."""

    def __init__(self, config: PhASTMConfig):
        self.config = config

    def get_fasta_sequences(self) -> List[Dict[str, Any]]:
        """Load sequences from FASTA files."""
        sequences = []

        # Load from additional FASTA if specified
        if hasattr(self.config, 'additional_fasta') and self.config.additional_fasta:
            fasta_path = Path(self.config.additional_fasta)
            if fasta_path.exists():
                print(f"Loading sequences from: {fasta_path}")
                try:
                    with open(fasta_path, 'r') as handle:
                        for record in SeqIO.parse(handle, "fasta"):
                            taxonomy = self._extract_taxonomy_from_header(record.description)
                            sequences.append({
                                'sequence': str(record.seq),
                                'protein_id': record.id,
                                'description': record.description,
                                'gene': taxonomy.get('gene', 'Unknown'),
                                'species': taxonomy.get('species', 'Unknown_species'),
                                'accession': taxonomy.get('accession', record.id),
                                'source': 'additional_fasta'
                            })
                    print(f"✅ Loaded {len(sequences)} sequences from additional FASTA")
                except Exception as e:
                    print(f"❌ Error loading FASTA file {fasta_path}: {e}")
            else:
                print(f"⚠️ Additional FASTA file not found: {fasta_path}")

        return sequences

    def _extract_taxonomy_from_header(self, header: str) -> Dict[str, str]:
        """Extract taxonomy information from FASTA header"""
        try:
            # Common patterns for extracting gene and species from headers
            if '|' in header:
                parts = header.split('|')
                gene = parts[0].strip('>')
                species = parts[1] if len(parts) > 1 else 'Unknown_species'
            elif '_' in header:
                parts = header.split('_', 1)
                gene = parts[0].strip('>')
                species = parts[1] if len(parts) > 1 else 'Unknown_species'
            else:
                # Fallback
                gene = header.strip('>').split()[0]
                species = 'Unknown_species'

            return {
                'gene': gene,
                'species': species.replace(' ', '_'),
                'accession': gene
            }

        except Exception as e:
            print(f"Warning: Could not extract taxonomy from header '{header}': {e}")
            return {
                'gene': 'Unknown',
                'species': 'Unknown_species',
                'accession': 'Unknown'
            }

class ProteinProcessor:
    """Main orchestrator of the PhASTM pipeline."""

    def __init__(self, config: PhASTMConfig, protein_class_instance):
        self.config = config
        self.protein_class = protein_class_instance
        self.fetcher = DataFetcher(config)
        self.master_df = pd.DataFrame()

    def load_data(self):
        """Load sequence data from all configured sources."""
        print("Loading data from configured sources...")

        # Load from FASTA
        fasta_seqs = self.fetcher.get_fasta_sequences()
        if fasta_seqs:
            self.master_df = pd.DataFrame(fasta_seqs)
            print(f"✅ Loaded {len(self.master_df)} total sequences.")
        else:
            print("⚠️ No sequences loaded from FASTA sources.")
            self.master_df = pd.DataFrame()

    def analyze_sequences(self):
        """Run protein-family specific analysis on loaded sequences."""
        if self.master_df.empty:
            print("⚠️ No sequences to analyze.")
            return

        print("Running advanced analysis on sequences...")

        # Apply protein-family specific analysis
        if isinstance(self.protein_class, Keratin):
            print("Applying Keratin-specific analysis...")
            self.master_df['subtype'] = self.master_df.apply(
                lambda row: self.protein_class.classify_keratin_subtype(
                    row['sequence'], row.get('protein_id', '')
                ), axis=1
            )
        elif isinstance(self.protein_class, Collagen):
            print("Applying Collagen-specific analysis...")
            # Add collagen-specific analysis here
            pass

        print(f"✅ Analysis complete for {len(self.master_df)} sequences.")

    def run_digestion_pipeline(self) -> pd.DataFrame:
        """Run the complete digestion pipeline and return peptide results."""
        if self.master_df.empty:
            print("⚠️ No sequences available for digestion.")
            return pd.DataFrame()

        print(f"Starting digestion pipeline with {self.config.enzyme} enzyme...")

        # Simple digestion implementation
        all_peptides = []

        for idx, row in self.master_df.iterrows():
            sequence = row['sequence']

            # Simple trypsin-like digestion (cleave after K or R, not before P)
            if self.config.enzyme.lower() == 'trypsin':
                sites = [0]
                for i, aa in enumerate(sequence):
                    if aa in ['K', 'R'] and i + 1 < len(sequence) and sequence[i + 1] != 'P':
                        sites.append(i + 1)
                sites.append(len(sequence))

                # Generate peptides
                for i in range(len(sites) - 1):
                    start, end = sites[i], sites[i + 1]
                    peptide_seq = sequence[start:end]

                    # Filter by length
                    if self.config.min_len <= len(peptide_seq) <= self.config.max_len:
                        all_peptides.append({
                            'peptide_sequence': peptide_seq,
                            'protein_id': row['protein_id'],
                            'gene': row.get('gene', 'Unknown'),
                            'species': row.get('species', 'Unknown'),
                            'start_pos': start,
                            'end_pos': end,
                            'length': len(peptide_seq)
                        })

        peptide_df = pd.DataFrame(all_peptides)
        print(f"✅ Generated {len(peptide_df)} peptides from digestion.")

        return peptide_df

print("✅ DataFetcher and ProteinProcessor classes defined.")

--- Cell 6: Data Fetching & Processing Infrastructure ---
✅ DataFetcher and ProteinProcessor classes defined.


In [17]:
# ===== Test Cell 6: Data Loading Infrastructure =====
# Test the data loading pipeline and DataFetcher components

print("🧪 Testing Cell 6: Data Loading Infrastructure...")
print("=" * 60)

# Test 1: Check if data loading functions exist
print("TEST 1: Data Loading Functions Availability")
required_functions = [
    'initialize_data_loading_system',
    'load_target_genes_from_tissues',
    'load_sequences_from_ensembl_trees',
    'load_sequences_from_additional_fasta',
    'extract_species_from_fasta_header'
]

missing_functions = []
available_functions = []

for func_name in required_functions:
    try:
        func = globals()[func_name]
        available_functions.append(func_name)
        print(f"   ✅ {func_name}")
    except KeyError:
        missing_functions.append(func_name)
        print(f"   ❌ {func_name}")

cell6_functions = len(missing_functions) == 0

print("-" * 40)

# Test 2: Test initialize_data_loading_system
print("TEST 2: Data Loading System Initialization")
if 'initialize_data_loading_system' in available_functions and 'test_config' in globals():
    try:
        gene_tree_generator, protein_processor = initialize_data_loading_system(test_config)

        print("✅ Data loading system initialized successfully")
        print(f"   - Gene tree generator type: {type(gene_tree_generator).__name__}")
        print(f"   - Protein processor type: {type(protein_processor).__name__}")
        print(f"   - Entrez email configured: {test_config.entrez_email}")

        # Store for later tests
        globals()['test_gene_tree_gen'] = gene_tree_generator
        globals()['test_protein_proc'] = protein_processor

        cell6_initialization = True

    except Exception as e:
        print(f"❌ Data loading system initialization failed: {e}")
        cell6_initialization = False
else:
    print("❌ initialize_data_loading_system function or test_config not available")
    cell6_initialization = False

print("-" * 40)

# Test 3: Test target genes loading
print("TEST 3: Target Genes Loading")
if 'load_target_genes_from_tissues' in available_functions and 'test_config' in globals():
    try:
        target_genes = load_target_genes_from_tissues(test_config)

        print(f"✅ Target genes loaded: {len(target_genes)} genes")
        print(f"   - Genes: {target_genes}")

        # Store for later tests
        globals()['test_target_genes'] = target_genes

        cell6_target_genes = True

    except Exception as e:
        print(f"❌ Target genes loading failed: {e}")
        cell6_target_genes = False
else:
    print("❌ load_target_genes_from_tissues function or test_config not available")
    cell6_target_genes = False

print("-" * 40)

# Test 4: Test FASTA header parsing
print("TEST 4: FASTA Header Parsing")
if 'extract_species_from_fasta_header' in available_functions:
    try:
        test_headers = [
            "COL1A1_HUMAN Collagen alpha-1(I) chain [Homo sapiens]",
            "sp|P02452|CO1A1_HUMAN Collagen alpha-1(I) chain OS=Homo sapiens",
            "COL1A1|Homo_sapiens",
            "gi|123456|ref|NP_000089.3| collagen alpha-1(I) [Homo sapiens]"
        ]

        parsed_species = []
        for header in test_headers:
            species = extract_species_from_fasta_header(header)
            parsed_species.append(species)
            print(f"   ✅ '{header[:30]}...' -> '{species}'")

        # Check if we got reasonable species names
        valid_parses = sum(1 for s in parsed_species if s != "Unknown_species")

        if valid_parses >= len(test_headers) // 2:  # At least half should parse correctly
            print(f"✅ FASTA header parsing: {valid_parses}/{len(test_headers)} parsed correctly")
            cell6_header_parsing = True
        else:
            print(f"❌ FASTA header parsing: only {valid_parses}/{len(test_headers)} parsed correctly")
            cell6_header_parsing = False

    except Exception as e:
        print(f"❌ FASTA header parsing test failed: {e}")
        cell6_header_parsing = False
else:
    print("❌ extract_species_from_fasta_header function not available")
    cell6_header_parsing = False

print("-" * 40)

# Test 5: Test data loading functions (without actual file access)
print("TEST 5: Data Loading Function Structure")
if 'load_sequences_from_additional_fasta' in available_functions and 'test_config' in globals():
    try:
        # Test the function with our config (will likely return empty since files don't exist)
        fasta_sequences = load_sequences_from_additional_fasta(test_config)

        print(f"✅ Additional FASTA loading completed")
        print(f"   - Sequences loaded: {len(fasta_sequences)}")
        print(f"   - Function executed without errors")

        cell6_fasta_loading = True

    except Exception as e:
        print(f"❌ Additional FASTA loading test failed: {e}")
        cell6_fasta_loading = False
else:
    print("❌ load_sequences_from_additional_fasta function or test_config not available")
    cell6_fasta_loading = False

print("-" * 40)

# Test 6: Test checkpoint loading functions (if available)
print("TEST 6: Data Integration Pipeline Structure")
if cell6_initialization and 'test_protein_proc' in globals():
    try:
        # Check if protein processor has required methods
        required_methods = ['load_data', 'analyze_sequences', 'run_digestion_pipeline']

        available_methods = []
        for method_name in required_methods:
            if hasattr(test_protein_proc, method_name):
                available_methods.append(method_name)
                print(f"   ✅ {method_name}")
            else:
                print(f"   ❌ {method_name}")

        if len(available_methods) >= 2:  # At least 2 core methods should be available
            print(f"✅ Data integration pipeline: {len(available_methods)}/{len(required_methods)} methods available")
            cell6_integration = True
        else:
            print(f"❌ Data integration pipeline: only {len(available_methods)}/{len(required_methods)} methods available")
            cell6_integration = False

    except Exception as e:
        print(f"❌ Data integration pipeline test failed: {e}")
        cell6_integration = False
else:
    print("❌ Data integration pipeline test skipped (processor not available)")
    cell6_integration = False

print("=" * 60)

# Summary for Cell 6
print("📊 CELL 6 TESTING SUMMARY:")
print(f"✅ Functions Available: {'PASS' if cell6_functions else 'FAIL'}")
print(f"✅ System Initialization: {'PASS' if cell6_initialization else 'FAIL'}")
print(f"✅ Target Genes Loading: {'PASS' if cell6_target_genes else 'FAIL'}")
print(f"✅ Header Parsing: {'PASS' if cell6_header_parsing else 'FAIL'}")
print(f"✅ FASTA Loading: {'PASS' if cell6_fasta_loading else 'FAIL'}")
print(f"✅ Integration Pipeline: {'PASS' if cell6_integration else 'FAIL'}")

cell6_overall = (cell6_functions and cell6_initialization and cell6_target_genes and
                cell6_header_parsing and cell6_fasta_loading)
print(f"\n🎯 CELL 6 STATUS: {'READY' if cell6_overall else 'NEEDS FIXES'}")

if cell6_overall:
    print("\n✅ Data Loading Infrastructure is ready!")
    print("   - All data loading functions available")
    print("   - System initialization working")
    print("   - Target gene resolution functional")
    print("   - FASTA parsing capabilities verified")
    print("   - Ready for actual data loading")
else:
    print("\n🔧 Fix Cell 6 issues before proceeding")

print("=" * 60)

🧪 Testing Cell 6: Data Loading Infrastructure...
TEST 1: Data Loading Functions Availability
   ❌ initialize_data_loading_system
   ❌ load_target_genes_from_tissues
   ❌ load_sequences_from_ensembl_trees
   ❌ load_sequences_from_additional_fasta
   ❌ extract_species_from_fasta_header
----------------------------------------
TEST 2: Data Loading System Initialization
❌ initialize_data_loading_system function or test_config not available
----------------------------------------
TEST 3: Target Genes Loading
❌ load_target_genes_from_tissues function or test_config not available
----------------------------------------
TEST 4: FASTA Header Parsing
❌ extract_species_from_fasta_header function not available
----------------------------------------
TEST 5: Data Loading Function Structure
❌ load_sequences_from_additional_fasta function or test_config not available
----------------------------------------
TEST 6: Data Integration Pipeline Structure
❌ Data integration pipeline test skipped (proce

## Cell 7: Pipeline Construction


In [18]:
# ===== Cell 7: Complete Data Loading Pipeline =====
# Multi-source data integration: Ensembl, FASTA, NCBI
# Restores ALL June 28th data loading functionality

import pickle
import json
from Bio import SeqIO, Entrez
from pathlib import Path
import pandas as pd
from typing import Dict, List, Any, Optional
from tqdm.notebook import tqdm
import time

print("📊 Defining Complete Data Loading Pipeline...")

def initialize_data_loading_system(config):
    """
    Initialize the complete data loading system.
    Sets up all required components and validates paths.
    """
    print("🔧 Initializing data loading system...")

    # Set up Entrez for NCBI access
    Entrez.email = config.entrez_email
    print(f"✅ Entrez email set: {config.entrez_email}")

    # Initialize SmartGeneTreeGenerator
    gene_tree_generator = SmartGeneTreeGenerator(config)
    print("✅ SmartGeneTreeGenerator initialized")

    # Initialize appropriate protein processor
    if config.target_gene_family.upper().startswith('COL'):
        protein_processor = Collagen(config)
        print("✅ Collagen processor initialized")
    elif config.target_gene_family.upper().startswith('KRT') or config.target_gene_family.upper().startswith('KRTAP'):
        protein_processor = Keratin(config)
        print("✅ Keratin processor initialized")
    else:
        protein_processor = ProteinClass(config)
        print("✅ Generic protein processor initialized")

    return gene_tree_generator, protein_processor

def load_target_genes_from_tissues(config) -> List[str]:
    """
    Load target genes based on tissue selection.
    """
    print(f"🎯 Loading target genes for tissue: {config.reference_tissue}")

    try:
        with open(config.tissues_json_path, 'r') as f:
            tissues_data = json.load(f)

        if config.reference_tissue in tissues_data:
            target_genes = tissues_data[config.reference_tissue]
            print(f"✅ Found {len(target_genes)} target genes: {target_genes}")
            return target_genes
        else:
            print(f"⚠️ Tissue '{config.reference_tissue}' not found in tissues file")
            # Fallback to common collagen genes
            fallback_genes = ["COL1A1", "COL1A2", "COL2A1", "COL3A1"]
            print(f"📋 Using fallback genes: {fallback_genes}")
            return fallback_genes

    except FileNotFoundError:
        print(f"❌ Tissues file not found: {config.tissues_json_path}")
        fallback_genes = ["COL1A1", "COL1A2"]
        print(f"📋 Using minimal fallback genes: {fallback_genes}")
        return fallback_genes
    except json.JSONDecodeError as e:
        print(f"❌ Error parsing tissues JSON: {e}")
        return ["COL1A1"]

def load_sequences_from_ensembl_trees(config, gene_tree_generator, target_genes) -> Dict[str, List[Dict]]:
    """
    Load sequences from Ensembl gene trees for target genes.
    """
    print("🌳 Loading sequences from Ensembl gene trees...")

    # Check for checkpoint
    checkpoint_path = config.checkpoint_base_path / "ensembl_tree_sequences.pkl"

    if (config.enable_checkpointing and
        checkpoint_path.exists() and
        not config.force_recompute_checkpoints):

        print("📂 Loading Ensembl sequences from checkpoint...")
        try:
            with open(checkpoint_path, 'rb') as f:
                ensembl_sequences = pickle.load(f)
            print(f"✅ Loaded checkpoint: {sum(len(seqs) for seqs in ensembl_sequences.values())} sequences")
            return ensembl_sequences
        except Exception as e:
            print(f"❌ Failed to load checkpoint: {e}")

    # Fetch sequences from Ensembl
    print("🔍 Fetching sequences from Ensembl API...")
    ensembl_sequences = gene_tree_generator.fetch_sequences_for_gene_list(target_genes)

    # Save checkpoint
    if config.enable_checkpointing and ensembl_sequences:
        try:
            with open(checkpoint_path, 'wb') as f:
                pickle.dump(ensembl_sequences, f)
            print(f"💾 Saved Ensembl sequences checkpoint")
        except Exception as e:
            print(f"⚠️ Failed to save checkpoint: {e}")

    total_sequences = sum(len(seqs) for seqs in ensembl_sequences.values())
    print(f"✅ Ensembl loading complete: {total_sequences} sequences from {len(ensembl_sequences)} genes")

    return ensembl_sequences

def load_sequences_from_additional_fasta(config) -> List[Dict[str, Any]]:
    """
    Load sequences from additional FASTA files.
    """
    print("📄 Loading sequences from additional FASTA files...")

    fasta_sequences = []

    # Try the configured additional FASTA path
    if config.additional_collagen_fasta_path and config.additional_collagen_fasta_path.exists():
        print(f"📂 Loading from: {config.additional_collagen_fasta_path}")

        try:
            with open(config.additional_collagen_fasta_path, 'r') as f:
                for record in SeqIO.parse(f, 'fasta'):
                    sequence_data = {
                        'protein_id': record.id,
                        'description': record.description,
                        'sequence': str(record.seq),
                        'length': len(record.seq),
                        'source': 'additional_fasta',
                        'gene_family': config.target_gene_family,
                        'species': extract_species_from_fasta_header(record.description)
                    }
                    fasta_sequences.append(sequence_data)

            print(f"✅ Loaded {len(fasta_sequences)} sequences from additional FASTA")

        except Exception as e:
            print(f"❌ Error loading additional FASTA: {e}")
    else:
        print("⚠️ No additional FASTA file specified or file not found")

    return fasta_sequences

def extract_species_from_fasta_header(header: str) -> str:
    """Extract species name from FASTA header."""
    # Try different patterns for species extraction
    patterns = [
        r'\[([A-Za-z]+\s+[A-Za-z]+)\]',  # [Homo sapiens]
        r'OS=([A-Za-z]+\s+[A-Za-z]+)',   # OS=Homo sapiens
        r'_([A-Z]{3,5})\s',              # _HUMAN
    ]

    for pattern in patterns:
        match = re.search(pattern, header)
        if match:
            species = match.group(1)
            # Convert organism codes to species names
            if species.isupper():
                organism_codes = {
                    'HUMAN': 'Homo sapiens',
                    'MOUSE': 'Mus musculus',
                    'RAT': 'Rattus norvegicus',
                    'CHICK': 'Gallus gallus',
                    'ZEBRA': 'Danio rerio',
                    'BOVIN': 'Bos taurus'
                }
                return organism_codes.get(species, f"Unknown_{species}")
            else:
                return species.replace('_', ' ')

    return "Unknown_species"

def load_sequences_from_ncbi_nr(config, target_genes) -> List[Dict[str, Any]]:
    """
    Load additional sequences from NCBI NR database.
    """
    if not config.include_ncbi_nr_novel_species:
        print("⏭️ NCBI NR fetching disabled")
        return []

    print("🔬 Loading sequences from NCBI NR...")

    # Check for checkpoint
    checkpoint_path = config.checkpoint_base_path / "ncbi_nr_sequences.pkl"

    if (config.enable_checkpointing and
        checkpoint_path.exists() and
        not config.force_recompute_checkpoints):

        print("📂 Loading NCBI sequences from checkpoint...")
        try:
            with open(checkpoint_path, 'rb') as f:
                ncbi_sequences = pickle.load(f)
            print(f"✅ Loaded checkpoint: {len(ncbi_sequences)} sequences")
            return ncbi_sequences
        except Exception

SyntaxError: expected ':' (ipython-input-18-2923471440.py, line 201)

In [None]:
# ===== Test Cell 7: Digestion Pipeline =====
# Test the complete digestion pipeline and related functions

print("🧪 Testing Cell 7: Digestion Pipeline...")
print("=" * 60)

# Test 1: Check if digestion pipeline functions exist
print("TEST 1: Digestion Pipeline Functions Availability")
expected_functions = [
    'run_complete_digestion_pipeline',
    'generate_peptide_library',
    'analyze_peptide_conservation',
    'generate_final_reports'
]

# Note: Some functions might be methods of classes instead of standalone functions
available_pipeline_functions = []
missing_pipeline_functions = []

for func_name in expected_functions:
    try:
        func = globals()[func_name]
        available_pipeline_functions.append(func_name)
        print(f"   ✅ {func_name}")
    except KeyError:
        missing_pipeline_functions.append(func_name)
        print(f"   ❌ {func_name} (may be implemented as class method)")

# Don't fail if some are missing - they might be class methods
cell7_functions = len(available_pipeline_functions) >= 1

print("-" * 40)

# Test 2: Test enzyme digestion capabilities
print("TEST 2: Enzyme Digestion Capabilities")
if 'test_config' in globals():
    try:
        # Test enzyme rules access
        available_enzymes = list(test_config.enzyme_rules.keys())
        print(f"✅ Available enzymes: {len(available_enzymes)}")
        print(f"   - Enzymes: {available_enzymes[:5]}{'...' if len(available_enzymes) > 5 else ''}")

        # Test enzyme property access
        test_enzyme = "trypsin"
        if test_enzyme in test_config.enzyme_rules:
            enzyme_props = test_config.get_enzyme_properties(test_enzyme)
            if enzyme_props:
                print(f"✅ Enzyme properties accessible for {test_enzyme}")
                print(f"   - Cleavage rule: {enzyme_props.get('cleavage_rule', 'N/A')}")
                print(f"   - Description: {enzyme_props.get('description', 'N/A')}")
            else:
                print(f"❌ Could not get enzyme properties for {test_enzyme}")

        cell7_enzymes = True

    except Exception as e:
        print(f"❌ Enzyme digestion test failed: {e}")
        cell7_enzymes = False
else:
    print("❌ test_config not available")
    cell7_enzymes = False

print("-" * 40)

# Test 3: Test digestion with protein processor
print("TEST 3: Protein Processor Digestion")
if 'test_protein_proc' in globals():
    try:
        # Test if the protein processor has digestion capabilities
        if hasattr(test_protein_proc, 'run_digestion_pipeline'):
            print("✅ run_digestion_pipeline method available")
        else:
            print("❌ run_digestion_pipeline method not found")

        # Test basic sequence digestion
        if hasattr(test_protein_proc, 'digest_sequence_with_enzyme'):
            test_sequence = "MKRGKRGKRGPGPGPGPGPGPG"
            peptides = test_protein_proc.digest_sequence_with_enzyme(test_sequence, "trypsin")
            print(f"✅ Sequence digestion working: {len(peptides)} peptides generated")
            print(f"   - Example peptides: {peptides[:3] if len(peptides) >= 3 else peptides}")
        else:
            print("❌ digest_sequence_with_enzyme method not found")

        cell7_processor_digestion = True

    except Exception as e:
        print(f"❌ Protein processor digestion test failed: {e}")
        cell7_processor_digestion = False
else:
    print("❌ test_protein_proc not available from previous tests")
    cell7_processor_digestion = False

print("-" * 40)

# Test 4: Test checkpoint system for digestion
print("TEST 4: Digestion Checkpoint System")
if 'test_config' in globals():
    try:
        # Test checkpoint configuration
        print(f"✅ Checkpointing enabled: {test_config.enable_checkpointing}")
        print(f"✅ Force recompute: {test_config.force_recompute_checkpoints}")
        print(f"✅ Checkpoint path: {test_config.checkpoint_base_path}")

        # Test checkpoint methods
        if hasattr(test_config, 'save_checkpoint') and hasattr(test_config, 'load_checkpoint'):
            print("✅ Checkpoint save/load methods available")

            # Test checkpoint functionality with dummy data
            test_data = {"test": "data", "peptides": ["PEPTIDE1", "PEPTIDE2"]}
            checkpoint_name = "test_digestion_checkpoint"

            # Try to save and load
            saved = test_config.save_checkpoint(test_data, checkpoint_name)
            if saved:
                loaded_data = test_config.load_checkpoint(checkpoint_name)
                if loaded_data and loaded_data.get("test") == "data":
                    print("✅ Checkpoint save/load functional")
                else:
                    print("❌ Checkpoint load failed or data corrupted")
            else:
                print("⚠️ Checkpoint save disabled or failed")
        else:
            print("❌ Checkpoint methods not available")

        cell7_checkpoints = True

    except Exception as e:
        print(f"❌ Checkpoint system test failed: {e}")
        cell7_checkpoints = False
else:
    print("❌ test_config not available")
    cell7_checkpoints = False

print("-" * 40)

# Test 5: Test parallel processing configuration
print("TEST 5: Parallel Processing Configuration")
if 'test_config' in globals():
    try:
        # Test parallel processing settings
        print(f"✅ Parallel digestion enabled: {test_config.enable_parallel_digestion}")
        print(f"✅ Digestion workers: {test_config.num_digestion_workers}")
        print(f"✅ Parallel MRCA enabled: {test_config.enable_parallel_mrca}")
        print(f"✅ MRCA workers: {test_config.num_mrca_workers}")

        # Check if concurrent.futures is available for parallel processing
        try:
            import concurrent.futures
            print("✅ concurrent.futures available for parallel processing")
        except ImportError:
            print("❌ concurrent.futures not available")

        cell7_parallel = True

    except Exception as e:
        print(f"❌ Parallel processing configuration test failed: {e}")
        cell7_parallel = False
else:
    print("❌ test_config not available")
    cell7_parallel = False

print("-" * 40)

# Test 6: Test peptide filtering parameters
print("TEST 6: Peptide Filtering Parameters")
if 'test_config' in globals():
    try:
        # Test peptide length filtering
        print(f"✅ Min peptide length: {test_config.min_len}")
        print(f"✅ Max peptide length: {test_config.max_len}")
        print(f"✅ Missed cleavages allowed: {test_config.missed_cleavages}")
        print(f"✅ Convert I/L to B: {test_config.convert_il_to_b}")

        # Test display parameters
        print(f"✅ Min conserved peptide length (display): {test_config.min_conserved_peptide_length_display}")

        # Test anchor peptide parameters
        if hasattr(test_config, 'min_anchor_peptide_length_num'):
            print(f"✅ Min anchor peptide length: {test_config.min_anchor_peptide_length_num}")
            print(f"✅ Max anchor peptide length: {test_config.max_anchor_peptide_length_num}")
            print(f"✅ Min Gly proportion at 3rd pos: {test_config.min_gly_at_third_pos_proportion}")

        # Validate parameter ranges
        valid_params = (
            6 <= test_config.min_len <= test_config.max_len <= 50 and
            0 <= test_config.missed_cleavages <= 5 and
            0 <= test_config.min_conserved_peptide_length_display <= 20
        )

        if valid_params:
            print("✅ All peptide filtering parameters in valid ranges")
        else:
            print("⚠️ Some peptide filtering parameters may be outside expected ranges")

        cell7_filtering = True

    except Exception as e:
        print(f"❌ Peptide filtering parameters test failed: {e}")
        cell7_filtering = False
else:
    print("❌ test_config not available")
    cell7_filtering = False

print("-" * 40)

# Test 7: Test complete pipeline integration readiness
print("TEST 7: Pipeline Integration Readiness")
integration_ready = True
integration_issues = []

# Check if all prerequisite components are available
prerequisites = [
    ('test_config', 'Configuration'),
    ('test_protein_proc', 'Protein Processor'),
    ('test_gene_tree_gen', 'Gene Tree Generator'),
    ('test_target_genes', 'Target Genes')
]

for var_name, component_name in prerequisites:
    if var_name in globals():
        print(f"   ✅ {component_name} available")
    else:
        print(f"   ❌ {component_name} missing")
        integration_ready = False
        integration_issues.append(component_name)

# Check if protein processor has loaded data capability
if 'test_protein_proc' in globals():
    if hasattr(test_protein_proc, 'master_df'):
        print("   ✅ Master DataFrame initialized")
    else:
        print("   ❌ Master DataFrame not found")
        integration_ready = False
        integration_issues.append("Master DataFrame")

if integration_ready:
    print("✅ All components ready for full pipeline execution")
    cell7_integration = True
else:
    print(f"❌ Integration issues: {', '.join(integration_issues)}")
    cell7_integration = False

print("=" * 60)

# Summary for Cell 7
print("📊 CELL 7 TESTING SUMMARY:")
print(f"✅ Pipeline Functions: {'PASS' if cell7_functions else 'FAIL'}")
print(f"✅ Enzyme Capabilities: {'PASS' if cell7_enzymes else 'FAIL'}")
print(f"✅ Processor Digestion: {'PASS' if cell7_processor_digestion else 'FAIL'}")
print(f"✅ Checkpoint System: {'PASS' if cell7_checkpoints else 'FAIL'}")
print(f"✅ Parallel Processing: {'PASS' if cell7_parallel else 'FAIL'}")
print(f"✅ Filtering Parameters: {'PASS' if cell7_filtering else 'FAIL'}")
print(f"✅ Integration Ready: {'PASS' if cell7_integration else 'FAIL'}")

cell7_overall = (cell7_functions and cell7_enzymes and cell7_processor_digestion and
                cell7_checkpoints and cell7_parallel and cell7_filtering and cell7_integration)

print(f"\n🎯 CELL 7 STATUS: {'READY' if cell7_overall else 'NEEDS FIXES'}")

if cell7_overall:
    print("\n✅ Digestion Pipeline is ready!")
    print("   - All enzyme digestion capabilities functional")
    print("   - Checkpoint system operational")
    print("   - Parallel processing configured")
    print("   - Peptide filtering parameters validated")
    print("   - Integration with previous components verified")
    print("   - Ready for full pipeline execution")
else:
    print("\n🔧 Fix Cell 7 issues before running full pipeline")

print("=" * 60)

In [None]:
# ===== Test Cell 7: Digestion Pipeline =====
# Test the complete digestion pipeline and related functions

print("🧪 Testing Cell 7: Digestion Pipeline...")
print("=" * 60)

# Test 1: Check if digestion pipeline functions exist
print("TEST 1: Digestion Pipeline Functions Availability")
expected_functions = [
    'run_complete_digestion_pipeline',
    'generate_peptide_library',
    'analyze_peptide_conservation',
    'generate_final_reports'
]

# Note: Some functions might be methods of classes instead of standalone functions
available_pipeline_functions = []
missing_pipeline_functions = []

for func_name in expected_functions:
    try:
        func = globals()[func_name]
        available_pipeline_functions.append(func_name)
        print(f"   ✅ {func_name}")
    except KeyError:
        missing_pipeline_functions.append(func_name)
        print(f"   ❌ {func_name} (may be implemented as class method)")

# Don't fail if some are missing - they might be class methods
cell7_functions = len(available_pipeline_functions) >= 1

print("-" * 40)

# Test 2: Test enzyme digestion capabilities
print("TEST 2: Enzyme Digestion Capabilities")
if 'test_config' in globals():
    try:
        # Test enzyme rules access
        available_enzymes = list(test_config.enzyme_rules.keys())
        print(f"✅ Available enzymes: {len(available_enzymes)}")
        print(f"   - Enzymes: {available_enzymes[:5]}{'...' if len(available_enzymes) > 5 else ''}")

        # Test enzyme property access
        test_enzyme = "trypsin"
        if test_enzyme in test_config.enzyme_rules:
            enzyme_props = test_config.get_enzyme_properties(test_enzyme)
            if enzyme_props:
                print(f"✅ Enzyme properties accessible for {test_enzyme}")
                print(f"   - Cleavage rule: {enzyme_props.get('cleavage_rule', 'N/A')}")
                print(f"   - Description: {enzyme_props.get('description', 'N/A')}")
            else:
                print(f"❌ Could not get enzyme properties for {test_enzyme}")

        cell7_enzymes = True

    except Exception as e:
        print(f"❌ Enzyme digestion test failed: {e}")
        cell7_enzymes = False
else:
    print("❌ test_config not available")
    cell7_enzymes = False

print("-" * 40)

# Test 3: Test digestion with protein processor
print("TEST 3: Protein Processor Digestion")
if 'test_protein_proc' in globals():
    try:
        # Test if the protein processor has digestion capabilities
        if hasattr(test_protein_proc, 'run_digestion_pipeline'):
            print("✅ run_digestion_pipeline method available")
        else:
            print("❌ run_digestion_pipeline method not found")

        # Test basic sequence digestion
        if hasattr(test_protein_proc, 'digest_sequence_with_enzyme'):
            test_sequence = "MKRGKRGKRGPGPGPGPGPGPG"
            peptides = test_protein_proc.digest_sequence_with_enzyme(test_sequence, "trypsin")
            print(f"✅ Sequence digestion working: {len(peptides)} peptides generated")
            print(f"   - Example peptides: {peptides[:3] if len(peptides) >= 3 else peptides}")
        else:
            print("❌ digest_sequence_with_enzyme method not found")

        cell7_processor_digestion = True

    except Exception as e:
        print(f"❌ Protein processor digestion test failed: {e}")
        cell7_processor_digestion = False
else:
    print("❌ test_protein_proc not available from previous tests")
    cell7_processor_digestion = False

print("-" * 40)

# Test 4: Test checkpoint system for digestion
print("TEST 4: Digestion Checkpoint System")
if 'test_config' in globals():
    try:
        # Test checkpoint configuration
        print(f"✅ Checkpointing enabled: {test_config.enable_checkpointing}")
        print(f"✅ Force recompute: {test_config.force_recompute_checkpoints}")
        print(f"✅ Checkpoint path: {test_config.checkpoint_base_path}")

        # Test checkpoint methods
        if hasattr(test_config, 'save_checkpoint') and hasattr(test_config, 'load_checkpoint'):
            print("✅ Checkpoint save/load methods available")

            # Test checkpoint functionality with dummy data
            test_data = {"test": "data", "peptides": ["PEPTIDE1", "PEPTIDE2"]}
            checkpoint_name = "test_digestion_checkpoint"

            # Try to save and load
            saved = test_config.save_checkpoint(test_data, checkpoint_name)
            if saved:
                loaded_data = test_config.load_checkpoint(checkpoint_name)
                if loaded_data and loaded_data.get("test") == "data":
                    print("✅ Checkpoint save/load functional")
                else:
                    print("❌ Checkpoint load failed or data corrupted")
            else:
                print("⚠️ Checkpoint save disabled or failed")
        else:
            print("❌ Checkpoint methods not available")

        cell7_checkpoints = True

    except Exception as e:
        print(f"❌ Checkpoint system test failed: {e}")
        cell7_checkpoints = False
else:
    print("❌ test_config not available")
    cell7_checkpoints = False

print("-" * 40)

# Test 5: Test parallel processing configuration
print("TEST 5: Parallel Processing Configuration")
if 'test_config' in globals():
    try:
        # Test parallel processing settings
        print(f"✅ Parallel digestion enabled: {test_config.enable_parallel_digestion}")
        print(f"✅ Digestion workers: {test_config.num_digestion_workers}")
        print(f"✅ Parallel MRCA enabled: {test_config.enable_parallel_mrca}")
        print(f"✅ MRCA workers: {test_config.num_mrca_workers}")

        # Check if concurrent.futures is available for parallel processing
        try:
            import concurrent.futures
            print("✅ concurrent.futures available for parallel processing")
        except ImportError:
            print("❌ concurrent.futures not available")

        cell7_parallel = True

    except Exception as e:
        print(f"❌ Parallel processing configuration test failed: {e}")
        cell7_parallel = False
else:
    print("❌ test_config not available")
    cell7_parallel = False

print("-" * 40)

# Test 6: Test peptide filtering parameters
print("TEST 6: Peptide Filtering Parameters")
if 'test_config' in globals():
    try:
        # Test peptide length filtering
        print(f"✅ Min peptide length: {test_config.min_len}")
        print(f"✅ Max peptide length: {test_config.max_len}")
        print(f"✅ Missed cleavages allowed: {test_config.missed_cleavages}")
        print(f"✅ Convert I/L to B: {test_config.convert_il_to_b}")

        # Test display parameters
        print(f"✅ Min conserved peptide length (display): {test_config.min_conserved_peptide_length_display}")

        # Test anchor peptide parameters
        if hasattr(test_config, 'min_anchor_peptide_length_num'):
            print(f"✅ Min anchor peptide length: {test_config.min_anchor_peptide_length_num}")
            print(f"✅ Max anchor peptide length: {test_config.max_anchor_peptide_length_num}")
            print(f"✅ Min Gly proportion at 3rd pos: {test_config.min_gly_at_third_pos_proportion}")

        # Validate parameter ranges
        valid_params = (
            6 <= test_config.min_len <= test_config.max_len <= 50 and
            0 <= test_config.missed_cleavages <= 5 and
            0 <= test_config.min_conserved_peptide_length_display <= 20
        )

        if valid_params:
            print("✅ All peptide filtering parameters in valid ranges")
        else:
            print("⚠️ Some peptide filtering parameters may be outside expected ranges")

        cell7_filtering = True

    except Exception as e:
        print(f"❌ Peptide filtering parameters test failed: {e}")
        cell7_filtering = False
else:
    print("❌ test_config not available")
    cell7_filtering = False

print("-" * 40)

# Test 7: Test complete pipeline integration readiness
print("TEST 7: Pipeline Integration Readiness")
integration_ready = True
integration_issues = []

# Check if all prerequisite components are available
prerequisites = [
    ('test_config', 'Configuration'),
    ('test_protein_proc', 'Protein Processor'),
    ('test_gene_tree_gen', 'Gene Tree Generator'),
    ('test_target_genes', 'Target Genes')
]

for var_name, component_name in prerequisites:
    if var_name in globals():
        print(f"   ✅ {component_name} available")
    else:
        print(f"   ❌ {component_name} missing")
        integration_ready = False
        integration_issues.append(component_name)

# Check if protein processor has loaded data capability
if 'test_protein_proc' in globals():
    if hasattr(test_protein_proc, 'master_df'):
        print("   ✅ Master DataFrame initialized")
    else:
        print("   ❌ Master DataFrame not found")
        integration_ready = False
        integration_issues.append("Master DataFrame")

if integration_ready:
    print("✅ All components ready for full pipeline execution")
    cell7_integration = True
else:
    print(f"❌ Integration issues: {', '.join(integration_issues)}")
    cell7_integration = False

print("=" * 60)

# Summary for Cell 7
print("📊 CELL 7 TESTING SUMMARY:")
print(f"✅ Pipeline Functions: {'PASS' if cell7_functions else 'FAIL'}")
print(f"✅ Enzyme Capabilities: {'PASS' if cell7_enzymes else 'FAIL'}")
print(f"✅ Processor Digestion: {'PASS' if cell7_processor_digestion else 'FAIL'}")
print(f"✅ Checkpoint System: {'PASS' if cell7_checkpoints else 'FAIL'}")
print(f"✅ Parallel Processing: {'PASS' if cell7_parallel else 'FAIL'}")
print(f"✅ Filtering Parameters: {'PASS' if cell7_filtering else 'FAIL'}")
print(f"✅ Integration Ready: {'PASS' if cell7_integration else 'FAIL'}")

cell7_overall = (cell7_functions and cell7_enzymes and cell7_processor_digestion and
                cell7_checkpoints and cell7_parallel and cell7_filtering and cell7_integration)

print(f"\n🎯 CELL 7 STATUS: {'READY' if cell7_overall else 'NEEDS FIXES'}")

if cell7_overall:
    print("\n✅ Digestion Pipeline is ready!")
    print("   - All enzyme digestion capabilities functional")
    print("   - Checkpoint system operational")
    print("   - Parallel processing configured")
    print("   - Peptide filtering parameters validated")
    print("   - Integration with previous components verified")
    print("   - Ready for full pipeline execution")
else:
    print("\n🔧 Fix Cell 7 issues before running full pipeline")

print("=" * 60)

# Cell 8 Pipeline Exection
## Running the PhASTM Workflow

This final cell initializes all the classes and runs the complete, restored pipeline from data loading through to advanced analysis and digestion.

In [None]:

def run_full_pipeline():
    print("🚀 Initializing PhASTM System...")
    try:
        # Instead of using globals(), explicitly pass the required parameters
        config_params = {
            'workspace_root': workspace_root,
            'additional_fasta': additional_fasta,  # Add the missing additional_fasta parameter
            'target_gene_family': target_gene_family,
            'reference_tissue': reference_tissue,
            'enzyme': enzyme,
            'entrez_email': entrez_email,
            'enable_checkpointing': enable_checkpointing,
            'force_recompute_checkpoints': force_recompute_checkpoints,
            'force_update_architecture_cache': force_update_architecture_cache,
            'include_ncbi_nr_novel_species': include_ncbi_nr_novel_species,
            'max_ncbi_nr_sequences': max_ncbi_nr_sequences,
            'missed_cleavages': missed_cleavages,
            'min_len': min_len,
            'max_len': max_len,
            'convert_il_to_b': convert_il_to_b,
            'enable_parallel_digestion': enable_parallel_digestion,
            'num_digestion_workers': num_digestion_workers,
            'enable_parallel_mrca': enable_parallel_mrca,
            'num_mrca_workers': num_mrca_workers,
            'use_full_model_species_list': use_full_model_species_list,
            'model_species': model_species,
            'min_anchor_peptide_length_num': min_anchor_peptide_length_num,
            'max_anchor_peptide_length_num': max_anchor_peptide_length_num,
            'min_gly_at_third_pos_proportion': min_gly_at_third_pos_proportion,
            'top_n_anchor_candidates_report': top_n_anchor_candidates_report,
            'reference_species_for_anchoring': reference_species_for_anchoring,
            'min_conserved_peptide_length_display': min_conserved_peptide_length_display
        }

        config = PhASTMConfig(**config_params)

        if config.target_gene_family == 'Collagen':
            protein_class_instance = Collagen(config)
        elif config.target_gene_family == 'Keratin':
            protein_class_instance = Keratin(config)
        else:
            protein_class_instance = ProteinClass(config)

        processor = ProteinProcessor(config, protein_class_instance)

        # --- Run Workflow ---
        print("\n--- Starting Data Loading ---")
        processor.load_data()

        print("\n--- Starting Sequence Analysis ---")
        processor.analyze_sequences()

        print("\n--- Starting Digestion Pipeline ---")
        peptide_map = processor.run_digestion_pipeline()

        print("\n🎉 PhASTM Pipeline Completed Successfully! 🎉")
        if peptide_map is not None:
            print(f"Generated {len(peptide_map)} peptides.")
            display(peptide_map.head())

    except Exception as e:
        print(f"❌ A fatal error occurred in the pipeline: {e}")
        import traceback
        traceback.print_exc()

# --- Execute the pipeline ---
if __name__ == '__main__' and 'get_ipython' in locals():
    if entrez_email == "your.email@example.com":
        print("‼️ ACTION REQUIRED: Please update 'entrez_email' in Cell 2.")
    else:
        run_full_pipeline()