# In [None]:
import stupidf as sf
import polars as pl
import pandas as pd
from collections import defaultdict

def extract_scaling_info_and_apply(stdf_file):
    """
    Extract scaling information from raw STDF and apply it to create scaled columns
    """
    
    print("Loading STDF data...")
    
    # Get the regular DataFrame (this has the test results)
    stdf_parsed = sf.parse_stdf(stdf_file)
    df = stdf_parsed['df']
    
    # Get the raw STDF object (this has detailed PTR info including scaling)
    raw_stdf = sf.get_raw_stdf(stdf_file)
    
    print(f"DataFrame shape: {df.shape}")
    print(f"DataFrame columns: {list(df.columns)}")
    
    # Extract scaling information from the full test information
    full_test_info = raw_stdf.test_data.full_test_information
    
    print(f"Found {len(full_test_info)} test information records")
    
    # Create dictionaries to store scaling info by test_num
    scaling_info = {}
    
    # Extract scaling data from each test information record
    for (test_num, site_num, head_num), test_info in full_test_info.items():
        if test_num not in scaling_info:
            scaling_info[test_num] = {
                'units': getattr(test_info, 'units', ''),
                'res_scal': None,
                'llm_scal': None, 
                'hlm_scal': None,
                'low_limit': getattr(test_info, 'low_limit', None),
                'high_limit': getattr(test_info, 'high_limit', None)
            }
    
    print(f"Extracted scaling info for {len(scaling_info)} unique tests")
    
    # Alternative: Parse the raw records directly to get PTR scaling info
    # This is more reliable since the test_information might not have all scaling data
    print("Parsing raw records for PTR scaling information...")
    
    ptr_scaling_info = extract_ptr_scaling_from_raw(stdf_file)
    
    # Merge the two sources of information
    for test_num in ptr_scaling_info:
        if test_num in scaling_info:
            scaling_info[test_num].update(ptr_scaling_info[test_num])
        else:
            scaling_info[test_num] = ptr_scaling_info[test_num]
    
    print(f"Total scaling info for {len(scaling_info)} tests")
    
    # Convert DataFrame to pandas for easier manipulation
    df_pandas = df.to_pandas()
    
    # Find parametric test columns (numeric column names)
    param_columns = [col for col in df_pandas.columns if col.isdigit()]
    print(f"Found {len(param_columns)} parametric test columns: {param_columns[:10]}")
    
    # Apply scaling to each parametric test
    new_columns = {}
    
    for test_num_str in param_columns:
        test_num = int(test_num_str)
        
        if test_num in scaling_info:
            info = scaling_info[test_num]
            
            # Get the raw results column
            raw_values = df_pandas[test_num_str]
            
            # Apply scaling if res_scal is available
            if info.get('res_scal') is not None:
                res_scal = info['res_scal']
                scaled_values = raw_values * (10 ** res_scal)
                new_columns[f"{test_num}_scaled"] = scaled_values
                new_columns[f"{test_num}_res_scal"] = res_scal
                print(f"Test {test_num}: Applied scaling 10^{res_scal}")
            else:
                # No scaling, but still create scaled column (same as raw)
                new_columns[f"{test_num}_scaled"] = raw_values
                new_columns[f"{test_num}_res_scal"] = 0
            
            # Add other scaling info
            if info.get('llm_scal') is not None:
                new_columns[f"{test_num}_llm_scal"] = info['llm_scal']
            
            if info.get('hlm_scal') is not None:
                new_columns[f"{test_num}_hlm_scal"] = info['hlm_scal']
            
            # Add units
            units = info.get('units', '')
            if units:
                new_columns[f"{test_num}_units"] = units
            
            # Add scaled limits if available
            if info.get('low_limit') is not None and info.get('llm_scal') is not None:
                llm_scal = info['llm_scal']
                scaled_low_limit = info['low_limit'] * (10 ** llm_scal)
                new_columns[f"{test_num}_low_limit_scaled"] = scaled_low_limit
                
            if info.get('high_limit') is not None and info.get('hlm_scal') is not None:
                hlm_scal = info['hlm_scal']
                scaled_high_limit = info['high_limit'] * (10 ** hlm_scal)
                new_columns[f"{test_num}_high_limit_scaled"] = scaled_high_limit
    
    # Add new columns to DataFrame
    for col_name, col_data in new_columns.items():
        df_pandas[col_name] = col_data
    
    print(f"Added {len(new_columns)} new scaling columns")
    
    # Convert back to Polars if needed
    enhanced_df = pl.from_pandas(df_pandas)
    
    return enhanced_df, scaling_info

def extract_ptr_scaling_from_raw(stdf_file):
    """
    Alternative method: Parse the file again and extract PTR records directly.

    Iterates ``stupidf.records.Records(stdf_file)``, resolves each record and
    collects the scaling fields of every record whose class is named ``PTR``.

    In STDF V4 the optional PTR fields (scaling exponents, limits, units) are
    filled in on the first PTR of a test and act as defaults for later PTRs
    of the same test, which may omit them. The first record seen for a test
    is therefore kept; later records only fill in fields that are still
    ``None`` instead of overwriting the whole entry.

    Args:
        stdf_file: Path of the STDF file; passed unchanged to ``Records``.

    Returns:
        Dict mapping test number to a dict with the keys ``res_scal``,
        ``llm_scal``, ``hlm_scal``, ``units``, ``low_limit`` and
        ``high_limit``. If the internal ``stupidf.records`` modules cannot be
        imported, the result of ``extract_ptr_scaling_alternative`` is
        returned instead.
    """
    # Import the internal stupidf modules to access raw records.
    # HACK: these are not part of the public API, so fall back if they are missing.
    try:
        from stupidf.records import Records
        from stupidf.records.records import Record
    except ImportError:
        print("Could not import internal records module, using alternative method...")
        return extract_ptr_scaling_alternative(stdf_file)

    ptr_scaling = {}

    # Parse the file record by record
    for record in Records(stdf_file):
        resolved = record.resolve()
        if not resolved or not isinstance(resolved, Record):
            continue

        # Check if it's a PTR record
        is_ptr = (
            resolved.__class__.__name__ == 'PTR'
            or str(type(resolved)).endswith('PTR')
        )
        if not is_ptr:
            continue

        test_num = resolved.test_num
        fields = {
            'res_scal': getattr(resolved, 'res_scal', 0),
            'llm_scal': getattr(resolved, 'llm_scal', 0),
            'hlm_scal': getattr(resolved, 'hlm_scal', 0),
            'units': getattr(resolved, 'units', ''),
            'low_limit': getattr(resolved, 'lo_limit', None),
            'high_limit': getattr(resolved, 'hi_limit', None)
        }

        known = ptr_scaling.get(test_num)
        if known is None:
            ptr_scaling[test_num] = fields
            continue

        # Later PTRs may omit the optional fields; only fill remaining gaps.
        for key, value in fields.items():
            if known[key] is None and value is not None:
                known[key] = value

    print(f"Extracted PTR scaling info for {len(ptr_scaling)} tests")
    return ptr_scaling

def extract_ptr_scaling_alternative(stdf_file):
    """
    Alternative method using only the public API.

    Reads the ``test_information`` table from ``sf.parse_stdf`` and builds
    one entry for every row whose ``test_type`` is ``'P'``. This table carries
    no scaling exponents, so ``res_scal``, ``llm_scal`` and ``hlm_scal`` are
    always reported as 0.

    Args:
        stdf_file: Path of the STDF file; passed unchanged to ``sf.parse_stdf``.

    Returns:
        Dict mapping test number to a dict with the keys ``res_scal``,
        ``llm_scal``, ``hlm_scal``, ``units``, ``low_limit`` and
        ``high_limit``. Missing limits (``None`` or NaN) are reported as
        ``None``.
    """

    def _limit_or_none(value):
        # pandas reports a missing numeric limit as NaN; callers test
        # ``is not None``, so NaN would otherwise be treated as a real limit.
        return None if pd.isna(value) else value

    # This won't give us scaling info directly, but we can try to get it
    # from the test information
    stdf_parsed = sf.parse_stdf(stdf_file)
    test_info = stdf_parsed['test_information'].to_pandas()

    ptr_scaling = {}

    # Extract what we can from test information
    for _, row in test_info.iterrows():
        if row['test_type'] != 'P':  # Only parametric tests
            continue

        ptr_scaling[row['test_num']] = {
            'res_scal': 0,  # Default, since not available in merged test info
            'llm_scal': 0,
            'hlm_scal': 0,
            'units': row.get('units', ''),
            'low_limit': _limit_or_none(row.get('low_limit', None)),
            'high_limit': _limit_or_none(row.get('high_limit', None))
        }

    return ptr_scaling

def analyze_scaling_results(enhanced_df, scaling_info):
    """
    Analyze the results of the scaling operation
    """
    
    print("\n" + "="*50)
    print("SCALING ANALYSIS RESULTS")
    print("="*50)
    
    df_pandas = enhanced_df.to_pandas()
    
    # Find all scaling-related columns
    scaled_cols = [col for col in df_pandas.columns if '_scaled' in col]
    res_scal_cols = [col for col in df_pandas.columns if '_res_scal' in col]
    units_cols = [col for col in df_pandas.columns if '_units' in col]
    
    print(f"Created {len(scaled_cols)} scaled result columns")
    print(f"Created {len(res_scal_cols)} scaling exponent columns")
    print(f"Created {len(units_cols)} units columns")
    
    # Show some examples
    print(f"\nExample scaled columns: {scaled_cols[:5]}")
    print(f"Example units columns: {units_cols[:5]}")
    
    # Analyze scaling factors used
    scaling_factors_used = set()
    units_found = set()
    
    for test_num, info in scaling_info.items():
        if info.get('res_scal') is not None:
            scaling_factors_used.add(info['res_scal'])
        if info.get('units'):
            units_found.add(info['units'])
    
    print(f"\nScaling factors found: {sorted(scaling_factors_used)}")
    print(f"Units found: {sorted(units_found)}")
    
    # Show a detailed example for one test
    if scaled_cols:
        example_test = scaled_cols[0].replace('_scaled', '')
        raw_col = example_test
        scaled_col = f"{example_test}_scaled"
        scal_col = f"{example_test}_res_scal"
        
        if all(col in df_pandas.columns for col in [raw_col, scaled_col, scal_col]):
            print(f"\nDetailed example for test {example_test}:")
            
            sample_data = df_pandas[[raw_col, scaled_col, scal_col]].dropna().head(5)
            
            for idx, row in sample_data.iterrows():
                raw = row[raw_col]
                scaled = row[scaled_col]
                scal = row[scal_col]
                
                expected = raw * (10 ** scal) if scal != 0 else raw
                print(f"  Row {idx}: {raw:.6f} × 10^{scal} = {expected:.6f} (got {scaled:.6f})")
    
    return enhanced_df

def main():
    """
    Run the scaling workaround end to end on a hard-coded file path.

    Builds the enhanced DataFrame, prints the analysis and a column-count
    summary, and returns the enhanced DataFrame. Any exception is reported
    on stdout together with its traceback, and ``None`` is returned.
    """
    # Replace with your STDF file path
    stdf_file = "your_file.stdf"  # Change this!

    try:
        # Build the enhanced frame, then report on it.
        scaled_frame, per_test_info = extract_scaling_info_and_apply(stdf_file)
        final_df = analyze_scaling_results(scaled_frame, per_test_info)

        print(f"\n✅ Successfully created enhanced DataFrame with scaling!")

        # The file is parsed once more only to count the original columns.
        original_count = len(sf.parse_stdf(stdf_file)['df'].columns)
        print(f"   Original columns: {original_count}")

        enhanced_count = len(final_df.columns)
        print(f"   Enhanced columns: {enhanced_count}")

        # Save the enhanced DataFrame if desired
        # final_df.write_csv("enhanced_stdf_data.csv")
    except Exception as error:
        print(f"❌ Error: {error}")
        import traceback
        traceback.print_exc()
        return None
    else:
        return final_df

if __name__ == "__main__":
    # Helper for interactive use; it is only defined when run as a script.
    def quick_test(stdf_file):
        """Print units and limits of the first three test information records."""

        print(f"Testing scaling extraction for: {stdf_file}")

        # Load the raw STDF and go straight to its test information mapping
        raw = sf.get_raw_stdf(stdf_file)
        full_test_info = raw.test_data.full_test_information

        print(f"Found {len(full_test_info)} test information records")

        # Labels paired with the attribute that is printed for each of them
        shown_fields = (
            ("Units", 'units'),
            ("Low limit", 'low_limit'),
            ("High limit", 'high_limit'),
        )

        # Check a few records for scaling info
        for (test_num, site, head), test_info in list(full_test_info.items())[:3]:
            print(f"\nTest {test_num} (site {site}, head {head}):")
            for label, attribute in shown_fields:
                print(f"  {label}: {getattr(test_info, attribute, 'N/A')}")

    # Uncomment and modify to test:
    # quick_test("your_file.stdf")
    # main()

# In [None]:
import stupidf as sf
import pandas as pd

def investigate_stdf_structure(stdf_file):
    """
    Print an overview of what ``stupidf`` exposes for the given STDF file.

    Reports the shape of the main DataFrame, the test information table and
    the keys of the parsed result. If the raw STDF object is a dict, its keys
    and the attributes of its ``test_data`` entry are listed as well.

    Returns:
        Tuple ``(stdf_parsed, raw_stdf)``, or ``(None, None)`` when any step
        raises an exception (the error is printed).
    """
    print("🔍 Investigating STDF structure...")

    try:
        # Parsed view of the file
        stdf_parsed = sf.parse_stdf(stdf_file)
        print("✅ Successfully parsed STDF file")

        # Main result frame
        main_frame = stdf_parsed['df']
        row_count, column_count = main_frame.shape[0], main_frame.shape[1]
        print(f"📊 Main DataFrame: {row_count} rows, {column_count} columns")

        # Test information table
        info_table = stdf_parsed['test_information']
        print(f"📋 Test Information: {info_table.shape[0]} tests")
        print(f"   Columns: {list(info_table.columns)}")

        # Everything else the parser returned
        print(f"🗂️  Parsed STDF keys: {list(stdf_parsed.keys())}")

        # Raw view of the file
        raw_stdf = sf.get_raw_stdf(stdf_file)
        print(f"🔧 Raw STDF type: {type(raw_stdf)}")

        # The deeper inspection only applies to a dict that has a test_data entry.
        if isinstance(raw_stdf, dict):
            print(f"   Raw STDF keys: {list(raw_stdf.keys())}")

            if 'test_data' in raw_stdf:
                test_data = raw_stdf['test_data']
                print(f"   Test data type: {type(test_data)}")

                # Prefer the instance dict; otherwise list public names via dir()
                if hasattr(test_data, '__dict__'):
                    print(f"   Test data attributes: {list(test_data.__dict__.keys())}")
                elif hasattr(test_data, '__dir__'):
                    public_names = []
                    for name in dir(test_data):
                        if not name.startswith('_'):
                            public_names.append(name)
                    print(f"   Test data methods/attrs: {public_names}")

        return stdf_parsed, raw_stdf

    except Exception as error:
        print(f"❌ Error investigating structure: {error}")
        return None, None

def extract_available_scaling_info(stdf_file):
    """
    Attach per-test metadata from the test information table to the data frame.

    For each all-digit column whose test number has a ``test_type`` of ``'P'``
    in the test information table, constant columns ``<col>_units``,
    ``<col>_test_name``, ``<col>_low_limit``, ``<col>_high_limit`` and
    ``<col>_description`` are added when the corresponding value is present.

    Returns:
        Tuple ``(df, scaling_metadata)``: the extended pandas DataFrame and
        the dict of metadata per test number, or ``(None, None)`` when an
        exception occurs (the error and traceback are printed).
    """
    print("\n🔧 Extracting available scaling information...")

    def _has_value(value):
        # A limit counts as present only if it is neither None nor NaN.
        return value is not None and pd.notna(value)

    try:
        # Get the parsed data - this is what definitely works
        stdf_parsed = sf.parse_stdf(stdf_file)
        df = stdf_parsed['df'].to_pandas()
        test_info = stdf_parsed['test_information'].to_pandas()

        print(f"📊 Working with {len(df)} data rows and {len(test_info)} test definitions")

        # Parametric test columns are the ones named with digits only
        param_columns = [name for name in df.columns if name.isdigit()]
        print(f"🧮 Found {len(param_columns)} parametric test columns")

        # Collect metadata of the parametric ('P') tests
        scaling_metadata = {}
        for _, test_row in test_info.iterrows():
            test_num = test_row['test_num']
            if test_row.get('test_type', '') != 'P':
                continue

            scaling_metadata[test_num] = {
                'test_name': test_row.get('test_name', ''),
                'units': test_row.get('units', ''),
                'low_limit': test_row.get('low_limit', None),
                'high_limit': test_row.get('high_limit', None),
                'test_text': test_row.get('test_text', ''),
            }

        print(f"📋 Extracted metadata for {len(scaling_metadata)} parametric tests")

        # Work out which metadata columns to add for each test column
        new_columns = {}
        for col in param_columns:
            metadata = scaling_metadata.get(int(col))
            if metadata is None:
                continue

            units = metadata['units']
            if units:
                new_columns[f"{col}_units"] = units

            test_name = metadata['test_name']
            if test_name:
                new_columns[f"{col}_test_name"] = test_name

            low_limit = metadata['low_limit']
            if _has_value(low_limit):
                new_columns[f"{col}_low_limit"] = low_limit

            high_limit = metadata['high_limit']
            if _has_value(high_limit):
                new_columns[f"{col}_high_limit"] = high_limit

            description = metadata['test_text']
            if description:
                new_columns[f"{col}_description"] = description

        # Each value is a scalar, so it is broadcast over all rows
        for name, value in new_columns.items():
            df[name] = value

        print(f"✅ Added {len(new_columns)} metadata columns")

        return df, scaling_metadata

    except Exception as error:
        print(f"❌ Error extracting scaling info: {error}")
        import traceback
        traceback.print_exc()
        return None, None

def try_access_raw_test_data(stdf_file):
    """
    Attempt to access the raw test data to get PTR scaling information.

    ``sf.get_raw_stdf`` is used elsewhere in this file with attribute access
    (``raw_stdf.test_data.full_test_information``), while this function
    originally only accepted a dict. Both shapes are handled here: each level
    is looked up as a dict key first and as an attribute otherwise.

    For the first three test information records, the values of ``units``,
    ``low_limit``, ``high_limit``, ``res_scal``, ``llm_scal`` and
    ``hlm_scal`` are printed when the record has such an attribute.

    Args:
        stdf_file: Path of the STDF file; passed unchanged to ``sf.get_raw_stdf``.

    Returns:
        The ``full_test_information`` mapping, or ``None`` when it cannot be
        reached or an exception occurs (a message is printed in both cases).
    """
    print("\n🔍 Attempting to access raw test data...")

    missing = object()  # sentinel: distinguishes "absent" from a stored None

    def _lookup(container, name):
        # Dict key first (original behaviour for dicts), attribute otherwise.
        if isinstance(container, dict) and name in container:
            return container[name]
        return getattr(container, name, missing)

    try:
        raw_stdf = sf.get_raw_stdf(stdf_file)

        test_data = _lookup(raw_stdf, 'test_data')
        if test_data is missing:
            print("❌ Could not access test_data from raw STDF")
            return None

        full_test_info = _lookup(test_data, 'full_test_information')
        if full_test_info is missing:
            print("❌ Could not access full_test_information")
            return None

        print(f"✅ Found full_test_information with {len(full_test_info)} records")

        # Try to access scaling fields
        attrs_to_check = ['units', 'low_limit', 'high_limit', 'res_scal', 'llm_scal', 'hlm_scal']

        # Sample a few records to see what's available
        sample_records = list(full_test_info.items())[:3]

        for (test_num, site_num, head_num), test_info in sample_records:
            print(f"\n📝 Test {test_num} (site {site_num}, head {head_num}):")

            for attr in attrs_to_check:
                if hasattr(test_info, attr):
                    value = getattr(test_info, attr)
                    print(f"   {attr}: {value}")

        return full_test_info

    except Exception as e:
        print(f"❌ Error accessing raw test data: {e}")
        return None

def apply_known_scaling_patterns(df):
    """
    Apply common scaling patterns based on typical STDF usage
    """
    print("\n🔧 Applying common scaling patterns...")
    
    # Common scaling patterns in semiconductor testing
    scaling_patterns = {
        'voltage': {'keywords': ['volt', 'vdd', 'vcc', 'supply'], 'likely_scales': [-3, -6, -9]},  # mV, µV, nV
        'current': {'keywords': ['curr', 'idd', 'icc', 'leak'], 'likely_scales': [-3, -6, -9, -12]},  # mA, µA, nA, pA
        'resistance': {'keywords': ['res', 'ohm'], 'likely_scales': [0, 3, 6]},  # Ω, kΩ, MΩ
        'frequency': {'keywords': ['freq', 'hz'], 'likely_scales': [3, 6, 9]},  # kHz, MHz, GHz
        'time': {'keywords': ['time', 'delay', 'period'], 'likely_scales': [-3, -6, -9, -12]},  # ms, µs, ns, ps
    }
    
    # Find columns with units information
    units_cols = [col for col in df.columns if '_units' in col]
    
    scaling_applied = 0
    
    for units_col in units_cols:
        test_num_col = units_col.replace('_units', '')
        
        if test_num_col in df.columns:
            # Get the units value (assuming it's constant)
            units_values = df[units_col].dropna().unique()
            
            if len(units_values) > 0:
                units = str(units_values[0]).lower()
                
                # Check for scaling patterns
                for pattern_name, pattern_info in scaling_patterns.items():
                    for keyword in pattern_info['keywords']:
                        if keyword in units:
                            print(f"   🎯 Found {pattern_name} pattern in test {test_num_col}: {units}")
                            
                            # Apply most likely scaling
                            if pattern_info['likely_scales']:
                                scale = pattern_info['likely_scales'][0]  # Use most common scale
                                
                                scaled_values = df[test_num_col] * (10 ** scale)
                                df[f"{test_num_col}_scaled_{pattern_name}"] = scaled_values
                                df[f"{test_num_col}_scale_factor"] = scale
                                
                                print(f"     Applied 10^{scale} scaling")
                                scaling_applied += 1
                            break
    
    print(f"✅ Applied scaling to {scaling_applied} tests based on common patterns")
    return df

def main_scaling_workflow(stdf_file):
    """
    Run all steps of the scaling extraction for one STDF file.

    Steps: inspect the file structure, attach the available metadata columns,
    probe the raw test data (output only), apply keyword-based scaling, and
    print a summary of the columns that were added.

    Returns:
        The final DataFrame, or ``None`` if the structure inspection or the
        metadata extraction did not produce a result.
    """
    print("🚀 Starting scaling extraction workflow")
    print("=" * 50)

    # Step 1: Investigate structure
    stdf_parsed, raw_stdf = investigate_stdf_structure(stdf_file)
    if not stdf_parsed:
        return None

    # Step 2: Extract available scaling info
    enhanced_df, metadata = extract_available_scaling_info(stdf_file)
    if enhanced_df is None:
        return None

    # Step 3: Probe the raw test data; its result is not used further here
    raw_test_info = try_access_raw_test_data(stdf_file)

    # Step 4: Apply common scaling patterns
    final_df = apply_known_scaling_patterns(enhanced_df)

    # Step 5: Summary
    print("\n📊 FINAL RESULTS")
    print("=" * 30)

    original_cols = len(stdf_parsed['df'].columns)
    final_cols = len(final_df.columns)

    print(f"Original columns: {original_cols}")
    print(f"Final columns: {final_cols}")
    print(f"New columns added: {final_cols - original_cols}")

    # A column is listed as new if its name contains any of these markers
    markers = ['_units', '_test_name', '_low_limit', '_high_limit',
               '_description', '_scaled_', '_scale_factor']
    all_new_cols = []
    for col in final_df.columns:
        if any(marker in col for marker in markers):
            all_new_cols.append(col)

    print(f"\nNew columns created:")
    shown_limit = 20  # Show first 20
    for position, col in enumerate(all_new_cols[:shown_limit], start=1):
        print(f"  {position:2d}. {col}")

    if len(all_new_cols) > shown_limit:
        print(f"  ... and {len(all_new_cols) - 20} more")

    return final_df

# Example usage
if __name__ == "__main__":
    # Replace with your STDF file
    stdf_file = "your_file.stdf"

    # Run the workflow; any exception is reported instead of propagating
    try:
        result_df = main_scaling_workflow(stdf_file)

        # The workflow signals failure by returning None
        if result_df is None:
            print("\n❌ Failed to create enhanced DataFrame")
        else:
            print(f"\n✅ SUCCESS! Enhanced DataFrame ready with {len(result_df.columns)} columns")

            # Optional: Save to CSV
            # result_df.to_csv("enhanced_stdf_data.csv", index=False)
            # print("💾 Saved to enhanced_stdf_data.csv")

    except Exception as error:
        print(f"\n💥 Error in main workflow: {error}")