In [0]:
# # Get all tables from the schema
# tables = spark.catalog.listTables("eldenringcatalog.silver")

# for t in tables:
#     table_name = f"eldenringcatalog.silver.{t.name}"
#     print(f"Dropping table: {table_name}")
#     spark.sql(f"DROP TABLE IF EXISTS {table_name}")

%md
### **1 - IMPORTS AND CONFIGURATION**

In [0]:
from pyspark.sql.functions import (
    col, lit, current_timestamp, when, coalesce, explode, split,
    from_json, regexp_extract, regexp_replace, trim, upper, lower,
    udf, struct, array, size, concat_ws, substring,expr,
    sum as spark_sum, count as spark_count,
    monotonically_increasing_id, row_number)

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    DoubleType, BooleanType, ArrayType, MapType
)
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
import logging
import ast
import json
import re
from datetime import datetime
from typing import Dict, List, Optional,Any




In [0]:
# Logging setup: INFO level, "timestamp - level - message" layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the code in this notebook.
logger = logging.getLogger(__name__)

In [0]:
# Pipeline settings: catalog/schema names, checkpoint location and a per-run batch id.
CONFIG = dict(
    catalog="eldenringcatalog",
    bronze_schema="bronze",
    silver_schema="silver",
    checkpoint_path="/mnt/delta/checkpoints/silver/",
    # Captured once, when this cell runs (second resolution, datetime.now()).
    batch_id=datetime.now().strftime("%Y%m%d_%H%M%S"),
)

%md
### **2 - UNIVERSAL DICT PARSER (Handles Python dict strings and cleans numeric values)**

In [0]:
def safe_parse_dict(dict_str: str) -> Dict[str, Any]:
    """Parse a stringified Python/JSON dict (optionally wrapped in a list) into a dict.

    Parsing is attempted in this order:
      1. ``ast.literal_eval`` then ``json.loads`` on the whole string;
      2. the same two parsers on the first ``{...}`` element of a ``[...]`` wrapper;
      3. a regex that extracts ``key: value`` pairs (values are returned as strings).

    Args:
        dict_str: Text such as ``"{'Str': '10'}"`` or ``"[{'Phy': 5}, {...}]"``.
            A dict is returned unchanged; any other non-string value yields ``{}``.

    Returns:
        The parsed dict (the first dict when the input is a list/tuple of dicts),
        or ``{}`` when the input is empty, a placeholder ('null', 'None', '-')
        or cannot be parsed.
    """
    # Already-parsed or non-text input: never call string methods on it.
    if isinstance(dict_str, dict):
        return dict_str
    if not isinstance(dict_str, str):
        return {}

    cleaned = dict_str.strip()
    if cleaned in ('', 'null', 'None', '-'):
        return {}

    def _first_dict(parsed):
        """Return the dict carried by `parsed`; {} for a sequence whose head is not a dict; else None."""
        if isinstance(parsed, dict):
            return parsed
        if isinstance(parsed, (list, tuple)) and parsed:
            return parsed[0] if isinstance(parsed[0], dict) else {}
        return None

    try:
        # Try the full text first so "[{...}, {...}]" is read as a real list.
        candidates = [cleaned]
        if cleaned.startswith('[') and cleaned.endswith(']'):
            inner = cleaned[1:-1].strip()
            # Keep only the first dict of the list; tolerate whitespace around the comma.
            pieces = re.split(r'\}\s*,\s*\{', inner, maxsplit=1)
            if len(pieces) > 1:
                inner = pieces[0] + '}'
            candidates.append(inner)

        for candidate in candidates:
            for parser in (ast.literal_eval, json.loads):
                try:
                    parsed = parser(candidate)
                except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                    # Expected for text this parser does not understand; try the next one.
                    continue
                found = _first_dict(parsed)
                if found is not None:
                    return found

        # Last resort: pull out key/value pairs by regex from the unwrapped text.
        unwrapped = candidates[-1]
        if '{' in unwrapped and '}' in unwrapped:
            # '}' is excluded from the value so the closing brace is not captured.
            pattern = r"['\"]?(\w+(?:\s+\w+\.?)?)['\"]?\s*:\s*['\"]?([^,'\"}]+)['\"]?"
            matches = re.findall(pattern, unwrapped)
            if matches:
                return {k.strip(): v.strip() for k, v in matches}
        return {}
    except Exception as e:
        logger.warning(f"Failed to parse dict: {dict_str[:100]} - {e}")
        return {}
            
def clean_numeric_value(value: str) -> Optional[float]:
    """Convert a raw stat value to a finite float.

    Args:
        value: A string (surrounding whitespace allowed) or a number.

    Returns:
        The value as ``float``; ``None`` for ``None``, booleans, empty text,
        the placeholders '-', 'null', 'None', unparsable text, and for
        NaN / infinity (callers pass the result to ``int()``, which cannot
        convert non-finite floats).
    """
    if value is None or isinstance(value, bool):
        return None

    text = str(value).strip()
    if text in ('', '-', 'null', 'None'):
        return None

    try:
        number = float(text)
    except (ValueError, OverflowError):
        return None

    # `number != number` is only true for NaN.
    if number != number or number in (float('inf'), float('-inf')):
        return None
    return number
    

## **3-STAT PARSING UDFs (With correct key mappings)**

In [0]:
def parse_attack_stats(attack_str: str) -> Dict[str, int]:
    """Turn a stringified attack dict into a fixed set of integer attack stats.

    Each parsed key is lower-cased, stripped and cut to its first three
    characters before being looked up, so 'Phy', 'phy' and 'Physical' all land
    on 'physical'. Stats that are missing or not numeric stay at 0.
    """
    stats = dict.fromkeys(
        ('physical', 'magic', 'fire', 'lightning', 'holy', 'stamina', 'critical'), 0
    )

    raw_stats = safe_parse_dict(attack_str)
    if not raw_stats:
        return stats

    # Lookup table from (truncated) source key to output stat name.
    prefix_to_stat = {
        'phy': 'physical', 'physical': 'physical',
        'mag': 'magic', 'magic': 'magic',
        'fir': 'fire', 'fire': 'fire',
        'lit': 'lightning', 'lig': 'lightning', 'lightning': 'lightning',
        'hol': 'holy', 'holy': 'holy',
        'sta': 'stamina', 'stamina': 'stamina',
        'cri': 'critical', 'critical': 'critical'
    }

    for raw_key, raw_value in raw_stats.items():
        target = prefix_to_stat.get(raw_key.lower().strip()[:3])
        if target is None:
            continue
        number = clean_numeric_value(raw_value)
        if number is not None:
            stats[target] = int(number)

    return stats


def parse_defense_stats(defense_str: str) -> Dict[str, float]:
    """Parse a stringified defense/guard dict into float stats.

    Args:
        defense_str: Text form of a dict such as ``"{'Phy': '40', 'Bst': '30'}"``.

    Returns:
        Dict with keys 'physical', 'magic', 'fire', 'lightning', 'holy',
        'boost' and 'resistance'. Missing or non-numeric stats stay at 0.0.
    """
    default = {'physical': 0.0, 'magic': 0.0, 'fire': 0.0, 'lightning': 0.0, 'holy': 0.0, 'boost': 0.0,'resistance': 0.0}

    parsed = safe_parse_dict(defense_str)

    if not parsed:
        return default

    key_map = {
        'phy': 'physical',
        'mag': 'magic',
        'fir': 'fire',
        'lit': 'lightning', 'lig': 'lightning',
        'hol': 'holy',
        'bst': 'boost', 'boost': 'boost', 'gua': 'boost',
        'rst': 'resistance', 'resistance': 'resistance'
    }

    for key, value in parsed.items():
        normalized = str(key).lower().strip()
        # Look up the full key first: 'boost' and 'resistance' are longer than
        # three characters, so a prefix-only lookup ('boo' / 'res') never finds
        # them. The 3-character prefix still covers abbreviations like 'Phy'.
        stat_name = key_map.get(normalized) or key_map.get(normalized[:3])
        if stat_name is None:
            continue
        cleaned_val = clean_numeric_value(value)
        if cleaned_val is not None:
            default[stat_name] = float(cleaned_val)

    return default

def parse_dmg_negation_stats(negation_str: str) -> Dict[str, float]:
    """Parse an armor damage-negation dict string into eight float stats.

    Keys are lower-cased and stripped, then matched against an ordered list of
    rules; the first rule that matches decides which output stat receives the
    value. Entries whose value is not numeric are skipped.
    """
    result = dict.fromkeys(
        ('physical', 'vs_strike', 'vs_slash', 'vs_pierce',
         'magic', 'fire', 'lightning', 'holy'),
        0.0,
    )

    raw_stats = safe_parse_dict(negation_str)
    if not raw_stats:
        return result

    # Order matters: it mirrors the precedence of the matching rules.
    rules = (
        (lambda k: k.startswith('phy'), 'physical'),
        (lambda k: 'str' in k, 'vs_strike'),
        (lambda k: 'sla' in k, 'vs_slash'),
        (lambda k: 'pie' in k, 'vs_pierce'),
        (lambda k: k.startswith('mag'), 'magic'),
        (lambda k: k.startswith('fir'), 'fire'),
        (lambda k: k.startswith('lit') or k.startswith('lig'), 'lightning'),
        (lambda k: k.startswith('hol'), 'holy'),
    )

    for raw_key, raw_value in raw_stats.items():
        key_clean = raw_key.lower().strip()
        number = clean_numeric_value(raw_value)
        if number is None:
            continue

        for is_match, stat_name in rules:
            if is_match(key_clean):
                result[stat_name] = float(number)
                break

    return result

def parse_scaling_stats(scaling_str: str) -> Dict[str, str]:
    """Parse a scaling dict string into grades for str/dex/int/fai/arc.

    Attributes absent from the input, or carrying a falsy value, keep the
    placeholder grade '-'.
    """
    grades = {attr: '-' for attr in ('str', 'dex', 'int', 'fai', 'arc')}

    raw_grades = safe_parse_dict(scaling_str)
    if raw_grades:
        for raw_key, grade in raw_grades.items():
            # Only the first three characters of the key identify the attribute.
            attr = raw_key.lower().strip()[:3]
            if attr not in grades:
                continue
            grades[attr] = str(grade).strip() if grade else '-'

    return grades

def parse_requirements_stats(req_str: str) -> Dict[str, int]:
    """Parse an attribute-requirements dict string into integer minimums.

    Returns a dict keyed by 'str', 'dex', 'int', 'fai', 'arc'; attributes that
    are missing or non-numeric stay at 0.
    """
    requirements = dict.fromkeys(('str', 'dex', 'int', 'fai', 'arc'), 0)

    raw_requirements = safe_parse_dict(req_str)
    if raw_requirements:
        for raw_key, raw_value in raw_requirements.items():
            # Only the first three characters of the key identify the attribute.
            attr = raw_key.lower().strip()[:3]
            if attr not in requirements:
                continue
            number = clean_numeric_value(raw_value)
            if number is not None:
                requirements[attr] = int(number)

    return requirements


def parse_resistance_stats(resistance_str: str) -> Dict[str, float]:
    """Parse an armor resistance dict string into five float stats.

    Keys are lower-cased, stripped and cut to three characters before lookup.
    Stats that are missing or not numeric stay at 0.0.
    """
    stats = dict.fromkeys(
        ('immunity', 'robustness', 'focus', 'vitality', 'poise'), 0.0
    )

    raw_stats = safe_parse_dict(resistance_str)
    if not raw_stats:
        return stats

    # Lookup table from (truncated) source key to output stat name.
    prefix_to_stat = {
        'imm': 'immunity', 'immunity': 'immunity',
        'rob': 'robustness', 'robustness': 'robustness',
        'foc': 'focus', 'focus': 'focus',
        'vit': 'vitality', 'vitality': 'vitality',
        'poi': 'poise', 'poise': 'poise'
    }

    for raw_key, raw_value in raw_stats.items():
        target = prefix_to_stat.get(raw_key.lower().strip()[:3])
        if target is None:
            continue
        number = clean_numeric_value(raw_value)
        if number is not None:
            stats[target] = float(number)

    return stats
def parse_fp_cost(fp_str) -> Dict[str, Any]:
    """
    Parse an FP cost value into a uniform structure.

    Input formats and their meaning:
    1. 0 or "0" → No FP cost
    2. 6 or "6" → Single fixed cost (6 FP)
    3. "10 - 25" → Dual mode: 10 FP (light), 25 FP (heavy)
    4. "6 (-12)" → Combo: 6 FP initial, 12 FP follow-up
    5. "- (6 8)" → Multi-phase: 6 FP or 8 FP
    6. "2 (per swing)" → Continuous: 2 FP per hit/tick
    7. "- 6" → Badly formatted, treat as 6 FP

    Args:
        fp_str: Raw cost as a string or a number; ``None`` is allowed.

    Returns:
        {'min': int, 'max': int, 'per_use': int, 'type': str} where ``type`` is
        one of 'none', 'single', 'dual', 'combo', 'multiphase', 'continuous'.
        Unusable input (None, empty, '-', NaN, infinity, no digits) yields the
        all-zero 'none' result.
    """
    def _result(low, high, per_use, kind):
        """Assemble the output dict in its fixed key order."""
        return {'min': low, 'max': high, 'per_use': per_use, 'type': kind}

    default = _result(0, 0, 0, 'none')

    # Handle None/null
    if fp_str is None:
        return default

    # Handle numeric types (int/float)
    if isinstance(fp_str, (int, float)):
        try:
            cost = int(fp_str)
        except (ValueError, OverflowError):
            # int() rejects NaN (ValueError) and infinity (OverflowError).
            return default
        return _result(cost, cost, 0, 'single')

    # Convert to string for string operations
    try:
        cleaned = str(fp_str).strip().lower()
    except Exception:
        return default

    # Handle empty/dash values
    if not cleaned or cleaned in ['-', '', 'null', 'none', 'nan']:
        return default

    try:
        # PATTERN 1: "2 (per swing)" or "3 (per hit)"
        if 'per swing' in cleaned or 'per hit' in cleaned or 'per tick' in cleaned:
            match = re.search(r'(\d+)', cleaned)
            if match:
                per_use_cost = int(match.group(1))
                return _result(per_use_cost, per_use_cost, per_use_cost, 'continuous')

        # PATTERN 2: "10 - 25" (dual mode: light/heavy); an en dash is accepted too
        if ' - ' in cleaned or ' – ' in cleaned:
            parts = re.findall(r'\d+', cleaned)
            if len(parts) >= 2:
                return _result(int(parts[0]), int(parts[1]), 0, 'dual')

        # PATTERN 3: "6 (-12)" or "5(-8)" (combo: initial + follow-up)
        match = re.search(r'(\d+)\s*\(\s*-?\s*(\d+)\s*\)', cleaned)
        if match:
            return _result(int(match.group(1)), int(match.group(2)), 0, 'combo')

        # PATTERN 4: "- (6 8)" or "(6 8)" (multi-phase)
        match = re.search(r'\(\s*(\d+)\s+(\d+)\s*\)', cleaned)
        if match:
            phase1 = int(match.group(1))
            phase2 = int(match.group(2))
            return _result(min(phase1, phase2), max(phase1, phase2), 0, 'multiphase')

        # PATTERN 5: Single number (including badly formatted "- 6"):
        # take the first run of digits found anywhere in the text.
        numbers = [int(n) for n in re.findall(r'\d+', cleaned)]
        if numbers:
            return _result(numbers[0], numbers[0], 0, 'single')

        return default

    except Exception as e:
        logger.warning(f"Failed to parse FP cost: {fp_str} - {e}")
        return default

def parse_boss_hp(hp_str) -> Dict[str, Any]:
    """
    Parse a boss HP value into a uniform structure.

    Handles:
    1. "22,571" → single HP
    2. "7,560 (phase 1) 13,608 (phase 2)" → multi-phase
    3. "≈ 13,339 (GOD)" → approximate with classification
    4. "TBD" → unknown
    5. None/empty → 0

    Args:
        hp_str: Raw HP as a string or a number; ``None`` is allowed.

    Returns:
        {'hp_min': int, 'hp_max': int, 'hp_type': str, 'phases': int, 'classification': str}.
        Falsy input, 'TBD'/'null'/'None', NaN, infinity and text without digits
        yield the 'unknown' default (hp 0, 1 phase, classification 'Normal').
    """
    default = {
        'hp_min': 0,
        'hp_max': 0,
        'hp_type': 'unknown',
        'phases': 1,
        'classification': 'Normal'
    }

    def _to_int(number_text):
        """Convert a digit run with thousands separators ("22,571") to int."""
        return int(number_text.replace(',', ''))

    # A number must start with a digit, so a lone "," in prose is never captured.
    number_pattern = r'\d[\d,]*'

    # Handle None, null, TBD
    if not hp_str or hp_str in ['TBD', 'null', 'None', '']:
        return default

    # Handle numeric types
    if isinstance(hp_str, (int, float)):
        try:
            hp_val = int(hp_str)
        except (ValueError, OverflowError):
            # int() rejects NaN (ValueError) and infinity (OverflowError).
            return default
        return {
            'hp_min': hp_val,
            'hp_max': hp_val,
            'hp_type': 'single',
            'phases': 1,
            'classification': 'Normal'
        }

    try:
        hp_clean = str(hp_str).strip()

        # PATTERN 1: Approximate with classification "≈ 13,339 (GOD)"
        if '≈' in hp_clean:
            classification_match = re.search(r'\(([^)]+)\)', hp_clean)
            classification = classification_match.group(1) if classification_match else 'Approximate'

            numbers = re.findall(number_pattern, hp_clean.replace('≈', ''))
            if numbers:
                hp_value = _to_int(numbers[0])
                return {
                    'hp_min': hp_value,
                    'hp_max': hp_value,
                    'hp_type': 'approximate',
                    'phases': 1,
                    'classification': classification
                }

        # PATTERN 2: Phase-based "7,560 (phase 1) 13,608 (phase 2)"
        if 'phase' in hp_clean.lower():
            phase_matches = re.findall(r'(\d[\d,]*)\s*\(phase\s+\d+\)', hp_clean, re.IGNORECASE)
            if phase_matches:
                hp_values = [_to_int(m) for m in phase_matches]
                return {
                    'hp_min': min(hp_values),
                    'hp_max': max(hp_values),
                    'hp_type': 'phased',
                    'phases': len(hp_values),
                    'classification': 'Multi-Phase'
                }

        # PATTERN 3: Classification in parentheses
        classification_match = re.search(r'\(([^)]+)\)', hp_clean)
        classification = classification_match.group(1) if classification_match else 'Normal'

        # PATTERN 4: Simple numeric with commas "22,571"
        numbers = re.findall(number_pattern, hp_clean)
        if numbers:
            hp_value = _to_int(numbers[0])
            return {
                'hp_min': hp_value,
                'hp_max': hp_value,
                'hp_type': 'single',
                'phases': 1,
                'classification': classification
            }

        return default

    except Exception as e:
        logger.warning(f"Failed to parse HP: {hp_str} - {e}")
        return default
    
def parse_passive_effects(effects_str: str) -> Dict[str, Any]:
    """
    Parse a passive-effects dict string into a structured dict.

    Only the value stored under the 'Any' key is inspected. Recognised shapes:
    - "{'Any': '- '}"                          → No effect
    - "{'Any': '(66) Frostbite'}"             → Single buildup: 66
    - "{'Any': '(88)(38) Poison'}"            → Dual buildup: 88 primary, 38 secondary
    - "{'Any': '(88) (38) Poison'}"           → Same, whitespace between the groups
    - "{'Any': '(66) Frostbite (45) Poison'}" → Only the first effect is reported

    Args:
        effects_str: Text form of the dict (single or double quoted); a dict is
            also accepted.

    Returns:
        {
            'effect_type': str or None,   # lower-cased, normalised effect name
            'buildup_primary': int,
            'buildup_secondary': int,     # 0 when there is no second value
            'has_dual_buildup': bool,
            'has_effect': bool
        }
        The all-empty default is returned for missing/unparsable input.
    """
    default_result = {
        'effect_type': None,
        'buildup_primary': 0,
        'buildup_secondary': 0,
        'has_dual_buildup': False,
        'has_effect': False
    }

    # Normalize effect names
    effect_mapping = {
        'hemorrhage': 'bleed',
        'scarlet rot': 'scarlet_rot',
        'death': 'death_blight'
    }

    def _build(name, primary, secondary, dual):
        """Assemble a positive result with the normalised effect name."""
        normalised = name.strip().lower()
        return {
            'effect_type': effect_mapping.get(normalised, normalised),
            'buildup_primary': primary,
            'buildup_secondary': secondary,
            'has_dual_buildup': dual,
            'has_effect': True
        }

    if not effects_str or effects_str == 'null' or str(effects_str).strip() == '':
        return default_result

    try:
        if isinstance(effects_str, dict):
            effects_dict = effects_str
        else:
            raw = str(effects_str).strip()
            try:
                # Swap quotes so a Python-style dict string becomes valid JSON.
                effects_dict = json.loads(raw.replace("'", '"'))
            except ValueError:
                # The quote swap breaks on apostrophes inside values; read the
                # original text as a Python literal instead.
                effects_dict = ast.literal_eval(raw)

        if not isinstance(effects_dict, dict):
            return default_result

        # Get the 'Any' value
        effect_value = str(effects_dict.get('Any', '') or '').strip()

        # Check if no effect
        if effect_value in ('', '-'):
            return default_result

        # Pattern 1: Dual buildup with SAME effect type
        # Example: "(88)(38) Poison" or "(73) (38) Frostbite"
        dual_match = re.search(r'\((\d+)\)\s*\((\d+)\)\s*([A-Za-z][A-Za-z\s]*)$', effect_value)
        if dual_match:
            return _build(
                dual_match.group(3),
                int(dual_match.group(1)),
                int(dual_match.group(2)),
                True,
            )

        # Pattern 2: Single buildup value
        # Example: "(66) Frostbite" or "(57) Hemorrhage".
        # The name must start with a letter so whitespace alone is never an effect.
        single_match = re.search(r'\((\d+)\)\s*([A-Za-z][A-Za-z\s]*)', effect_value)
        if single_match:
            return _build(single_match.group(2), int(single_match.group(1)), 0, False)

        return default_result

    except Exception as e:
        logger.warning(f"Failed to parse passive effects: {str(effects_str)[:100]}... - {e}")
        return default_result

# Struct produced by parse_fp_cost; every field is declared non-nullable.
fp_cost_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("min", IntegerType()),
        ("max", IntegerType()),
        ("per_use", IntegerType()),
        ("type", StringType()),
    )
])

# Struct produced by parse_boss_hp; every field is declared non-nullable.
hp_schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("hp_min", IntegerType()),
        ("hp_max", IntegerType()),
        ("hp_type", StringType()),
        ("phases", IntegerType()),
        ("classification", StringType()),
    )
])

# Struct produced by parse_passive_effects; every field is nullable.
passive_effects_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('effect_type', StringType()),
        ('buildup_primary', IntegerType()),
        ('buildup_secondary', IntegerType()),
        ('has_dual_buildup', BooleanType()),
        ('has_effect', BooleanType()),
    )
])
# Register UDFs
# Each pure-Python parser above is wrapped as a Spark UDF with an explicit return type:
#   - the stat parsers return string-keyed maps (int, double or string values);
#   - the FP-cost, boss-HP and passive-effect parsers return structs described by
#     fp_cost_schema, hp_schema and passive_effects_schema.
parse_attack_udf = udf(parse_attack_stats, MapType(StringType(), IntegerType()))
parse_defense_udf = udf(parse_defense_stats, MapType(StringType(), DoubleType()))
parse_negation_udf = udf(parse_dmg_negation_stats, MapType(StringType(), DoubleType()))
parse_scaling_udf = udf(parse_scaling_stats, MapType(StringType(), StringType()))
parse_required_udf = udf(parse_requirements_stats, MapType(StringType(), IntegerType()))
parse_resistance_udf = udf(parse_resistance_stats, MapType(StringType(), DoubleType()))   
parse_fp_udf = udf(parse_fp_cost, fp_cost_schema)  
parse_boss_hp_udf = udf(parse_boss_hp, hp_schema)   
parse_passive_effects_udf = udf(parse_passive_effects, passive_effects_schema)







## **4-SILVER TRANSFORMATIONS - ALL 28 TABLES**

In [0]:
class SilverTransformer:
    """Handles ALL transformations from Bronze to Silver"""
    def __init__(self, config: Dict):
        self.config = config
        self.catalog = config["catalog"]
        self.bronze_schema = config["bronze_schema"]
        self.bronze_schema = config["bronze_schema"]
    
    def transform_weapons(self) -> DataFrame:
        """Build the Silver weapons DataFrame from the Bronze `weapons` table (table 1/28).

        The `requirements` and `fp_cost` columns are parsed with the registered
        UDFs and flattened into scalar columns; `weight` and `dlc` are cast with
        a zero fallback, and a `silver_timestamp` is added. The row count is
        logged via `count()` before the DataFrame is returned.
        """
        logger.info("Transforming weapons...")

        parsed = (
            spark.table(f"{self.catalog}.{self.bronze_schema}.weapons")
            .withColumn("required_parsed", parse_required_udf(col("requirements")))
            .withColumn("fp_parsed", parse_fp_udf(col("fp_cost")))
        )

        # Flattened FP cost struct.
        fp_cost_columns = [
            col("fp_parsed.min").alias("fp_cost_min"),
            col("fp_parsed.max").alias("fp_cost_max"),
            col("fp_parsed.per_use").alias("fp_cost_per_use"),
            col("fp_parsed.type").alias("fp_cost_type"),
        ]

        # Attribute requirements only (the select carries no attack/scaling columns).
        requirements = col("required_parsed")
        requirement_columns = [
            coalesce(requirements['str'], lit(0)).alias("required_str"),
            coalesce(requirements['dex'], lit(0)).alias("required_dex"),
            coalesce(requirements['int'], lit(0)).alias("required_int"),
            coalesce(requirements['fai'], lit(0)).alias("required_fai"),
            coalesce(requirements['arc'], lit(0)).alias("required_arc"),
        ]

        df_silver = parsed.select(
            col("id").alias("weapon_id"),
            col("name").alias("weapon_name"),
            col("category"),
            coalesce(col("weight").cast(DoubleType()), lit(0.0)).alias("weight"),
            col("description"),
            col("image"),
            col("skill"),
            *fp_cost_columns,
            col("damage_type"),
            col("passive_effect"),
            *requirement_columns,
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        logger.info(f"  ✅ {df_silver.count():,} weapons")
        return df_silver
    
    def transform_shields(self) -> DataFrame:
        """Build the Silver shields DataFrame from the Bronze `shields` table (table 2/28).

        Keeps a double-cast copy of the raw `fp_cost` column alongside the
        parsed FP cost fields, flattens the str/dex/int/fai requirements, and
        adds a `silver_timestamp`. The row count is logged via `count()`.
        """
        logger.info("Transforming shields...")

        parsed = (
            spark.table(f"{self.catalog}.{self.bronze_schema}.shields")
            .withColumn("required_parsed", parse_required_udf(col("requirements")))
            .withColumn("fp_parsed", parse_fp_udf(col("fp_cost")))
        )

        # Flattened FP cost struct.
        fp_cost_columns = [
            col("fp_parsed.min").alias("fp_cost_min"),
            col("fp_parsed.max").alias("fp_cost_max"),
            col("fp_parsed.per_use").alias("fp_cost_per_use"),
            col("fp_parsed.type").alias("fp_cost_type"),
        ]

        # Requirements
        requirements = col("required_parsed")
        requirement_columns = [
            coalesce(requirements['str'], lit(0)).alias("required_str"),
            coalesce(requirements['dex'], lit(0)).alias("required_dex"),
            coalesce(requirements['int'], lit(0)).alias("required_int"),
            coalesce(requirements['fai'], lit(0)).alias("required_fai"),
        ]

        df_silver = parsed.select(
            col("id").alias("shield_id"),
            col("name").alias("shield_name"),
            col("category"),
            coalesce(col("weight").cast(DoubleType()), lit(0.0)).alias("weight"),
            col("description"),
            col("image"),
            col("skill"),
            coalesce(col("fp_cost").cast(DoubleType()), lit(0.0)).alias("fp_cost"),
            col("damage_type"),
            col("passive_effect"),
            *fp_cost_columns,
            *requirement_columns,
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        logger.info(f"  ✅ {df_silver.count():,} shields")
        return df_silver

    def transform_armors(self) -> DataFrame:
        """Build the Silver armors DataFrame from the Bronze `armors` table (table 3/28).

        Damage negation and resistance dict strings are parsed with the
        registered UDFs and flattened into one double column per stat. The
        `dlc` column is mapped to a 0/1 flag. The row count is logged via
        `count()`.
        """
        logger.info("Transforming armors...")

        parsed = (
            spark.table(f"{self.catalog}.{self.bronze_schema}.armors")
            .withColumn("negation_parsed", parse_negation_udf(col("damage_negation")))
            .withColumn("resistance_parsed", parse_resistance_udf(col("resistance")))
        )

        # Damage negation
        negation = col("negation_parsed")
        negation_columns = [
            coalesce(negation['physical'], lit(0.0)).alias("dmg_negation_physical"),
            coalesce(negation['vs_strike'], lit(0.0)).alias("dmg_negation_vs_strike"),
            coalesce(negation['vs_slash'], lit(0.0)).alias("dmg_negation_vs_slash"),
            coalesce(negation['vs_pierce'], lit(0.0)).alias("dmg_negation_vs_pierce"),
            coalesce(negation['magic'], lit(0.0)).alias("dmg_negation_magic"),
            coalesce(negation['fire'], lit(0.0)).alias("dmg_negation_fire"),
            coalesce(negation['lightning'], lit(0.0)).alias("dmg_negation_lightning"),
            coalesce(negation['holy'], lit(0.0)).alias("dmg_negation_holy"),
        ]

        # Resistances
        resistance = col("resistance_parsed")
        resistance_columns = [
            coalesce(resistance['immunity'], lit(0.0)).alias("resistance_immunity"),
            coalesce(resistance['robustness'], lit(0.0)).alias("resistance_robustness"),
            coalesce(resistance['focus'], lit(0.0)).alias("resistance_focus"),
            coalesce(resistance['vitality'], lit(0.0)).alias("resistance_vitality"),
            coalesce(resistance['poise'], lit(0.0)).alias("resistance_poise"),
        ]

        # DLC flag: null, any value containing "base", or "0" → 0; everything else → 1
        dlc_lower = lower(col("dlc"))
        is_dlc = (
            when(col("dlc").isNull(), lit(0))
            .when(dlc_lower.contains("base"), lit(0))
            .when(dlc_lower == "0", lit(0))
            .otherwise(lit(1))
            .alias("is_dlc")
        )

        df_silver = parsed.select(
            col("id").alias("armor_id"),
            col("name").alias("armor_name"),
            col("type").alias("category"),
            coalesce(col("weight").cast(DoubleType()), lit(0.0)).alias("weight"),
            col("description"),
            col("image"),
            *negation_columns,
            *resistance_columns,
            is_dlc,
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        logger.info(f"  ✅ {df_silver.count():,} armors")
        return df_silver
    
    def transform_bosses(self) -> DataFrame:
        """Build the Silver bosses DataFrame from the Bronze `bosses` table (table 4/28).

        The `hp` column is parsed with `parse_boss_hp_udf` and flattened into
        min/max/type/phase-count/classification columns; `locations_&_drops`
        is renamed to `locations_and_drops`. The row count is logged via
        `count()`.
        """
        logger.info("Transforming bosses...")

        parsed = spark.table(f"{self.catalog}.{self.bronze_schema}.bosses").withColumn(
            "hp_parsed", parse_boss_hp_udf(col("hp"))
        )

        # Flattened HP struct.
        hp_columns = [
            col("hp_parsed.hp_min").alias("hp_min"),
            col("hp_parsed.hp_max").alias("hp_max"),
            col("hp_parsed.hp_type").alias("hp_type"),
            col("hp_parsed.phases").alias("phase_count"),
            col("hp_parsed.classification").alias("boss_classification"),
        ]

        df_silver = parsed.select(
            col("id").alias("boss_id"),
            col("name").alias("boss_name"),
            col("image"),
            *hp_columns,
            col("locations_&_drops").alias("locations_and_drops"),
            col("blockquote"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        logger.info(f"  ✅ {df_silver.count():,} bosses")
        return df_silver
    
    def transform_npcs(self) -> DataFrame:
        """Build the Silver NPCs DataFrame from the Bronze `npcs` table (table 5/28).

        Renames id/name/location, casts `dlc` to an integer flag (0 when the
        cast yields null) and adds a `silver_timestamp`. The row count is
        logged via `count()`.
        """
        logger.info("Transforming NPCs...")

        bronze_npcs = spark.table(f"{self.catalog}.{self.bronze_schema}.npcs")

        silver_columns = [
            col("id").alias("npc_id"),
            col("name").alias("npc_name"),
            col("image"),
            col("location").alias("location_text"),
            col("role"),
            col("voiced_by"),
            col("description"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        ]
        df_silver = bronze_npcs.select(*silver_columns)

        logger.info(f"  ✅ {df_silver.count():,} NPCs")
        return df_silver
    
    def transform_creatures(self) -> DataFrame:
        """Build the Silver creatures DataFrame from the Bronze `creatures` table (table 6/28).

        Renames id/name/locations, casts `dlc` to an integer flag (0 when the
        cast yields null) and adds a `silver_timestamp`. The row count is
        logged via `count()`.
        """
        logger.info("Transforming creatures...")

        bronze_creatures = spark.table(f"{self.catalog}.{self.bronze_schema}.creatures")

        silver_columns = [
            col("id").alias("creature_id"),
            col("name").alias("creature_name"),
            col("image"),
            col("locations").alias("location_text"),
            col("drops"),
            col("blockquote"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        ]
        df_silver = bronze_creatures.select(*silver_columns)

        logger.info(f"  ✅ {df_silver.count():,} creatures")
        return df_silver


    def transform_locations(self) -> DataFrame:
        """Build the Silver locations DataFrame from the Bronze `locations` table (table 7/28).

        Renames id/name, passes the descriptive columns through unchanged,
        casts `dlc` to an integer flag (0 when the cast yields null) and adds
        a `silver_timestamp`. The row count is logged via `count()`.
        """
        logger.info("Transforming locations...")

        bronze_locations = spark.table(f"{self.catalog}.{self.bronze_schema}.locations")

        silver_columns = [
            col("id").alias("location_id"),
            col("name").alias("location_name"),
            col("image"),
            col("region"),
            col("items"),
            col("npcs"),
            col("creatures"),
            col("bosses"),
            col("description"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        ]
        df_silver = bronze_locations.select(*silver_columns)

        logger.info(f"  ✅ {df_silver.count():,} locations")
        return df_silver
    
    def transform_skills(self) -> DataFrame:
        """Build the Silver skills DataFrame from the Bronze `skills` table (table 8/28).

        The `fp` column is parsed with `parse_fp_udf` and flattened into the
        four `fp_cost_*` columns; the source column `equipament` is exposed as
        `equipment`. The row count is logged via `count()`.
        """
        logger.info("Transforming skills...")

        parsed = spark.table(f"{self.catalog}.{self.bronze_schema}.skills").withColumn(
            "fp_parsed", parse_fp_udf(col("fp"))
        )

        # Flattened FP cost struct.
        fp_cost_columns = [
            col("fp_parsed.min").alias("fp_cost_min"),
            col("fp_parsed.max").alias("fp_cost_max"),
            col("fp_parsed.per_use").alias("fp_cost_per_use"),
            col("fp_parsed.type").alias("fp_cost_type"),
        ]

        df_silver = parsed.select(
            col("id").alias("skill_id"),
            col("name").alias("skill_name"),
            col("image"),
            col("type"),
            col("equipament").alias("equipment"),
            col("charge"),
            *fp_cost_columns,
            col("effect"),
            col("locations"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        logger.info(f"  ✅ {df_silver.count():,} skills")
        return df_silver
    

    def transform_talismans(self) -> DataFrame:
        """Build the Silver talismans DataFrame from the Bronze `talismans` table (table 9/28).

        `weight` is cast to double and `value` is converted with `try_cast`,
        both falling back to zero; `dlc` becomes an integer flag and a
        `silver_timestamp` is added. The row count is logged via `count()`.
        """
        logger.info("Transforming talismans...")

        bronze_talismans = spark.table(f"{self.catalog}.{self.bronze_schema}.talismans")

        silver_columns = [
            col("id").alias("talisman_id"),
            col("name").alias("talisman_name"),
            col("image"),
            col("effect"),
            coalesce(col("weight").cast(DoubleType()), lit(0.0)).alias("weight"),
            coalesce(expr("try_cast(value as int)"), lit(0)).alias("value"),
            col("description"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        ]
        df_silver = bronze_talismans.select(*silver_columns)

        logger.info(f"  ✅ {df_silver.count():,} talismans")
        return df_silver

    def transform_sorceries(self) -> DataFrame:
        """Build the silver sorceries DataFrame from the bronze `sorceries` table (table 10/28).

        The raw `fp` column is parsed with `parse_fp_udf` into `fp_cost_*`
        columns. The attribute requirements are read from the separate
        `int`, `fai` and `arc` columns (they are not packed in a dict).

        Returns:
            DataFrame: one row per bronze sorcery; integer columns fall back
            to 0 when the cast result is NULL.
        """
        logger.info("Transforming sorceries...")

        sorceries_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.sorceries")
        sorceries_with_fp = sorceries_bronze.withColumn("fp_parsed", parse_fp_udf(col("fp")))

        def int_or_zero(source_name, target_name):
            # Integer cast of a bronze column, with 0 substituted for NULL.
            return coalesce(col(source_name).cast(IntegerType()), lit(0)).alias(target_name)

        # fp_parsed.<field> -> fp_cost_<field>
        fp_cost_columns = [
            col(f"fp_parsed.{field}").alias(f"fp_cost_{field}")
            for field in ("min", "max", "per_use", "type")
        ]

        sorceries_silver = sorceries_with_fp.select(
            col("id").alias("sorcery_id"),
            col("name").alias("sorcery_name"),
            col("image"),
            col("description"),
            col("effect"),
            *fp_cost_columns,
            int_or_zero("slot", "memory_slots"),
            int_or_zero("int", "required_int"),
            int_or_zero("fai", "required_fai"),
            int_or_zero("arc", "required_arc"),
            int_or_zero("stamina_cost", "stamina_cost"),
            col("bonus"),
            col("location"),
            int_or_zero("dlc", "is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        # count() triggers a Spark job; it is only used for the log line.
        row_total = sorceries_silver.count()
        logger.info(f"  ✅ {row_total:,} sorceries")
        return sorceries_silver
    
    def transform_incantations(self) -> DataFrame:
        """Build the silver incantations DataFrame from the bronze `incantations` table (table 11/28).

        The raw `fp` column is parsed with `parse_fp_udf` into `fp_cost_*`
        columns. The attribute requirements are read from the separate
        `int`, `fai` and `arc` columns (they are not packed in a dict).

        Unlike the sibling transforms, `is_dlc` is derived from the text of
        `dlc`: NULL, any value containing "base" (case-insensitive) or the
        literal "0" map to 0; everything else maps to 1.

        Returns:
            DataFrame: one row per bronze incantation; integer columns fall
            back to 0 when the cast result is NULL.
        """
        logger.info("Transforming incantations...")

        incantations_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.incantations")
        incantations_with_fp = incantations_bronze.withColumn("fp_parsed", parse_fp_udf(col("fp")))

        def int_or_zero(source_name, target_name):
            # Integer cast of a bronze column, with 0 substituted for NULL.
            return coalesce(col(source_name).cast(IntegerType()), lit(0)).alias(target_name)

        # fp_parsed.<field> -> fp_cost_<field>
        fp_cost_columns = [
            col(f"fp_parsed.{field}").alias(f"fp_cost_{field}")
            for field in ("min", "max", "per_use", "type")
        ]

        dlc_text = lower(col("dlc"))
        dlc_flag = (
            when(col("dlc").isNull(), lit(0))
            .when(dlc_text.contains("base"), lit(0))
            .when(dlc_text == "0", lit(0))
            .otherwise(lit(1))
            .alias("is_dlc")
        )

        incantations_silver = incantations_with_fp.select(
            col("id").alias("incantation_id"),
            col("name").alias("incantation_name"),
            col("image"),
            col("description"),
            col("effect"),
            *fp_cost_columns,
            int_or_zero("slot", "memory_slots"),
            int_or_zero("int", "required_int"),
            int_or_zero("fai", "required_fai"),
            int_or_zero("arc", "required_arc"),
            int_or_zero("stamina_cost", "stamina_cost"),
            col("bonus"),
            col("group"),
            col("location"),
            dlc_flag,
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        # count() triggers a Spark job; it is only used for the log line.
        row_total = incantations_silver.count()
        logger.info(f"  ✅ {row_total:,} incantations")
        return incantations_silver
    
    def transform_ashes_of_war(self) -> DataFrame:
        """Build the silver ashes-of-war DataFrame from the bronze `ashes_of_war` table (table 12/28).

        Returns:
            DataFrame: one row per bronze ash of war with renamed id/name
            columns, an integer `is_dlc` flag (0 when the cast of `dlc` is
            NULL) and a `silver_timestamp` audit column.
        """
        logger.info("Transforming ashes of war...")

        ashes_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.ashes_of_war")

        projection = [
            col("id").alias("ash_id"),
            col("name").alias("ash_name"),
            col("image"),
            col("affinity"),
            col("skill"),
            col("description"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        ]
        ashes_silver = ashes_bronze.select(*projection)

        # count() triggers a Spark job; it is only used for the log line.
        row_total = ashes_silver.count()
        logger.info(f"  ✅ {row_total:,} ashes of war")
        return ashes_silver

    def transform_spirit_ashes(self) -> DataFrame:
        """Build the silver spirit-ashes DataFrame from the bronze `spirit_ashes` table (table 13/28).

        The raw `fp_cost` column is parsed with `parse_fp_udf` and its `min`,
        `max`, `per_use` and `type` fields become the `fp_cost_*` columns.

        Returns:
            DataFrame: one row per bronze spirit ash; `hp_cost` and `is_dlc`
            are integers that fall back to 0 when the cast result is NULL.
        """
        logger.info("Transforming spirit ashes...")

        spirits_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.spirit_ashes")
        spirits_with_fp = spirits_bronze.withColumn("fp_parsed", parse_fp_udf(col("fp_cost")))

        # fp_parsed.<field> -> fp_cost_<field>
        fp_cost_columns = [
            col(f"fp_parsed.{field}").alias(f"fp_cost_{field}")
            for field in ("min", "max", "per_use", "type")
        ]

        spirits_silver = spirits_with_fp.select(
            col("id").alias("spirit_id"),
            col("name").alias("spirit_name"),
            col("image"),
            col("type"),
            *fp_cost_columns,
            coalesce(col("hp_cost").cast(IntegerType()), lit(0)).alias("hp_cost"),
            col("effect"),
            col("description"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        # count() triggers a Spark job; it is only used for the log line.
        row_total = spirits_silver.count()
        logger.info(f"  ✅ {row_total:,} spirit ashes")
        return spirits_silver
    
    def transform_ammos(self) -> DataFrame:
        """Build the silver ammos DataFrame from the bronze `items_ammos` table (table 16/28).

        The raw `attack_power` column is parsed with `parse_attack_udf`; the
        per-damage-type entries are looked up by key and default to 0, while
        the `critical` entry defaults to 100.

        Returns:
            DataFrame: one row per bronze ammo with flattened attack columns,
            an integer `is_dlc` flag and a `silver_timestamp` audit column.
        """
        logger.info("Transforming ammos...")

        ammos_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.items_ammos")
        ammos_parsed = ammos_bronze.withColumn(
            "attack_parsed", parse_attack_udf(col("attack_power"))
        )

        # attack_parsed[<kind>] -> attack_<kind>, missing entries become 0
        attack_columns = [
            coalesce(col("attack_parsed")[kind], lit(0)).alias(f"attack_{kind}")
            for kind in ("physical", "magic", "fire", "lightning", "holy")
        ]
        # Critical uses a different fallback (100) than the damage columns.
        critical_column = coalesce(col("attack_parsed")['critical'], lit(100)).alias("critical_damage")

        ammos_silver = ammos_parsed.select(
            col("id").alias("ammo_id"),
            col("name").alias("ammo_name"),
            col("image"),
            col("type").alias("ammo_type"),
            col("damage_type"),
            *attack_columns,
            critical_column,
            col("passive_effect"),
            col("description"),
            coalesce(col("dlc").cast(IntegerType()), lit(0)).alias("is_dlc"),
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        # count() triggers a Spark job; it is only used for the log line.
        row_total = ammos_silver.count()
        logger.info(f"  ✅ {row_total:,} ammos")
        return ammos_silver
    
    def transform_weapons_upgrades(self) -> DataFrame:
        """Build the silver weapon-upgrades DataFrame from bronze `weapons_upgrades` (table 14/28).

        Four raw columns are parsed with their dedicated UDFs and flattened:
        `attack_power` -> `attack_*`, `stamina_cost`, `critical_damage`
        (default 0); `stat_scaling` -> `scaling_*` (default '-');
        `damage_reduction_%` -> `guard_*` (default 0.0); `passive_effects`
        -> the passive effect columns.

        Returns:
            DataFrame: one row per bronze weapon upgrade with the flattened
            stat columns and a `silver_timestamp` audit column.
        """
        logger.info("Transforming weapon upgrades...")

        upgrades_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.weapons_upgrades")

        upgrades_parsed = (
            upgrades_bronze
            .withColumn("attack_parsed", parse_attack_udf(col("attack_power")))
            .withColumn("defense_parsed", parse_defense_udf(col("damage_reduction_%")))
            .withColumn("scaling_parsed", parse_scaling_udf(col("stat_scaling")))
            .withColumn("passive_effects_parsed", parse_passive_effects_udf(col("passive_effects")))
        )

        damage_kinds = ("physical", "magic", "fire", "lightning", "holy")

        # Attack stats: five damage kinds, then stamina and critical.
        attack_columns = [
            coalesce(col("attack_parsed")[kind], lit(0)).alias(f"attack_{kind}")
            for kind in damage_kinds
        ]
        attack_columns.append(coalesce(col("attack_parsed")['stamina'], lit(0)).alias("stamina_cost"))
        attack_columns.append(coalesce(col("attack_parsed")['critical'], lit(0)).alias("critical_damage"))

        # Scaling grades; '-' marks a missing grade.
        scaling_columns = [
            coalesce(col("scaling_parsed")[stat], lit('-')).alias(f"scaling_{stat}")
            for stat in ("str", "dex", "int", "fai", "arc")
        ]

        # Guard/defense stats (for weapons with shields like Dueling Shield).
        guard_columns = [
            coalesce(col("defense_parsed")[kind], lit(0.0)).alias(f"guard_{kind}")
            for kind in damage_kinds + ("boost", "resistance")
        ]

        # Passive effects: (parsed field, silver column name).
        passive_columns = [
            col(f"passive_effects_parsed.{field}").alias(target)
            for field, target in (
                ("effect_type", "passive_effect_type"),
                ("buildup_primary", "passive_buildup_primary"),
                ("buildup_secondary", "passive_buildup_secondary"),
                ("has_dual_buildup", "has_dual_buildup"),
                ("has_effect", "has_passive_effect"),
            )
        ]

        upgrades_silver = upgrades_parsed.select(
            col("id").alias("upgrade_id"),
            col("weapon_name"),
            col("upgrade").alias("upgrade_level"),
            *attack_columns,
            *scaling_columns,
            *guard_columns,
            *passive_columns,
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        # count() triggers a Spark job; it is only used for the log line.
        row_total = upgrades_silver.count()
        logger.info(f"  ✅ {row_total:,} weapon upgrades")
        return upgrades_silver
    
    def transform_shields_upgrades(self) -> DataFrame:
        """Build the silver shield-upgrades DataFrame from bronze `shields_upgrades` (table 15/28).

        Four raw columns are parsed with their dedicated UDFs and flattened:
        `attack_power` -> `attack_*` and `stamina_cost` (default 0);
        `damage_reduction_%` -> `guard_*` (default 0.0); `stat_scaling` ->
        `scaling_*` (default '-'); `passive_effects` -> the passive effect
        columns. No critical column is produced for shields.

        Returns:
            DataFrame: one row per bronze shield upgrade with the flattened
            stat columns and a `silver_timestamp` audit column.
        """
        logger.info("Transforming shield upgrades...")

        upgrades_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.shields_upgrades")

        upgrades_parsed = (
            upgrades_bronze
            .withColumn("attack_parsed", parse_attack_udf(col("attack_power")))
            .withColumn("defense_parsed", parse_defense_udf(col("damage_reduction_%")))
            .withColumn("scaling_parsed", parse_scaling_udf(col("stat_scaling")))
            .withColumn("passive_effects_parsed", parse_passive_effects_udf(col("passive_effects")))
        )

        damage_kinds = ("physical", "magic", "fire", "lightning", "holy")

        # Attack stats (shield bash damage), then stamina.
        attack_columns = [
            coalesce(col("attack_parsed")[kind], lit(0)).alias(f"attack_{kind}")
            for kind in damage_kinds
        ]
        attack_columns.append(coalesce(col("attack_parsed")['stamina'], lit(0)).alias("stamina_cost"))

        # Guard stats (primary shield function).
        guard_columns = [
            coalesce(col("defense_parsed")[kind], lit(0.0)).alias(f"guard_{kind}")
            for kind in damage_kinds + ("boost", "resistance")
        ]

        # Scaling grades; '-' marks a missing grade.
        scaling_columns = [
            coalesce(col("scaling_parsed")[stat], lit('-')).alias(f"scaling_{stat}")
            for stat in ("str", "dex", "int", "fai", "arc")
        ]

        # Passive effects: (parsed field, silver column name).
        passive_columns = [
            col(f"passive_effects_parsed.{field}").alias(target)
            for field, target in (
                ("effect_type", "passive_effect_type"),
                ("buildup_primary", "passive_buildup_primary"),
                ("buildup_secondary", "passive_buildup_secondary"),
                ("has_dual_buildup", "has_dual_buildup"),
                ("has_effect", "has_passive_effect"),
            )
        ]

        upgrades_silver = upgrades_parsed.select(
            col("id").alias("upgrade_id"),
            col("shield_name"),
            col("upgrade").alias("upgrade_level"),
            *attack_columns,
            *guard_columns,
            *scaling_columns,
            *passive_columns,
            col("ingestion_timestamp"),
            current_timestamp().alias("silver_timestamp"),
        )

        # count() triggers a Spark job; it is only used for the log line.
        row_total = upgrades_silver.count()
        logger.info(f"  ✅ {row_total:,} shield upgrades")
        return upgrades_silver
    
    def transform_item_table(self, table_suffix: str, item_type: str) -> DataFrame:
        """Project one bronze item table onto the common item layout (tables 16-28).

        Args:
            table_suffix: Name of the bronze table inside the bronze schema.
            item_type: Label written to the constant `item_type` column; it is
                also used (with an "s" appended) in the log line.

        Returns:
            DataFrame: `item_id`, `item_name`, `item_type`, `description`,
            `effect`, `image`, `ingestion_timestamp`, `silver_timestamp`.
            `description`, `effect` and `image` are NULL strings when the
            bronze table has no such column.
        """
        logger.info(f"Transforming {table_suffix}...")

        items_bronze = spark.table(f"{self.catalog}.{self.bronze_schema}.{table_suffix}")
        present = items_bronze.columns

        projection = [
            col("id").alias("item_id"),
            col("name").alias("item_name"),
            lit(item_type).alias("item_type"),
        ]

        # Optional columns keep a fixed position; absent ones are NULL-filled
        # so every item table ends up with the same layout.
        for optional_name in ("description", "effect", "image"):
            if optional_name in present:
                projection.append(col(optional_name))
            else:
                projection.append(lit(None).cast(StringType()).alias(optional_name))

        projection.append(col("ingestion_timestamp"))
        projection.append(current_timestamp().alias("silver_timestamp"))

        items_silver = items_bronze.select(*projection)

        # count() triggers a Spark job; it is only used for the log line.
        row_total = items_silver.count()
        logger.info(f"  ✅ {row_total:,} {item_type}s")
        return items_silver

In [0]:
class ArrayExploder:
    """Turns list-like string columns of silver tables into one row per element.

    Reads the silver `locations`, `bosses` and `creatures` tables of the
    configured catalog/schema and returns relationship DataFrames; nothing is
    written by this class.
    """

    # Characters stripped from a stringified list before splitting: [ ] ' "
    _LIST_NOISE_PATTERN = r"[\[\]'\"]"
    # Separator between list elements (comma plus optional whitespace).
    _LIST_SEPARATOR_PATTERN = ",\\s*"

    # Lower-case substrings used to classify boss drops (see explode_boss_drops).
    _TALISMAN_KEYWORDS = ("talisman", "medallion", "charm")
    _WEAPON_KEYWORDS = (
        "sword", "blade", "axe", "spear", "katana", "bow",
        "staff", "halberd", "hammer", "flail", "seal",
    )
    _ARMOR_KEYWORDS = (
        "armor", "helm", "gauntlet", "greaves", "chest",
        "hood", "mask", "set", "cloak",
    )
    _MATERIAL_KEYWORDS = (
        "flesh", "scale", "bone", "smithing stone", "glovewort", "somber",
    )

    def __init__(self, config: Dict):
        """Store the config and cache the catalog / silver schema names.

        Args:
            config: Must contain the keys "catalog" and "silver_schema".
        """
        self.config = config
        self.catalog = config["catalog"]
        self.silver_schema = config["silver_schema"]

    @staticmethod
    def _contains_any(text_col, keywords):
        """Return a boolean Column that is true when `text_col` contains any of `keywords`."""
        condition = text_col.contains(keywords[0])
        for keyword in keywords[1:]:
            condition = condition | text_col.contains(keyword)
        return condition

    def _explode_list_column(
        self,
        table_name: str,
        list_column: str,
        key_columns: List[str],
        output_column: str,
    ) -> DataFrame:
        """Explode a stringified list column of a silver table.

        Rows whose `list_column` is NULL are skipped. Brackets and quotes are
        stripped, the remainder is split on commas, and empty elements are
        dropped.

        Args:
            table_name: Silver table to read.
            list_column: Column holding the stringified list.
            key_columns: Columns carried over unchanged to every output row.
            output_column: Name of the column holding a single list element.

        Returns:
            DataFrame with `key_columns` followed by `output_column`.
        """
        df = spark.table(f"{self.catalog}.{self.silver_schema}.{table_name}")

        elements = split(
            regexp_replace(col(list_column), self._LIST_NOISE_PATTERN, ""),
            self._LIST_SEPARATOR_PATTERN,
        )

        df_exploded = (
            df.filter(col(list_column).isNotNull())
            .select(
                *[col(name) for name in key_columns],
                explode(elements).alias(output_column),
            )
            .filter(col(output_column) != "")
        )

        # count() triggers a Spark job; it is only used for the log line.
        logger.info(f"  ✅ {df_exploded.count():,} relationships")
        return df_exploded

    def explode_location_items(self) -> DataFrame:
        """Explode `locations.items` into (`location_id`, `item_name`) rows."""
        logger.info("Exploding location items...")
        return self._explode_list_column("locations", "items", ["location_id"], "item_name")

    def explode_location_npcs(self) -> DataFrame:
        """Explode `locations.npcs` into (`location_id`, `npc_name`) rows."""
        logger.info("Exploding location NPCs...")
        return self._explode_list_column("locations", "npcs", ["location_id"], "npc_name")

    def explode_location_creatures(self) -> DataFrame:
        """Explode `locations.creatures` into (`location_id`, `creature_name`) rows."""
        logger.info("Exploding location creatures...")
        return self._explode_list_column("locations", "creatures", ["location_id"], "creature_name")

    def explode_location_bosses(self) -> DataFrame:
        """Explode `locations.bosses` into (`location_id`, `boss_name`) rows."""
        logger.info("Exploding location bosses...")
        return self._explode_list_column("locations", "bosses", ["location_id"], "boss_name")

    def explode_boss_drops(self) -> DataFrame:
        """
        Explode boss drops from the `locations_and_drops` column of silver `bosses`.

        Handles the nested dict-like string format with multiple locations:
        {'Location1': ['drop1', 'drop2'], 'Location2': ['drop3', 'drop4']}

        Also handles HTML anchor tags in location names and comma-formatted numbers.

        Each drop is classified by the FIRST matching rule below (matching is
        case-insensitive substring matching on the drop text):
        - 'great_rune': contains 'great rune'
        - 'runes': contains 'rune', or is a comma-formatted number
        - 'remembrance': contains 'remembrance'
        - 'ash_of_war': contains 'ash of war'
        - 'talisman': contains 'talisman', 'medallion' or 'charm'
        - 'material': contains 'dragon heart'
        - 'weapon': contains a weapon keyword
        - 'armor': contains an armor keyword
        - 'material': contains a crafting/upgrade material keyword
        - 'item': everything else

        Explicitly named categories are tested before the generic keyword
        lists. Previously 'great_rune' could never be produced (the 'runes'
        rule matched first) and names such as 'Axe Talisman' were classified
        as weapons.

        Returns:
            DataFrame with `boss_id`, `boss_name`, `location_name`,
            `drop_item`, `drop_type` and `drop_order`.
        """
        logger.info("Exploding boss drops...")

        df = spark.table(f"{self.catalog}.{self.silver_schema}.bosses")

        # Step 1: Parse the dict-like string structure
        df_processed = df \
            .filter(col("locations_and_drops").isNotNull()) \
            .filter(col("locations_and_drops") != "") \
            .select(
                col("boss_id"),
                col("boss_name"),
                # Remove outer braces
                regexp_replace(col("locations_and_drops"), r"^\{|\}$", "").alias("locations_text")
            )

        # Step 2: Split by location patterns (look for ': [' to identify new locations)
        df_split = df_processed \
            .withColumn(
                "location_entries",
                split(col("locations_text"), r"(?=(?:'[^']*'|\"[^\"]*\"|<a[^>]*>[^<]*</a>):\s*\[)")
            ) \
            .select(
                col("boss_id"),
                col("boss_name"),
                explode(col("location_entries")).alias("location_entry")
            ) \
            .filter(col("location_entry") != "")

        # Step 3: Extract location name and drops array
        df_extracted = df_split \
            .withColumn(
                "location_raw",
                regexp_extract(col("location_entry"), r"^([^:]+):\s*\[", 1)
            ) \
            .withColumn(
                "drops_text",
                regexp_extract(col("location_entry"), r":\s*\[([^\]]+)\]", 1)
            ) \
            .filter(col("location_raw") != "") \
            .filter(col("drops_text") != "")

        # Step 4: Clean location names (remove HTML tags, quotes, extra spaces)
        df_cleaned_location = df_extracted \
            .withColumn(
                "location_name",
                trim(
                    regexp_replace(
                        regexp_replace(
                            regexp_replace(col("location_raw"), r"<a[^>]*>([^<]*)</a>", "$1"),
                            r"['\"]+", ""
                        ),
                        r"\s+", " "
                    )
                )
            )

        # Step 5: Split the drops list. The lookahead keeps comma-formatted
        # numbers such as 240,000 in one piece: a comma directly followed by a
        # digit is never a split point.
        # NOTE(review): the final filter discards every drop containing ':',
        # which also removes names such as "Ash of War: ..." - confirm that
        # this is intended.
        df_exploded = df_cleaned_location \
            .withColumn(
                "drops_array",
                # Remove quotes first, then split on pattern that preserves comma-formatted numbers
                split(
                    regexp_replace(col("drops_text"), r"['\"]+", ""),
                    r",\s*(?=\D)"
                )
            ) \
            .select(
                col("boss_id"),
                col("boss_name"),
                col("location_name"),
                explode(col("drops_array")).alias("drop_item")
            ) \
            .withColumn("drop_item", trim(col("drop_item"))) \
            .filter(col("drop_item") != "") \
            .filter(~col("drop_item").contains(":"))  # Filter out any remaining location fragments

        # Step 6: Classify drop types. Chained when() uses the first matching
        # condition, so specific rules must come before the generic ones.
        item_lower = lower(col("drop_item"))
        is_rune_amount = col("drop_item").rlike(r"^\d{1,3}(,\d{3})*$")  # Matches: 240,000 or 90,000

        df_typed = df_exploded.withColumn(
            "drop_type",
            # GREAT RUNE: must precede the generic "rune" rule
            when(item_lower.contains("great rune"), lit("great_rune"))
            # RUNES: contains "rune" OR is a comma-formatted number
            .when(item_lower.contains("rune") | is_rune_amount, lit("runes"))
            # REMEMBRANCE
            .when(item_lower.contains("remembrance"), lit("remembrance"))
            # ASH OF WAR: before weapon keywords (e.g. names containing "sword")
            .when(item_lower.contains("ash of war"), lit("ash_of_war"))
            # TALISMAN: before weapon keywords (e.g. "Axe Talisman")
            .when(self._contains_any(item_lower, self._TALISMAN_KEYWORDS), lit("talisman"))
            # DRAGON HEART: special material
            .when(item_lower.contains("dragon heart"), lit("material"))
            # WEAPON: common weapon keywords
            .when(self._contains_any(item_lower, self._WEAPON_KEYWORDS), lit("weapon"))
            # ARMOR: common armor keywords
            .when(self._contains_any(item_lower, self._ARMOR_KEYWORDS), lit("armor"))
            # MATERIAL: crafting/upgrade materials
            .when(self._contains_any(item_lower, self._MATERIAL_KEYWORDS), lit("material"))
            # Default: generic item
            .otherwise(lit("item"))
        )

        # Step 7: Add drop order within each boss-location combination
        # NOTE(review): ordering by monotonically_increasing_id() presumably
        # follows the exploded list order - confirm this holds for the data.
        df_final = df_typed.withColumn(
            "drop_order",
            row_number().over(
                Window.partitionBy("boss_id", "location_name")
                .orderBy(monotonically_increasing_id())
            )
        )

        logger.info(f"  ✅ {df_final.count():,} boss drop relationships")

        # Log distribution by drop type
        logger.info("\n  📊 Drop Type Distribution:")
        type_counts = df_final.groupBy("drop_type").count().orderBy(col("count").desc()).collect()
        for row in type_counts:
            logger.info(f"     • {row['drop_type']:<15}: {row['count']:>6,} drops")

        return df_final

    def explode_creature_drops(self) -> DataFrame:
        """Explode `creatures.drops` into (`creature_id`, `creature_name`, `item_name`) rows."""
        logger.info("Exploding creature drops...")
        return self._explode_list_column(
            "creatures", "drops", ["creature_id", "creature_name"], "item_name"
        )

In [0]:
def create_unified_items_table(config: Dict) -> DataFrame:
    """Union the 13 bronze item tables into one items lookup DataFrame.

    Each table is projected with `SilverTransformer.transform_item_table`.
    A table that fails to load is logged as a warning and left out, so the
    result may cover fewer than 13 item types.

    Args:
        config: Pipeline configuration handed to `SilverTransformer`.

    Returns:
        DataFrame: the union (by column name) of every table that loaded, or
        an empty DataFrame with six string columns when none loaded.
    """
    logger.info("Creating unified items table...")

    # (bronze table name, item_type label)
    item_sources = [
        ("items_ammos", "Ammo"),
        ("items_bells", "Bell"),
        ("items_consumables", "Consumable"),
        ("items_cookbooks", "Cookbook"),
        ("items_crystal_tears", "Crystal Tear"),
        ("items_great_runes", "Great Rune"),
        ("items_key_items", "Key Item"),
        ("items_materials", "Material"),
        ("items_multi", "Multi"),
        ("items_remembrances", "Remembrance"),
        ("items_tools", "Tool"),
        ("items_upgrade_materials", "Upgrade Material"),
        ("items_whetblades", "Whetblade")
    ]

    item_transformer = SilverTransformer(config)
    loaded_frames = []

    for source_table, type_label in item_sources:
        try:
            loaded_frames.append(item_transformer.transform_item_table(source_table, type_label))
        except Exception as e:
            # Best effort: a missing or unreadable table must not abort the others.
            logger.warning(f"Could not load {source_table}: {e}")

    if not loaded_frames:
        # NOTE(review): this fallback has no ingestion_timestamp /
        # silver_timestamp columns, unlike the frames built by
        # transform_item_table - confirm downstream consumers tolerate that.
        empty_schema = StructType([
            StructField("item_id", StringType()),
            StructField("item_name", StringType()),
            StructField("item_type", StringType()),
            StructField("description", StringType()),
            StructField("effect", StringType()),
            StructField("image", StringType())
        ])
        return spark.createDataFrame([], empty_schema)

    unified_items = loaded_frames[0]
    for next_frame in loaded_frames[1:]:
        unified_items = unified_items.unionByName(next_frame, allowMissingColumns=True)

    # count() triggers a Spark job; it is only used for the log line.
    total_items = unified_items.count()
    logger.info(f"  ✅ {total_items:,} total items")
    return unified_items


In [0]:
def _write_silver_table(df: "DataFrame", full_table_name: str) -> None:
    """Overwrite a Delta table with ``df``, replacing the stored schema if it changed."""
    (
        df.write
        .format("delta")
        .mode("overwrite")
        # Without this, an overwrite whose columns differ from the existing
        # table fails with a schema-mismatch error.
        .option("overwriteSchema", "true")
        .saveAsTable(full_table_name)
    )


def run_silver_transformations(config: Dict) -> Dict:
    """
    Main orchestration for the silver layer.

    Runs three phases and writes each result as a Delta table under
    ``{catalog}.{silver_schema}``:

    1. 16 base entity transformations (``SilverTransformer``).
    2. 6 array-explosion relationship tables (``ArrayExploder``).
    3. 1 unified items table (``create_unified_items_table``).

    Each table is built and written independently: a failure is logged with
    its traceback and recorded, and the run continues with the next table.

    NOTE(review): the banner says "ALL 28 TABLES" while this function writes
    23 tables; presumably 28 counts the bronze source tables - TODO confirm.

    Args:
        config: Must provide ``catalog`` and ``silver_schema``; it is also
            passed to ``SilverTransformer``, ``ArrayExploder`` and
            ``create_unified_items_table``.

    Returns:
        Stats dict with ``start_time``, ``end_time``, ``duration`` (seconds),
        ``tables_created`` (fully qualified names written successfully) and
        ``tables_failed`` (fully qualified names whose build or write raised).
    """
    logger.info("="*80)
    logger.info("STARTING SILVER LAYER - ALL 28 TABLES")
    logger.info("="*80)

    transformer = SilverTransformer(config)
    exploder = ArrayExploder(config)

    stats = {
        "start_time": datetime.now(),
        "tables_created": [],
        "tables_failed": [],
    }
    schema_prefix = f"{config['catalog']}.{config['silver_schema']}"

    # Create schema
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_prefix}")

    def _build_and_write(table_name: str, build_df) -> None:
        """Build one table via ``build_df()``, write it, and record the outcome in ``stats``."""
        full_table_name = f"{schema_prefix}.{table_name}"
        try:
            _write_silver_table(build_df(), full_table_name)
        except Exception as e:
            # Deliberate best-effort: keep going, but leave a trace in both the
            # log (with traceback) and the returned stats.
            logger.error(f"  ❌ Failed {table_name}: {e}", exc_info=True)
            stats["tables_failed"].append(full_table_name)
        else:
            stats["tables_created"].append(full_table_name)

    # PHASE 1: Base tables (16 main entities including ammos)
    logger.info("\n📦 PHASE 1: Base transformations (16 main entities)")

    base_tables = [
        ("weapons", transformer.transform_weapons),
        ("shields", transformer.transform_shields),
        ("armors", transformer.transform_armors),
        ("bosses", transformer.transform_bosses),
        ("npcs", transformer.transform_npcs),
        ("creatures", transformer.transform_creatures),
        ("locations", transformer.transform_locations),
        ("skills", transformer.transform_skills),
        ("talismans", transformer.transform_talismans),
        ("sorceries", transformer.transform_sorceries),
        ("incantations", transformer.transform_incantations),
        ("ashes_of_war", transformer.transform_ashes_of_war),
        ("spirit_ashes", transformer.transform_spirit_ashes),
        ("items_ammos", transformer.transform_ammos),
        ("weapons_upgrades", transformer.transform_weapons_upgrades),
        ("shields_upgrades", transformer.transform_shields_upgrades)
    ]

    for table_name, transform_func in base_tables:
        _build_and_write(table_name, transform_func)

    # PHASE 2: Array explosions (6 tables)
    logger.info("\n📦 PHASE 2: Array explosions")

    explosion_tables = [
        ("location_items", exploder.explode_location_items),
        ("location_npcs", exploder.explode_location_npcs),
        ("location_creatures", exploder.explode_location_creatures),
        ("location_bosses", exploder.explode_location_bosses),
        ("boss_drops", exploder.explode_boss_drops),
        ("creature_drops", exploder.explode_creature_drops)
    ]

    for table_name, explode_func in explosion_tables:
        _build_and_write(table_name, explode_func)

    # PHASE 3: Unified items (1 table from 13 types)
    logger.info("\n📦 PHASE 3: Unified items")

    # The unified schema depends on which item tables loaded, so this write
    # in particular needs the schema-replacing overwrite.
    _build_and_write("items_unified", lambda: create_unified_items_table(config))

    stats["end_time"] = datetime.now()
    stats["duration"] = (stats["end_time"] - stats["start_time"]).total_seconds()

    logger.info("\n" + "="*80)
    if stats["tables_failed"]:
        # Do not report an unqualified success when tables were skipped.
        logger.warning("⚠️ SILVER LAYER COMPLETE WITH FAILURES")
        logger.warning(f"Failed tables: {len(stats['tables_failed'])}")
        for failed_table in stats["tables_failed"]:
            logger.warning(f"  • {failed_table}")
    else:
        logger.info("✅ SILVER LAYER COMPLETE")
    logger.info(f"Tables: {len(stats['tables_created'])}")
    logger.info(f"Duration: {stats['duration']:.2f}s")
    logger.info("="*80)

    return stats

In [0]:
if __name__ == "__main__":
    # Reuse the notebook-level CONFIG instead of re-declaring a divergent copy
    # (the previous copy dropped "checkpoint_path"); only the batch_id is
    # refreshed so it reflects the moment this run actually starts.
    CONFIG = {
        **CONFIG,
        "batch_id": datetime.now().strftime("%Y%m%d_%H%M%S"),
    }

    try:
        stats = run_silver_transformations(CONFIG)

        logger.info("\n📊 SUMMARY")
        logger.info(f"Total tables: {len(stats['tables_created'])}")
        for table in stats['tables_created']:
            logger.info(f"  • {table}")

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # so no mid-notebook `import traceback` is needed.
        logger.exception(f"\n❌ Failed: {str(e)}")
        raise

In [0]:
%sql
-- Inspect the location_items relationship table written by the silver run.
SELECT *
FROM eldenringcatalog.silver.location_items
