In [1]:
import pandas as pd
import boto3
import io
from pathlib import Path

# S3 configuration
s3_bucket = "commercial-rates"  # or use os.getenv('S3_BUCKET')
s3_prefix = "tic-mrf"
specific_fact_table = "/consolidated/centene_fidelis/fact_tables/memory_efficient_fact_table_20250802_183825.parquet"      # or use os.getenv('S3_PREFIX')
s3_key = f"{s3_prefix}{specific_fact_table}"  
# Initialize S3 client
s3_client = boto3.client('s3')

try:
    # Get object from S3
    response = s3_client.get_object(Bucket=s3_bucket, Key=s3_key)
    
    # Read parquet directly from memory buffer
    df = pd.read_parquet(io.BytesIO(response['Body'].read()))
    
    # Basic info
    print(f"Shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB")
    print(f"Successfully loaded from s3://{s3_bucket}/{s3_key}")

    # Show first few rows
    df.head()
    
except Exception as e:
    print(f"Error loading from S3: {str(e)}")

Shape: (658968, 54)
Columns: ['rate_uuid', 'payer_uuid', 'organization_uuid', 'service_code', 'service_description', 'billing_code_type', 'negotiated_rate', 'billing_class', 'rate_type', 'service_codes', 'plan_details', 'contract_period', 'provider_network', 'geographic_scope', 'data_lineage', 'created_at', 'updated_at', 'quality_flags', 'rate_npis', 'tin', 'organization_name', 'organization_type', 'parent_system', 'npi_count', 'primary_specialty', 'is_facility', 'headquarters_address', 'service_areas', 'created_at_org', 'updated_at_org', 'data_quality_score', 'npi', 'nppes_provider_type', 'nppes_primary_specialty', 'nppes_gender', 'nppes_addresses', 'nppes_credentials', 'nppes_provider_name', 'nppes_enumeration_date', 'nppes_last_updated', 'nppes_secondary_specialties', 'nppes_metadata', 'nppes_city', 'nppes_state', 'nppes_zip', 'nppes_country', 'nppes_street', 'nppes_phone', 'nppes_fax', 'nppes_address_type', 'nppes_address_purpose', 'rate_category', 'service_category', 'fact_key']
M

In [None]:
import ast
import numpy as np

def parse_nppes_addresses(addr_str):
    """Parse the nppes_addresses string into a list of dictionaries."""
    if pd.isna(addr_str) or addr_str == '':
        return []
    
    # Remove the numpy array wrapper if present
    addr_str = addr_str.strip()
    if addr_str.startswith('array('):
        # Extract the content between the outer brackets
        addr_str = addr_str[addr_str.find('['):addr_str.rfind(']')+1]
    
    try:
        # Now we can safely evaluate the list of dictionaries
        return ast.literal_eval(addr_str)
    except:
        # Return empty list if parsing fails
        return []

# Parse the nppes_addresses column
df['nppes_address_list'] = df['nppes_addresses'].apply(parse_nppes_addresses)

# Verify the results
print("Sample of parsed addresses:")
print(df['nppes_address_list'].head())


In [None]:
# Extract first address fields (if you need them)
df['first_address'] = df['nppes_address_list'].apply(lambda x: x[0] if len(x) > 0 else {})

# Or extract specific fields from first address
df['address_street'] = df['nppes_address_list'].apply(lambda x: x[0].get('street', '') if len(x) > 0 else '')
df['address_city'] = df['nppes_address_list'].apply(lambda x: x[0].get('city', '') if len(x) > 0 else '')
df['address_state'] = df['nppes_address_list'].apply(lambda x: x[0].get('state', '') if len(x) > 0 else '')
df['address_zip'] = df['nppes_address_list'].apply(lambda x: x[0].get('zip', '') if len(x) > 0 else '')

# Show the extracted fields
print("\nSample of extracted address fields:")
print(df[['address_street', 'address_city', 'address_state', 'address_zip']].head())


--inspect s3 parquets

In [6]:
#!/usr/bin/env python3
"""Simple script to inspect payer parquet files in S3."""

import boto3
import pandas as pd
from pathlib import Path
import tempfile

def inspect_payer_parquets(bucket="commercial-rates", prefix="tic-mrf/test"):
    """Inspect payer parquet files in S3."""
    s3_client = boto3.client('s3')
    
    # List payer files
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    
    payer_files = []
    for page in pages:
        if 'Contents' in page:
            for obj in page['Contents']:
                if 'payers' in obj['Key'] and obj['Key'].endswith('.parquet'):
                    payer_files.append(obj['Key'])
    
    print(f"Found {len(payer_files)} payer files:")
    for i, file_key in enumerate(payer_files, 1):
        print(f"  {i}. {file_key}")
    
    # Inspect first file in detail
    if payer_files:
        print(f"\nInspecting first file: {payer_files[0]}")
        
        # Download and read
        with tempfile.NamedTemporaryFile(suffix='.parquet') as tmp_file:
            s3_client.download_file(bucket, payer_files[0], tmp_file.name)
            df = pd.read_parquet(tmp_file.name)
        
        print(f"Shape: {df.shape}")
        print(f"Columns: {list(df.columns)}")
        print(f"\nSample data:")
        print(df.head(3).to_string())
        
        # Check payer_uuid mapping
        if 'payer_uuid' in df.columns:
            print(f"\nUnique payers: {df['payer_uuid'].nunique()}")
            print(f"Sample UUIDs: {df['payer_uuid'].head().tolist()}")
        
        # Check payer names
        if 'payer_name' in df.columns:
            print(f"\nPayer names: {df['payer_name'].unique()}")

if __name__ == "__main__":
    inspect_payer_parquets()

Found 0 payer files:


In [1]:
#!/usr/bin/env python3
"""
Review S3 streaming fact table results.
Analyzes the streaming chunks and provides insights on the data.
"""

import pandas as pd
import boto3
import logging
from pathlib import Path
import json
from datetime import datetime
import io

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def review_s3_streaming_results():
    """Review the S3 streaming fact table results."""
    
    # S3 configuration
    s3_bucket = "commercial-rates"
    s3_prefix = "tic-mrf/test"
    chunk_key = "tic-mrf/test/fact_tables/streaming_chunks/chunk_0000_20250724_091055.parquet"
    summary_key = "tic-mrf/test/s3_streaming_fact_table_summary.json"
    
    # Initialize S3 client
    try:
        s3_client = boto3.client('s3')
        logger.info("✅ S3 client initialized successfully")
    except Exception as e:
        logger.error(f"❌ Failed to initialize S3 client: {e}")
        return
    
    # Load the streaming chunk
    logger.info(f"Loading streaming chunk from s3://{s3_bucket}/{chunk_key}")
    try:
        response = s3_client.get_object(Bucket=s3_bucket, Key=chunk_key)
        df = pd.read_parquet(io.BytesIO(response['Body'].read()))
        logger.info(f"✅ Loaded {len(df):,} records from streaming chunk")
    except Exception as e:
        logger.error(f"❌ Failed to load streaming chunk: {e}")
        return
    
    # Basic statistics
    print("\n" + "="*60)
    print("📊 S3 STREAMING FACT TABLE ANALYSIS")
    print("="*60)
    
    print(f"\n📈 RECORD COUNT: {len(df):,} records")
    print(f"📁 FILE SIZE: {response['ContentLength'] / 1024 / 1024:.1f} MB")
    
    # Column analysis
    print(f"\n📋 COLUMNS ({len(df.columns)} total):")
    for i, col in enumerate(df.columns, 1):
        print(f"  {i:2d}. {col}")
    
    # Sample data
    print(f"\n🔍 SAMPLE DATA (first 3 records):")
    print(df.head(3).to_string())
    
    # NPPES enrichment analysis
    if 'npi' in df.columns:
        npi_count = df['npi'].notna().sum()
        print(f"\n🏥 NPPES ENRICHMENT:")
        print(f"  Records with NPI: {npi_count:,} ({npi_count/len(df)*100:.1f}%)")
        print(f"  Records without NPI: {len(df) - npi_count:,} ({(len(df) - npi_count)/len(df)*100:.1f}%)")
    
    # Service code analysis
    if 'service_code' in df.columns:
        service_counts = df['service_code'].value_counts().head(10)
        print(f"\n🏥 TOP 10 SERVICE CODES:")
        for code, count in service_counts.items():
            print(f"  {code}: {count:,} records")
    
    # Service category analysis
    if 'service_category' in df.columns:
        category_counts = df['service_category'].value_counts()
        print(f"\n📊 SERVICE CATEGORIES:")
        for category, count in category_counts.items():
            print(f"  {category}: {count:,} records ({count/len(df)*100:.1f}%)")
    
    # Organization analysis
    if 'organization_name' in df.columns:
        org_counts = df['organization_name'].value_counts().head(5)
        print(f"\n🏢 TOP 5 ORGANIZATIONS:")
        for org, count in org_counts.items():
            print(f"  {org}: {count:,} records")
    
    # Rate analysis
    if 'negotiated_rate' in df.columns:
        rates = df['negotiated_rate'].dropna()
        if len(rates) > 0:
            print(f"\n💰 RATE ANALYSIS:")
            print(f"  Min rate: ${rates.min():,.2f}")
            print(f"  Max rate: ${rates.max():,.2f}")
            print(f"  Mean rate: ${rates.mean():,.2f}")
            print(f"  Median rate: ${rates.median():,.2f}")
    
    # NPI analysis
    if 'npi' in df.columns:
        unique_npis = df['npi'].nunique()
        print(f"\n👨‍⚕️ PROVIDER ANALYSIS:")
        print(f"  Unique NPIs: {unique_npis:,}")
        print(f"  Average records per NPI: {len(df)/unique_npis:.1f}")
    
    # Load and display summary
    try:
        summary_response = s3_client.get_object(Bucket=s3_bucket, Key=summary_key)
        summary_data = json.loads(summary_response['Body'].read().decode('utf-8'))
        print(f"\n📋 S3 SUMMARY METADATA:")
        for key, value in summary_data.items():
            print(f"  {key}: {value}")
    except Exception as e:
        logger.warning(f"Could not load summary: {e}")
    
    # Data quality checks
    print(f"\n🔍 DATA QUALITY CHECKS:")
    
    # Check for missing values
    missing_counts = df.isnull().sum()
    columns_with_missing = missing_counts[missing_counts > 0]
    if len(columns_with_missing) > 0:
        print(f"  Columns with missing values:")
        for col, count in columns_with_missing.items():
            print(f"    {col}: {count:,} missing ({count/len(df)*100:.1f}%)")
    else:
        print("  ✅ No missing values found")
    
    # Check for duplicates
    duplicates = df.duplicated().sum()
    print(f"  Duplicate records: {duplicates:,} ({duplicates/len(df)*100:.1f}%)")
    
    print("\n" + "="*60)
    print("✅ S3 STREAMING REVIEW COMPLETE")
    print("="*60)

def compare_with_original():
    """Compare streaming results with original fact table."""
    
    print("\n" + "="*60)
    print("🔄 COMPARISON WITH ORIGINAL FACT TABLE")
    print("="*60)
    
    # Load original fact table
    original_file = Path("dashboard_data/memory_efficient_fact_table.parquet")
    if original_file.exists():
        original_df = pd.read_parquet(original_file)
        print(f"\n📊 ORIGINAL FACT TABLE:")
        print(f"  Records: {len(original_df):,}")
        print(f"  File size: {original_file.stat().st_size / 1024 / 1024:.1f} MB")
        print(f"  Columns: {len(original_df.columns)}")
        
        # Compare record counts
        streaming_count = 17067  # From the logs
        print(f"\n📈 COMPARISON:")
        print(f"  Original records: {len(original_df):,}")
        print(f"  Streaming records: {streaming_count:,}")
        print(f"  Difference: {len(original_df) - streaming_count:,} ({((len(original_df) - streaming_count)/len(original_df)*100:.1f}%)")
        
        # Check if streaming is subset
        if streaming_count < len(original_df):
            print(f"  ⚠️  Streaming processed fewer records (test mode limited to 2 files)")
        else:
            print(f"  ✅ Streaming processed more records")
    else:
        print("❌ Original fact table not found")

if __name__ == "__main__":
    print("🚀 Starting S3 streaming results review...")
    review_s3_streaming_results()
    compare_with_original()

SyntaxError: f-string: expecting a valid expression after '{' (3457547917.py, line 160)