# Global FPGrowth Feature Importance Analysis

**Purpose:** Population-level frequent pattern mining for ML feature engineering  
**Updated:** November 23, 2025  
**Hardware:** Optimized for EC2 (32 cores, 1TB RAM)  
**Output:** `s3://pgxdatalake/gold/fpgrowth/global/{item_type}/`

## Key Features

✅ **Three Item Types** - Drugs, ICD codes, CPT codes  
✅ **Global Patterns** - Discovers patterns across all 5.7M patients  
✅ **ML Feature Engineering** - Creates encoding maps for CatBoost  
✅ **Association Rules** - Identifies co-occurrence relationships  
✅ **Memory Optimized** - Configurable support thresholds

## Methodology

For each item type (drug_name, icd_code, cpt_code):
1. Extract all unique items from cohort data
2. Create patient-level transactions (lists of items per patient)
3. Encode transactions into binary matrix
4. Run FP-Growth algorithm to find frequent itemsets
5. Generate association rules from frequent itemsets
6. Create encoding map for ML models
7. Save all outputs to S3

## Expected Runtime (EC2: 32 cores, 1TB RAM)

- **MIN_SUPPORT=0.01**: ~2-3 hours (12,783 drugs → 300-500 itemsets)
- **MIN_SUPPORT=0.005**: ~4-6 hours (more itemsets, more rules)
- **Total for all 3 types**: ~6-18 hours

## Data Scale

- **Total Events**: 947 million
- **Patients**: 5.7 million
- **Unique Drugs**: 12,783
- **Unique ICD Codes**: ~10,000
- **Unique CPT Codes**: ~5,000


## 1. Setup and Imports


In [None]:
import sys
import time
import json
import logging
from pathlib import Path
from typing import List, Dict
import pandas as pd
import boto3
from mlxtend.frequent_patterns import fpgrowth, association_rules
from mlxtend.preprocessing import TransactionEncoder

# Resolve the project root: when the notebook runs from inside the
# '3_fpgrowth_analysis' folder, step up one level; otherwise use the CWD as-is.
project_root = Path.cwd()
if project_root.name == '3_fpgrowth_analysis':
    project_root = project_root.parent
sys.path.insert(0, str(project_root))

# Project-local helper; importable only after the sys.path entry above is in place.
from helpers_1997_13.duckdb_utils import get_duckdb_connection

print("\n".join([
    "✓ All imports successful",
    f"✓ Project root: {project_root}",
]))


## 2. Configuration


In [None]:
# =============================================================================
# EC2 CONFIGURATION (32 cores, 1TB RAM)
# =============================================================================

# Columns to mine; each entry gets its own FP-Growth run
ITEM_TYPES = ['drug_name', 'icd_code', 'cpt_code']

# FP-Growth thresholds (fractions, not percentages)
MIN_SUPPORT = 0.01      # keep itemsets present in at least 1% of patient transactions
MIN_CONFIDENCE = 0.01   # keep rules with at least 1% confidence

# Input and output locations
LOCAL_DATA_PATH = project_root.joinpath("data", "gold", "cohorts_F1120")
S3_OUTPUT_BASE = "s3://pgxdatalake/gold/fpgrowth/global"

# Notebook logger. A handler is attached only when none exists yet, so
# re-running this cell does not stack additional handlers on the same logger.
logger = logging.getLogger('global_fpgrowth')
logger.setLevel(logging.INFO)
if not logger.handlers:
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# Echo the active configuration
print("\n".join([
    f"✓ Min Support: {MIN_SUPPORT}",
    f"✓ Min Confidence: {MIN_CONFIDENCE}",
    f"✓ Item Types: {ITEM_TYPES}",
    f"✓ S3 Output: {S3_OUTPUT_BASE}",
    f"✓ Local Data: {LOCAL_DATA_PATH}",
    f"✓ Local Data Exists: {LOCAL_DATA_PATH.exists()}",
]))


## 3. Define Helper Functions


In [None]:
def extract_global_items(local_data_path: Path, item_type: str, logger: logging.Logger) -> List[str]:
    """Extract all unique items of the specified type from local cohort data.

    Args:
        local_data_path: Directory searched recursively (``**``) for
            ``cohort.parquet`` files.
        item_type: One of ``'drug_name'``, ``'icd_code'`` or ``'cpt_code'``.
        logger: Logger used for progress messages; also passed to
            ``get_duckdb_connection``.

    Returns:
        The distinct, non-empty item values in the order produced by the
        query's ``ORDER BY item``.

    Raises:
        ValueError: If ``item_type`` is not one of the supported values.
            Raised before any database connection is opened.
    """
    logger.info(f"Extracting global {item_type}s from local cohort data...")
    start_time = time.time()

    # Double any single quote so a path containing "'" cannot terminate the
    # SQL string literal early.
    parquet_pattern = str(local_data_path / "**" / "cohort.parquet").replace("'", "''")
    source = f"read_parquet('{parquet_pattern}', hive_partitioning=1)"

    # Build (and thereby validate) the query BEFORE connecting, so an unknown
    # item_type cannot leave an open connection behind.
    if item_type == 'drug_name':
        query = f"""
        SELECT DISTINCT drug_name as item
        FROM {source}
        WHERE drug_name IS NOT NULL AND drug_name != '' AND event_type = 'pharmacy'
        ORDER BY item
        """
    elif item_type == 'icd_code':
        # One SELECT per diagnosis column, stacked with UNION ALL; DISTINCT is
        # applied once on the combined result.
        icd_columns = (
            'primary_icd_diagnosis_code',
            'two_icd_diagnosis_code',
            'three_icd_diagnosis_code',
            'four_icd_diagnosis_code',
            'five_icd_diagnosis_code',
        )
        stacked = "\n            UNION ALL\n            ".join(
            f"SELECT {column} as icd FROM {source} "
            f"WHERE {column} IS NOT NULL AND event_type = 'medical'"
            for column in icd_columns
        )
        query = f"""
        WITH all_icds AS (
            {stacked}
        )
        SELECT DISTINCT icd as item FROM all_icds WHERE icd != '' ORDER BY item
        """
    elif item_type == 'cpt_code':
        query = f"""
        SELECT DISTINCT procedure_code as item
        FROM {source}
        WHERE procedure_code IS NOT NULL AND procedure_code != '' AND event_type = 'medical'
        ORDER BY item
        """
    else:
        raise ValueError(f"Unknown item_type: {item_type}")

    con = get_duckdb_connection(logger=logger)
    try:
        logger.info(f"Running query for {item_type}...")
        df = con.execute(query).df()
    finally:
        # Always release the connection, even when the query fails.
        con.close()
    items = df['item'].tolist()

    elapsed = time.time() - start_time
    logger.info(f"✓ Extracted {len(items):,} unique {item_type}s in {elapsed:.1f}s")
    return items


def create_global_transactions(local_data_path: Path, item_type: str, logger: logging.Logger) -> List[List[str]]:
    """Create patient-level transactions from local cohort data.

    Args:
        local_data_path: Directory searched recursively (``**``) for
            ``cohort.parquet`` files.
        item_type: One of ``'drug_name'``, ``'icd_code'`` or ``'cpt_code'``.
        logger: Logger used for progress messages; also passed to
            ``get_duckdb_connection``.

    Returns:
        One list per ``mi_person_key`` holding that patient's distinct items
        in sorted order. Lists appear in the order produced by the pandas
        ``groupby`` on ``mi_person_key``.

    Raises:
        ValueError: If ``item_type`` is not one of the supported values.
            Raised before any database connection is opened.
    """
    logger.info(f"Creating global {item_type} transactions...")
    start_time = time.time()

    # Double any single quote so a path containing "'" cannot terminate the
    # SQL string literal early.
    parquet_pattern = str(local_data_path / "**" / "cohort.parquet").replace("'", "''")
    source = f"read_parquet('{parquet_pattern}', hive_partitioning=1)"

    # Queries use SELECT DISTINCT: only unique (patient, item) pairs matter
    # for the transactions, so duplicates are dropped in the database instead
    # of being materialised in pandas first. The query is built before
    # connecting so an unknown item_type cannot leak a connection.
    if item_type == 'drug_name':
        query = f"""
        SELECT DISTINCT mi_person_key, drug_name as item
        FROM {source}
        WHERE drug_name IS NOT NULL AND drug_name != '' AND event_type = 'pharmacy'
        """
    elif item_type == 'icd_code':
        # One SELECT per diagnosis column, stacked with UNION ALL.
        icd_columns = (
            'primary_icd_diagnosis_code',
            'two_icd_diagnosis_code',
            'three_icd_diagnosis_code',
            'four_icd_diagnosis_code',
            'five_icd_diagnosis_code',
        )
        stacked = "\n            UNION ALL\n            ".join(
            f"SELECT mi_person_key, {column} as icd FROM {source} "
            f"WHERE {column} IS NOT NULL AND event_type = 'medical'"
            for column in icd_columns
        )
        query = f"""
        WITH all_icds AS (
            {stacked}
        )
        SELECT DISTINCT mi_person_key, icd as item FROM all_icds WHERE icd != ''
        """
    elif item_type == 'cpt_code':
        query = f"""
        SELECT DISTINCT mi_person_key, procedure_code as item
        FROM {source}
        WHERE procedure_code IS NOT NULL AND procedure_code != '' AND event_type = 'medical'
        """
    else:
        raise ValueError(f"Unknown item_type: {item_type}")

    con = get_duckdb_connection(logger=logger)
    try:
        logger.info(f"Loading {item_type} events...")
        df = con.execute(query).df()
    finally:
        # Always release the connection, even when the query fails.
        con.close()

    logger.info(f"Grouping by patient...")
    # set() is kept as a cheap safeguard even though the query already
    # de-duplicates; sorted() makes each transaction's item order deterministic.
    transactions = (
        df.groupby('mi_person_key')['item']
        .apply(lambda x: sorted(set(x.tolist())))
        .tolist()
    )

    elapsed = time.time() - start_time
    logger.info(f"✓ Created {len(transactions):,} patient transactions in {elapsed:.1f}s")
    return transactions

# Visible confirmation in the cell output that the definitions above ran.
print("✓ Helper functions defined")


In [None]:
def _split_s3_uri(s3_uri):
    """Split ``s3://bucket/some/prefix`` into ``(bucket, prefix)``; the prefix has no leading or trailing slash."""
    scheme = "s3://"
    if not isinstance(s3_uri, str) or not s3_uri.startswith(scheme):
        raise ValueError(f"s3_output_base must start with 's3://': {s3_uri!r}")
    bucket, _, prefix = s3_uri[len(scheme):].partition("/")
    if not bucket:
        raise ValueError(f"s3_output_base has no bucket name: {s3_uri!r}")
    return bucket, prefix.strip("/")


def process_item_type(item_type, local_data_path, s3_output_base, min_support, min_confidence, logger):
    """Process a single item type: extract, encode, FP-Growth, save to S3.

    Args:
        item_type: Item column to mine; forwarded to ``extract_global_items``
            and ``create_global_transactions``.
        local_data_path: Root directory of the local cohort parquet files.
        s3_output_base: Destination of the form ``s3://bucket/prefix``.
            Results are written under ``<prefix>/<item_type>/``.
        min_support: Minimum support passed to ``fpgrowth``.
        min_confidence: Minimum confidence passed to ``association_rules``.
        logger: Logger used for progress and error messages.

    Returns:
        On success, the metrics dict that is also uploaded as
        ``metrics.json``. On any failure, ``{'item_type': ..., 'error': ...}``;
        the exception is logged with its traceback and not re-raised, so the
        caller can continue with the remaining item types.
    """
    logger.info(f"\n{'='*80}")
    logger.info(f"Processing {item_type.upper()}")
    logger.info(f"{'='*80}")
    overall_start = time.time()

    try:
        # Resolve the destination first so a malformed URI fails immediately
        # instead of after the (long) mining steps.
        bucket, base_prefix = _split_s3_uri(s3_output_base)
        prefix = f"{base_prefix}/{item_type}" if base_prefix else item_type

        # Extract items (used only for the unique-item count in the metrics)
        items = extract_global_items(local_data_path, item_type, logger)

        # Create transactions
        transactions = create_global_transactions(local_data_path, item_type, logger)

        # Encode transactions
        logger.info(f"Encoding {len(transactions):,} transactions...")
        encode_start = time.time()
        te = TransactionEncoder()
        te_ary = te.fit(transactions).transform(transactions)
        df_encoded = pd.DataFrame(te_ary, columns=te.columns_)
        encode_time = time.time() - encode_start
        logger.info(f"✓ Encoded to {df_encoded.shape} matrix in {encode_time:.1f}s")

        # Run FP-Growth
        logger.info(f"Running FP-Growth (min_support={min_support})...")
        fpgrowth_start = time.time()
        itemsets = fpgrowth(df_encoded, min_support=min_support, use_colnames=True)
        itemsets = itemsets.sort_values('support', ascending=False).reset_index(drop=True)
        fpgrowth_time = time.time() - fpgrowth_start
        logger.info(f"✓ Found {len(itemsets):,} frequent itemsets in {fpgrowth_time:.1f}s")

        # Generate association rules
        logger.info(f"Generating association rules (min_confidence={min_confidence})...")
        rules_start = time.time()
        if itemsets.empty:
            # association_rules() rejects an empty itemset frame; record
            # "no rules" rather than failing the whole item type.
            logger.warning("No frequent itemsets found; skipping rule generation")
            rules = pd.DataFrame(columns=['antecedents', 'consequents', 'lift'])
        else:
            rules = association_rules(itemsets, metric="confidence", min_threshold=min_confidence)
            rules = rules.sort_values('lift', ascending=False).reset_index(drop=True)
        rules_time = time.time() - rules_start
        logger.info(f"✓ Generated {len(rules):,} association rules in {rules_time:.1f}s")

        # Create encoding map from the single-item itemsets.
        # 'rank' is the row position in the support-sorted frame of ALL
        # itemsets, so ranks are not necessarily consecutive.
        encoding_map = {
            next(iter(row.itemsets)): {'support': float(row.support), 'rank': int(row.Index)}
            for row in itemsets.itertuples()
            if len(row.itemsets) == 1
        }
        logger.info(f"✓ Created encoding map with {len(encoding_map):,} items")

        # Save to S3
        logger.info(f"Saving results to S3...")
        s3_client = boto3.client('s3')

        # Convert frozensets to lists (frozensets are not JSON-serialisable)
        itemsets_json = itemsets.copy()
        itemsets_json['itemsets'] = itemsets_json['itemsets'].apply(list)
        rules_json = rules.copy()
        rules_json['antecedents'] = rules_json['antecedents'].apply(list)
        rules_json['consequents'] = rules_json['consequents'].apply(list)

        # Upload files
        s3_client.put_object(Bucket=bucket, Key=f"{prefix}/encoding_map.json",
                             Body=json.dumps(encoding_map, indent=2))
        s3_client.put_object(Bucket=bucket, Key=f"{prefix}/itemsets.json",
                             Body=itemsets_json.to_json(orient='records', indent=2))
        s3_client.put_object(Bucket=bucket, Key=f"{prefix}/rules.json",
                             Body=rules_json.to_json(orient='records', indent=2))

        # Save metrics ('total' is taken here, so it includes the uploads above)
        metrics = {
            'item_type': item_type, 'min_support': min_support, 'min_confidence': min_confidence,
            'unique_items': len(items), 'total_transactions': len(transactions),
            'frequent_itemsets': len(itemsets), 'association_rules': len(rules),
            'encoding_map_size': len(encoding_map),
            'processing_time_seconds': {
                'encoding': encode_time,
                'fpgrowth': fpgrowth_time,
                'rules': rules_time,
                'total': time.time() - overall_start,
            },
        }
        s3_client.put_object(Bucket=bucket, Key=f"{prefix}/metrics.json",
                             Body=json.dumps(metrics, indent=2))

        logger.info(f"✓ {item_type.upper()} COMPLETE - {len(itemsets):,} itemsets, {len(rules):,} rules")
        return metrics
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and let the
        # caller move on to the next item type.
        logger.error(f"✗ Failed: {e}", exc_info=True)
        return {'item_type': item_type, 'error': str(e)}

# Visible confirmation in the cell output that the definition above ran.
print("✓ process_item_type function defined")


## 4. Execute Analysis

Process all item types sequentially.


In [None]:
# --- Run header: banner plus the parameters in effect ------------------------
logger.info(f"\n{'='*80}")
logger.info(f"GLOBAL FPGROWTH ANALYSIS - START")
logger.info(f"{'='*80}")
logger.info(f"Item types: {ITEM_TYPES}")
logger.info(f"Min support: {MIN_SUPPORT}")
logger.info(f"Min confidence: {MIN_CONFIDENCE}")

# Arguments shared by every run; only item_type varies between iterations.
run_kwargs = dict(
    local_data_path=LOCAL_DATA_PATH,
    s3_output_base=S3_OUTPUT_BASE,
    min_support=MIN_SUPPORT,
    min_confidence=MIN_CONFIDENCE,
    logger=logger,
)

all_metrics = []
overall_start = time.time()

# One item type at a time; each result (metrics or error dict) is appended as
# soon as it is available.
for item_type in ITEM_TYPES:
    metrics = process_item_type(item_type=item_type, **run_kwargs)
    all_metrics.append(metrics)

total_elapsed = time.time() - overall_start

# --- Run footer: timing and a one-line summary per item type -----------------
logger.info(f"\n{'='*80}")
logger.info(f"GLOBAL FPGROWTH ANALYSIS - COMPLETE")
logger.info(f"{'='*80}")
logger.info(f"Total processing time: {total_elapsed:.1f}s ({total_elapsed/60:.1f}min)")
logger.info(f"\nResults Summary:")
for m in all_metrics:
    if 'error' in m:
        logger.info(f"  {m['item_type']}: ERROR - {m['error']}")
        continue
    logger.info(f"  {m['item_type']}: {m['frequent_itemsets']:,} itemsets, {m['association_rules']:,} rules")

print("\n✓ Analysis complete!")
