In [13]:
# 02-dataflow-feature-pipeline.ipynb
# Openbook ML Demo - Dataflow Feature Engineering Pipeline

"""
=============================================================================
DATAFLOW IN THIS PROJECT - TWO SEPARATE JOBS
=============================================================================

JOB 1: Training Data Preparation (THIS NOTEBOOK)
-------------------------------------------------
When: Run once to prepare training data (or periodically when new claims arrive)
Input: Raw claims from GCS (gs://openbook-data-lake/raw/claims/)
Process: 
    - Engineer features from raw claims
    - Encode categorical variables
    - Create procedure embeddings
    - Aggregate patient-level features
Output: Processed features to GCS (gs://openbook-data-lake/processed/features/)
Then: Features feed into model training (notebook 04)


JOB 2: Batch Inference (NOTEBOOK 08 - deployment)
-------------------------------------------------
When: On-demand when dental office submits patients
Input: Batch of patients needing copay predictions
Process:
    - Fetch features from Feature Store
    - Run model predictions in parallel
    - Generate treatment plans via LLM
Output: Predicted copays for all patients
Scale: 
    - Production: 100 patients via Dataflow workers
    - Demo: 5 patients (simplified)

=============================================================================
THIS NOTEBOOK: We write and submit JOB 1 to Dataflow service
=============================================================================
"""

import os
from google.cloud import storage

# Configuration
PROJECT_ID = "openbook-ml-demo"
REGION = "us-central1"
BUCKET_NAME = "openbook-data-lake"

print("DATAFLOW JOB 1: Training Data Preparation")
print("=" * 50)
print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"Input:  gs://{BUCKET_NAME}/raw/claims/")
print(f"Output: gs://{BUCKET_NAME}/processed/features/")
print("\n✓ Configuration set")

DATAFLOW JOB 1: Training Data Preparation
Project: openbook-ml-demo
Region: us-central1
Input:  gs://openbook-data-lake/raw/claims/
Output: gs://openbook-data-lake/processed/features/

✓ Configuration set


In [16]:
from google.cloud import storage

client = storage.Client(project=PROJECT_ID)
bucket = client.bucket(BUCKET_NAME)

In [17]:
# Write Dataflow pipeline script to file
# This script will be submitted to Dataflow service (runs on GCP workers)

dataflow_script = '''
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions, SetupOptions
import json
import csv
from io import StringIO

# ============================================================================
# FEATURE ENGINEERING TRANSFORMS
# ============================================================================

class ParseClaim(beam.DoFn):
    """Parse CSV row into claim dictionary"""
    def __init__(self, header):
        self.header = header
    
    def process(self, row):
        reader = csv.reader(StringIO(row))
        values = next(reader)
        if values[0] == 'claim_id':  # Skip header
            return
        yield dict(zip(self.header, values))


class EngineerFeatures(beam.DoFn):
    """Engineer features for each claim"""
    
    def process(self, claim):
        # Numeric conversions
        claim['procedure_cost'] = float(claim['procedure_cost'])
        claim['charged_amount'] = float(claim['charged_amount'])
        claim['insurance_paid'] = float(claim['insurance_paid'])
        claim['patient_copay'] = float(claim['patient_copay'])
        claim['annual_maximum'] = float(claim['annual_maximum'])
        claim['remaining_maximum'] = float(claim['remaining_maximum'])
        claim['deductible_remaining'] = float(claim['deductible_remaining'])
        claim['preventive_coverage'] = float(claim['preventive_coverage'])
        claim['basic_coverage'] = float(claim['basic_coverage'])
        claim['major_coverage'] = float(claim['major_coverage'])
        claim['months_enrolled'] = int(claim['months_enrolled'])
        claim['is_in_network'] = claim['is_in_network'] == 'True'
        
        # === ENGINEERED FEATURES ===
        
        # 1. Coverage ratio for this procedure category
        category = claim['procedure_category']
        if category == 'preventive':
            claim['coverage_ratio'] = claim['preventive_coverage'] / 100
        elif category == 'basic':
            claim['coverage_ratio'] = claim['basic_coverage'] / 100
        else:
            claim['coverage_ratio'] = claim['major_coverage'] / 100
        
        # 2. Max utilization ratio (how much of annual max is used)
        if claim['annual_maximum'] > 0:
            claim['max_utilization'] = 1 - (claim['remaining_maximum'] / claim['annual_maximum'])
        else:
            claim['max_utilization'] = 0  # DHMO has no max
        
        # 3. Cost to remaining max ratio (will this exhaust the max?)
        if claim['remaining_maximum'] > 0:
            claim['cost_to_max_ratio'] = claim['procedure_cost'] / claim['remaining_maximum']
        else:
            claim['cost_to_max_ratio'] = 999  # Max already exhausted
        
        # 4. Deductible impact
        claim['deductible_applies'] = 1 if claim['deductible_remaining'] > 0 else 0
        
        # 5. Network penalty indicator
        claim['network_penalty'] = 0 if claim['is_in_network'] else 1
        
        # 6. Waiting period risk (for newer enrollees)
        claim['waiting_period_risk'] = 1 if claim['months_enrolled'] < 12 else 0
        
        # 7. Procedure category encoding (one-hot)
        claim['is_preventive'] = 1 if category == 'preventive' else 0
        claim['is_basic'] = 1 if category == 'basic' else 0
        claim['is_major'] = 1 if category == 'major' else 0
        
        # 8. High cost flag
        claim['high_cost_procedure'] = 1 if claim['procedure_cost'] > 500 else 0
        
        # 9. Plan type encoding
        claim['is_ppo'] = 1 if claim['plan_type'] == 'PPO' else 0
        claim['is_dhmo'] = 1 if claim['plan_type'] == 'DHMO' else 0
        claim['is_indemnity'] = 1 if claim['plan_type'] == 'Indemnity' else 0
        
        # 10. Carrier encoding
        carriers = ['Delta Dental', 'Cigna', 'Aetna', 'MetLife', 'Guardian']
        for c in carriers:
            claim[f'carrier_{c.lower().replace(" ", "_")}'] = 1 if claim['carrier'] == c else 0
        
        # 11. Expected copay (naive formula - model learns deviation from this)
        expected_insurance = claim['procedure_cost'] * claim['coverage_ratio']
        if claim['annual_maximum'] > 0:
            expected_insurance = min(expected_insurance, claim['remaining_maximum'])
        claim['expected_copay'] = claim['procedure_cost'] - expected_insurance + claim['deductible_remaining']
        
        # 12. Target: Actual copay deviation from expected
        claim['copay_deviation'] = claim['patient_copay'] - claim['expected_copay']
        
        yield claim


class FormatOutput(beam.DoFn):
    """Format features as CSV row for output"""
    
    FEATURE_COLUMNS = [
        'claim_id', 'patient_id', 'procedure_code',
        # Raw features
        'procedure_cost', 'annual_maximum', 'remaining_maximum',
        'deductible_remaining', 'coverage_ratio', 'months_enrolled',
        # Engineered features
        'max_utilization', 'cost_to_max_ratio', 'deductible_applies',
        'network_penalty', 'waiting_period_risk',
        'is_preventive', 'is_basic', 'is_major',
        'high_cost_procedure', 'is_ppo', 'is_dhmo', 'is_indemnity',
        'carrier_delta_dental', 'carrier_cigna', 'carrier_aetna',
        'carrier_metlife', 'carrier_guardian',
        'expected_copay', 'copay_deviation',
        # Target
        'patient_copay'
    ]
    
    def process(self, claim):
        values = [str(claim.get(col, '')) for col in self.FEATURE_COLUMNS]
        yield ','.join(values)


def run_pipeline(argv=None):
    """Main pipeline execution"""
    
    # Pipeline options
    options = PipelineOptions(argv)
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = 'openbook-ml-demo'
    gcp_options.region = 'us-central1'
    gcp_options.job_name = 'openbook-feature-engineering'
    gcp_options.staging_location = 'gs://openbook-data-lake/staging'
    gcp_options.temp_location = 'gs://openbook-data-lake/temp'
    
    options.view_as(StandardOptions).runner = 'DataflowRunner'
    options.view_as(SetupOptions).save_main_session = True
    
    # CSV header
    header = [
        'claim_id', 'patient_id', 'procedure_code', 'procedure_category',
        'claim_date', 'plan_id', 'carrier', 'plan_type', 'is_in_network',
        'annual_maximum', 'remaining_maximum', 'deductible_remaining',
        'preventive_coverage', 'basic_coverage', 'major_coverage',
        'months_enrolled', 'procedure_cost', 'charged_amount',
        'insurance_paid', 'patient_copay', 'claim_status'
    ]
    
    # Build and run pipeline
    with beam.Pipeline(options=options) as p:
        # Read raw claims
        claims = (
            p
            | 'ReadClaims' >> beam.io.ReadFromText('gs://openbook-data-lake/raw/claims/claims.csv')
            | 'ParseCSV' >> beam.ParDo(ParseClaim(header))
            | 'EngineerFeatures' >> beam.ParDo(EngineerFeatures())
            | 'FormatOutput' >> beam.ParDo(FormatOutput())
        )
        
        # Write processed features
        claims | 'WriteFeatures' >> beam.io.WriteToText(
            'gs://openbook-data-lake/processed/features/features',
            file_name_suffix='.csv',
            header=','.join(FormatOutput.FEATURE_COLUMNS)
        )


if __name__ == '__main__':
    run_pipeline()
'''

# Save to file
with open('dataflow_feature_pipeline.py', 'w') as f:
    f.write(dataflow_script)

# Upload to GCS
blob = bucket.blob('pipeline/dataflow_feature_pipeline.py')
blob.upload_from_filename('dataflow_feature_pipeline.py')

print("✓ Dataflow pipeline script created")
print("✓ Uploaded to gs://openbook-data-lake/pipeline/dataflow_feature_pipeline.py")
print("\nFeatures engineered:")
print("  - coverage_ratio: Coverage % for procedure category")
print("  - max_utilization: % of annual max already used")
print("  - cost_to_max_ratio: Will procedure exhaust remaining max?")
print("  - deductible_applies: Binary flag")
print("  - network_penalty: Out-of-network indicator")
print("  - waiting_period_risk: New enrollee flag")
print("  - expected_copay: Naive formula calculation")
print("  - copay_deviation: Actual - Expected (what model learns)")

✓ Dataflow pipeline script created
✓ Uploaded to gs://openbook-data-lake/pipeline/dataflow_feature_pipeline.py

Features engineered:
  - coverage_ratio: Coverage % for procedure category
  - max_utilization: % of annual max already used
  - cost_to_max_ratio: Will procedure exhaust remaining max?
  - deductible_applies: Binary flag
  - network_penalty: Out-of-network indicator
  - waiting_period_risk: New enrollee flag
  - expected_copay: Naive formula calculation
  - copay_deviation: Actual - Expected (what model learns)


In [19]:
# Submit pipeline to Dataflow service
# Runs the Python script on Dataflow workers

print("Submitting Dataflow job...")
print("=" * 50)

# Download script locally and run with DataflowRunner
!python dataflow_feature_pipeline.py \
    --runner=DataflowRunner \
    --project={PROJECT_ID} \
    --region={REGION} \
    --staging_location=gs://{BUCKET_NAME}/staging \
    --temp_location=gs://{BUCKET_NAME}/temp \
    --job_name=openbook-feature-engineering

print("\n✓ Job submitted! Monitor at:")
print(f"https://console.cloud.google.com/dataflow/jobs/{REGION}?project={PROJECT_ID}")

Submitting Dataflow job...

✓ Job submitted! Monitor at:
https://console.cloud.google.com/dataflow/jobs/us-central1?project=openbook-ml-demo


In [20]:
# Submit via gcloud dataflow flex-template or direct Python execution
# First, check if job exists

!gcloud dataflow jobs list --region={REGION} --project={PROJECT_ID} --limit=5


JOB_ID                                   NAME                          TYPE   CREATION_TIME        STATE  REGION
2025-12-14_11_20_24-1872466375908897589  openbook-feature-engineering  Batch  2025-12-14 19:20:25  Done   us-central1


In [21]:
# Check if processed features were created
!gsutil ls gs://{BUCKET_NAME}/processed/features/

gs://openbook-data-lake/processed/features/.keep
gs://openbook-data-lake/processed/features/features-00000-of-00001.csv


In [22]:
# Verify processed features
import pandas as pd
from io import StringIO

# Load processed features
blob = bucket.blob('processed/features/features-00000-of-00001.csv')
content = blob.download_as_text()
features_df = pd.read_csv(StringIO(content))

print(f"✓ Processed {len(features_df):,} claims")
print(f"✓ {len(features_df.columns)} features created")
print("\nFeature columns:")
print(features_df.columns.tolist())
print("\nSample data:")
features_df.head()

✓ Processed 10,000 claims
✓ 29 features created

Feature columns:
['claim_id', 'patient_id', 'procedure_code', 'procedure_cost', 'annual_maximum', 'remaining_maximum', 'deductible_remaining', 'coverage_ratio', 'months_enrolled', 'max_utilization', 'cost_to_max_ratio', 'deductible_applies', 'network_penalty', 'waiting_period_risk', 'is_preventive', 'is_basic', 'is_major', 'high_cost_procedure', 'is_ppo', 'is_dhmo', 'is_indemnity', 'carrier_delta_dental', 'carrier_cigna', 'carrier_aetna', 'carrier_metlife', 'carrier_guardian', 'expected_copay', 'copay_deviation', 'patient_copay']

Sample data:


Unnamed: 0,claim_id,patient_id,procedure_code,procedure_cost,annual_maximum,remaining_maximum,deductible_remaining,coverage_ratio,months_enrolled,max_utilization,...,is_dhmo,is_indemnity,carrier_delta_dental,carrier_cigna,carrier_aetna,carrier_metlife,carrier_guardian,expected_copay,copay_deviation,patient_copay
0,CLM_000001,PAT_0903,D7140,220.0,2000.0,1967.10822,0.0,0.8,12,0.016446,...,0,0,1,0,0,0,0,44.0,1.72,45.72
1,CLM_000002,PAT_0091,D2391,195.0,1000.0,540.450051,100.0,0.6,36,0.45955,...,0,0,0,0,1,0,0,178.0,102.7,280.7
2,CLM_000003,PAT_0414,D1110,130.0,1500.0,1440.043682,0.0,0.8,12,0.039971,...,0,1,0,0,0,0,1,26.0,3.96,29.96
3,CLM_000004,PAT_0650,D2950,385.0,0.0,999999.0,0.0,1.0,24,0.0,...,1,0,0,1,0,0,0,0.0,118.93,118.93
4,CLM_000005,PAT_0499,D2950,385.0,0.0,999999.0,0.0,1.0,24,0.0,...,1,0,0,1,0,0,0,0.0,435.367331,435.367331


In [23]:
# Create train/validation/test splits
from sklearn.model_selection import train_test_split

# Features for model (excluding IDs and target)
feature_cols = [
    'procedure_cost', 'annual_maximum', 'remaining_maximum', 'deductible_remaining',
    'coverage_ratio', 'months_enrolled', 'max_utilization', 'cost_to_max_ratio',
    'deductible_applies', 'network_penalty', 'waiting_period_risk',
    'is_preventive', 'is_basic', 'is_major', 'high_cost_procedure',
    'is_ppo', 'is_dhmo', 'is_indemnity',
    'carrier_delta_dental', 'carrier_cigna', 'carrier_aetna', 
    'carrier_metlife', 'carrier_guardian',
    'expected_copay'
]

target_col = 'patient_copay'
id_cols = ['claim_id', 'patient_id', 'procedure_code']

# Split: 70% train, 15% val, 15% test
train_df, temp_df = train_test_split(features_df, test_size=0.3, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

print(f"Train: {len(train_df):,} samples")
print(f"Val:   {len(val_df):,} samples")
print(f"Test:  {len(test_df):,} samples")

# Upload splits to GCS
for name, df in [('train', train_df), ('val', val_df), ('test', test_df)]:
    blob = bucket.blob(f'processed/features/{name}.csv')
    blob.upload_from_string(df.to_csv(index=False))
    print(f"✓ Uploaded gs://{BUCKET_NAME}/processed/features/{name}.csv")

print(f"\n✓ Feature engineering complete")
print(f"✓ {len(feature_cols)} input features")
print(f"✓ Target: {target_col}")

Train: 7,000 samples
Val:   1,500 samples
Test:  1,500 samples
✓ Uploaded gs://openbook-data-lake/processed/features/train.csv
✓ Uploaded gs://openbook-data-lake/processed/features/val.csv
✓ Uploaded gs://openbook-data-lake/processed/features/test.csv

✓ Feature engineering complete
✓ 24 input features
✓ Target: patient_copay
