# Diabetes Prediction Model using Apache Beam Pipelines for Batch and Streaming Data

In [49]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.ml.inference.base import RunInference, ModelHandler, PredictionResult
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.io.textio import ReadFromText, WriteToText
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score
import joblib
import json
import os
from datetime import datetime
from typing import Iterable, Any
import time

### Data Preparation for training, batch and streaming

This section prepares the raw diabetes dataset and creates three distinct splits:
- Training set: used to train the machine learning model.
- Batch set: used to demonstrate offline/batch inference with Apache Beam.
- Streaming set: used to simulate a real-time stream of incoming records.

We use stratified sampling to preserve the original class imbalance (approx. 8.5% positive rate).
All resulting CSVs are saved under the `data/` directory so downstream steps can reuse them.


In [7]:
# ============================================================================
# PART 1: DATA PREPARATION (Run Once)
# ============================================================================

def prepare_datasets(original_csv='diabetes_prediction_dataset.csv'):
    """
    Split original dataset into training, batch, and streaming sets.
    Uses stratified split to maintain natural 8.5% diabetes distribution.
    """
    print("=" * 60)
    print("PART 1: Data Preparation - Stratified Strategy")
    print("=" * 60)
    
    # Create directories
    os.makedirs('data', exist_ok=True)
    os.makedirs('models', exist_ok=True)
    os.makedirs('output', exist_ok=True)
    
    # Load original dataset
    df = pd.read_csv(original_csv)
    total_records = len(df)
    diabetes_count = df['diabetes'].sum()
    no_diabetes_count = (df['diabetes'] == 0).sum()
    diabetes_rate = df['diabetes'].mean()
    
    print(f"\nOriginal dataset: {total_records:,} records")
    print(f"  Diabetes cases:    {diabetes_count:,} ({diabetes_rate*100:.1f}%)")
    print(f"  No diabetes cases: {no_diabetes_count:,} ({(1-diabetes_rate)*100:.1f}%)")
    print(f"  Imbalance ratio:   1:{no_diabetes_count/diabetes_count:.1f}")
    
    print(f"\n✓ Using STRATIFIED split - maintains natural distribution")
    print(f"  Rationale: {int(diabetes_count * 0.7):,} positive cases is sufficient for training")
    print(f"  Will use class_weight='balanced' in model training")
    
    # Simple stratified split: 70% train, 20% batch, 10% stream
    train_df, temp_df = train_test_split(
        df, test_size=0.3, random_state=42, stratify=df['diabetes']
    )
    batch_df, stream_df = train_test_split(
        temp_df, test_size=0.33, random_state=42, stratify=temp_df['diabetes']
    )
    
    # Save splits
    train_df.to_csv('data/diabetes_train.csv', index=False)
    batch_df.to_csv('data/diabetes_batch.csv', index=False)
    stream_df.to_csv('data/diabetes_streaming.csv', index=False)
    
    # Print summary
    print(f"\n{'='*60}")
    print("DATASET SPLIT SUMMARY")
    print('='*60)
    
    print(f"\nTraining set: {len(train_df):,} records -> data/diabetes_train.csv")
    print(f"    ├─ Diabetes:    {train_df['diabetes'].sum():,} ({train_df['diabetes'].mean()*100:.1f}%)")
    print(f"    └─ No diabetes: {(train_df['diabetes']==0).sum():,} ({(1-train_df['diabetes'].mean())*100:.1f}%)")
    
    print(f"\nBatch set: {len(batch_df):,} records -> data/diabetes_batch.csv")
    print(f"    ├─ Diabetes:    {batch_df['diabetes'].sum():,} ({batch_df['diabetes'].mean()*100:.1f}%)")
    print(f"    └─ No diabetes: {(batch_df['diabetes']==0).sum():,} ({(1-batch_df['diabetes'].mean())*100:.1f}%)")
    
    print(f"\nStreaming set: {len(stream_df):,} records -> data/diabetes_streaming.csv")
    print(f"    ├─ Diabetes:    {stream_df['diabetes'].sum():,} ({stream_df['diabetes'].mean()*100:.1f}%)")
    print(f"    └─ No diabetes: {(stream_df['diabetes']==0).sum():,} ({(1-stream_df['diabetes'].mean())*100:.1f}%)")

prepare_datasets()

PART 1: Data Preparation - Stratified Strategy

Original dataset: 100,000 records
  Diabetes cases:    8,500 (8.5%)
  No diabetes cases: 91,500 (91.5%)
  Imbalance ratio:   1:10.8

✓ Using STRATIFIED split - maintains natural distribution
  Rationale: 5,950 positive cases is sufficient for training
  Will use class_weight='balanced' in model training

DATASET SPLIT SUMMARY

Training set: 70,000 records -> data/diabetes_train.csv
    ├─ Diabetes:    5,950 (8.5%)
    └─ No diabetes: 64,050 (91.5%)

Batch set: 20,100 records -> data/diabetes_batch.csv
    ├─ Diabetes:    1,709 (8.5%)
    └─ No diabetes: 18,391 (91.5%)

Streaming set: 9,900 records -> data/diabetes_streaming.csv
    ├─ Diabetes:    841 (8.5%)
    └─ No diabetes: 9,059 (91.5%)


### Model Training and storing

Train ML model using a standard scikit-learn workflow (outside Apache Beam).
This notebook trains a RandomForestClassifier and applies `class_weight='balanced'` to help with the ~8.5% positive class imbalance.

Artifacts saved to the `models/` directory include:
- `diabetes_model.pkl` — the trained Random Forest model
- `label_encoders.pkl` — fitted LabelEncoder objects for categorical features
- `scaler.pkl` — StandardScaler used to normalize numeric features
- `feature_names.pkl` — ordered list of features used for inference

These artifacts are loaded by the Beam pipelines to perform inference reproducibly across workers.

In [8]:
print("\n" + "=" * 60)
print("PART 2: Model Training")
print("=" * 60)

# Load training data
df = pd.read_csv('data/diabetes_train.csv')
print(f"\nTraining on {len(df):,} records")
print(f"  Diabetes:    {df['diabetes'].sum():,} ({df['diabetes'].mean()*100:.1f}%)")
print(f"  No diabetes: {(df['diabetes']==0).sum():,} ({(1-df['diabetes'].mean())*100:.1f}%)")
    


PART 2: Model Training

Training on 70,000 records
  Diabetes:    5,950 (8.5%)
  No diabetes: 64,050 (91.5%)


In [9]:
# ========================================================================
# STEP 1: Encode Categorical Variables
# ========================================================================
print("\n[1/5] Encoding categorical variables...")

label_encoders = {}

# Encode gender
le_gender = LabelEncoder()
df['gender_encoded'] = le_gender.fit_transform(df['gender'])
label_encoders['gender'] = le_gender
print(f"  Gender categories: {list(le_gender.classes_)}")

# Encode smoking_history
le_smoking = LabelEncoder()
df['smoking_encoded'] = le_smoking.fit_transform(df['smoking_history'])
label_encoders['smoking_history'] = le_smoking
print(f"  Smoking categories: {list(le_smoking.classes_)}")


[1/5] Encoding categorical variables...
  Gender categories: ['Female', 'Male', 'Other']
  Smoking categories: ['No Info', 'current', 'ever', 'former', 'never', 'not current']


In [10]:
# ========================================================================
# STEP 2: Prepare Features and Target
# ========================================================================
print("\n[2/5] Preparing features and target...")

feature_columns = [
    'gender_encoded', 
    'age', 
    'hypertension', 
    'heart_disease', 
    'smoking_encoded', 
    'bmi', 
    'HbA1c_level', 
    'blood_glucose_level'
]

X = df[feature_columns].values
y = df['diabetes'].values

print(f"  Feature matrix shape: {X.shape}")
print(f"  Target vector shape: {y.shape}")


[2/5] Preparing features and target...
  Feature matrix shape: (70000, 8)
  Target vector shape: (70000,)


In [11]:
# ========================================================================
# STEP 3: Scale Features
# ========================================================================
print("\n[3/5] Scaling features...")

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print(f"  Features scaled using StandardScaler")
print(f"  Mean: {X_scaled.mean(axis=0).round(2)}")
print(f"  Std:  {X_scaled.std(axis=0).round(2)}")

# Split for validation
X_train, X_val, y_train, y_val = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42, stratify=y
)

print(f"\n  Training set:   {len(X_train):,} samples")
print(f"  Validation set: {len(X_val):,} samples")
    


[3/5] Scaling features...
  Features scaled using StandardScaler
  Mean: [ 0. -0. -0.  0.  0.  0.  0.  0.]
  Std:  [1. 1. 1. 1. 1. 1. 1. 1.]

  Training set:   56,000 samples
  Validation set: 14,000 samples


In [34]:
# ========================================================================
# STEP 4: Train Random Forest Model
# ========================================================================
print("\n[4/5] Training Random Forest model...")
print("  Configuration:")
print("    - n_estimators: 200")
print("    - max_depth: 15")
print("    - class_weight: 'balanced' (handles 8.5% imbalance)")
print("    - min_samples_split: 5")
print("    - min_samples_leaf: 2")

model = RandomForestClassifier(
    n_estimators=200,
    max_depth=15,
    min_samples_split=5,
    min_samples_leaf=2,
    class_weight='balanced',  # Critical for imbalanced data!
    random_state=42,
    n_jobs=-1,
    verbose=0
)

model.fit(X_train, y_train)
print("\n Model training completed!")


[4/5] Training Random Forest model...
  Configuration:
    - n_estimators: 200
    - max_depth: 15
    - class_weight: 'balanced' (handles 8.5% imbalance)
    - min_samples_split: 5
    - min_samples_leaf: 2

 Model training completed!


In [35]:
# ========================================================================
# STEP 5: Evaluate Model Performance
# ========================================================================
print("\n[5/5] Evaluating model performance...")

# Predictions on validation set
y_val_pred = model.predict(X_val)
y_val_proba = model.predict_proba(X_val)[:, 1]

# Calculate metrics
accuracy = accuracy_score(y_val, y_val_pred)
precision = precision_score(y_val, y_val_pred)
recall = recall_score(y_val, y_val_pred)
f1 = f1_score(y_val, y_val_pred)
auc_roc = roc_auc_score(y_val, y_val_proba)

print("\n" + "=" * 60)
print("VALIDATION SET PERFORMANCE")
print("=" * 60)
print(f"\nOverall Metrics:")
print(f"  Accuracy:  {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall:    {recall:.4f}")
print(f"  F1-Score:  {f1:.4f}")
print(f"  AUC-ROC:   {auc_roc:.4f}")

# Confusion matrix
cm = confusion_matrix(y_val, y_val_pred)
tn, fp, fn, tp = cm.ravel()

print(f"\nConfusion Matrix:")
print(f"  True Negatives:  {tn:,}")
print(f"  False Positives: {fp:,}")
print(f"  False Negatives: {fn:,}")
print(f"  True Positives:  {tp:,}")

# Additional insights
specificity = tn / (tn + fp)
npv = tn / (tn + fn) if (tn + fn) > 0 else 0

print(f"\nAdditional Metrics:")
print(f"  Specificity (True Negative Rate): {specificity:.4f}")
print(f"  Negative Predictive Value:        {npv:.4f}")

# Classification report
print("\nDetailed Classification Report:")
print(classification_report(
    y_val, y_val_pred, 
    target_names=['No Diabetes', 'Diabetes'],
    digits=4
))


[5/5] Evaluating model performance...

VALIDATION SET PERFORMANCE

Overall Metrics:
  Accuracy:  0.9431
  Precision: 0.6246
  Recall:    0.8277
  F1-Score:  0.7120
  AUC-ROC:   0.9763

Confusion Matrix:
  True Negatives:  12,218
  False Positives: 592
  False Negatives: 205
  True Positives:  985

Additional Metrics:
  Specificity (True Negative Rate): 0.9538
  Negative Predictive Value:        0.9835

Detailed Classification Report:
              precision    recall  f1-score   support

 No Diabetes     0.9835    0.9538    0.9684     12810
    Diabetes     0.6246    0.8277    0.7120      1190

    accuracy                         0.9431     14000
   macro avg     0.8041    0.8908    0.8402     14000
weighted avg     0.9530    0.9431    0.9466     14000



In [36]:
# ========================================================================
# Feature Importance Analysis
# ========================================================================
print("\n" + "=" * 60)
print("FEATURE IMPORTANCE ANALYSIS")
print("=" * 60)

feature_names = [
    'Gender', 
    'Age', 
    'Hypertension', 
    'Heart Disease', 
    'Smoking History', 
    'BMI', 
    'HbA1c Level', 
    'Blood Glucose'
]

importance_df = pd.DataFrame({
    'Feature': feature_names,
    'Importance': model.feature_importances_
}).sort_values('Importance', ascending=False)

print("\nTop Features:")
for idx, row in importance_df.iterrows():
    print(f"  {row['Feature']:20s}: {row['Importance']:.4f} {'█' * int(row['Importance'] * 100)}")


FEATURE IMPORTANCE ANALYSIS

Top Features:
  HbA1c Level         : 0.3712 █████████████████████████████████████
  Blood Glucose       : 0.2906 █████████████████████████████
  Age                 : 0.1640 ████████████████
  BMI                 : 0.0966 █████████
  Hypertension        : 0.0311 ███
  Smoking History     : 0.0252 ██
  Heart Disease       : 0.0157 █
  Gender              : 0.0055 


In [37]:
# ========================================================================
# Save Model and Preprocessing Objects
# ========================================================================
print("\n" + "=" * 60)
print("SAVING MODEL ARTIFACTS")
print("=" * 60)

os.makedirs('models', exist_ok=True)

joblib.dump(model, 'models/diabetes_model.pkl')
joblib.dump(label_encoders, 'models/label_encoders.pkl')
joblib.dump(scaler, 'models/scaler.pkl')

# Save feature names for reference
joblib.dump(feature_names, 'models/feature_names.pkl')

print("\nModel artifacts saved:")
print("  - models/diabetes_model.pkl")
print("  - models/label_encoders.pkl")
print("  - models/scaler.pkl")
print("  - models/feature_names.pkl")


SAVING MODEL ARTIFACTS

Model artifacts saved:
  - models/diabetes_model.pkl
  - models/label_encoders.pkl
  - models/scaler.pkl
  - models/feature_names.pkl


In [17]:
# ========================================================================
# Model Summary
# ========================================================================
print("\n" + "=" * 60)
print("MODEL SUMMARY")
print("=" * 60)
print(f"\nModel Type: Random Forest Classifier")
print(f"Training Samples: {len(X_train):,}")
print(f"Validation Samples: {len(X_val):,}")
print(f"Number of Features: {len(feature_columns)}")
print(f"Number of Trees: {model.n_estimators}")
print(f"Best Validation AUC-ROC: {auc_roc:.4f}")
print(f"\nModel is ready for Apache Beam inference pipeline!")


MODEL SUMMARY

Model Type: Random Forest Classifier
Training Samples: 56,000
Validation Samples: 14,000
Number of Features: 8
Number of Trees: 200
Best Validation AUC-ROC: 0.9763

Model is ready for Apache Beam inference pipeline!


### Custom Transforms for Prediction

This section defines reusable Apache Beam transforms (DoFn classes) used by both the batch and streaming pipelines.
Key responsibilities implemented here include:
- Parsing CSV lines into structured dictionaries
- Preprocessing records (encoding categorical values and scaling numeric features),
- Formatting examples for the RunInference API,
- Filtering and flagging high-risk patients, and
- Calculating aggregated metrics or windowed summaries.

Design note: heavy resources (model, encoders, scaler) are loaded once per worker in `setup()` to minimize overhead during inference.

In [39]:
# ============================================================================
# TRANSFORM 1: Parse CSV Data
# ============================================================================

class ParseCSV(beam.DoFn):
    """Parse CSV line into dictionary."""
    
    def __init__(self, columns):
        self.columns = columns
    
    def process(self, line):
        try:
            values = line.strip().split(',')
            if len(values) != len(self.columns):
                return
            
            record = dict(zip(self.columns, values))
            
            # Convert numeric fields
            record['age'] = float(record['age'])
            record['hypertension'] = int(record['hypertension'])
            record['heart_disease'] = int(record['heart_disease'])
            record['bmi'] = float(record['bmi'])
            record['HbA1c_level'] = float(record['HbA1c_level'])
            record['blood_glucose_level'] = float(record['blood_glucose_level'])
            
            if 'diabetes' in record and record['diabetes'] != '':
                record['diabetes'] = int(record['diabetes'])
            
            yield record
            
        except Exception as e:
            print(f"Error parsing line: {e}")

In [40]:
# ============================================================================
# TRANSFORM 2: Preprocess for Inference
# ============================================================================

class PreprocessForInference(beam.DoFn):
    """Preprocess data for model inference."""
    
    def __init__(self, encoders_path, scaler_path):
        self.encoders_path = encoders_path
        self.scaler_path = scaler_path
        self.encoders = None
        self.scaler = None
    
    def setup(self):
        """Load encoders and scaler once per worker."""
        self.encoders = joblib.load(self.encoders_path)
        self.scaler = joblib.load(self.scaler_path)
    
    def process(self, record):
        try:
            # Encode categorical features
            gender_encoded = self.encoders['gender'].transform([record['gender']])[0]
            smoking_encoded = self.encoders['smoking_history'].transform([record['smoking_history']])[0]
            
            # Create feature vector
            features = np.array([
                gender_encoded,
                record['age'],
                record['hypertension'],
                record['heart_disease'],
                smoking_encoded,
                record['bmi'],
                record['HbA1c_level'],
                record['blood_glucose_level']
            ]).reshape(1, -1)
            
            # Scale features
            features_scaled = self.scaler.transform(features)
            
            yield (record, features_scaled[0])
            
        except Exception as e:
            print(f"Preprocessing error: {e}")

In [41]:
# ============================================================================
# TRANSFORM 3: Format for Model Inference
# ============================================================================

class FormatForInference(beam.DoFn):
    """Format features for RunInference API."""
    
    def process(self, element):
        record, features = element
        yield np.array(features).reshape(1, -1)

In [None]:
# ============================================================================
# TRANSFORM 4: Filter High-Risk Patients
# ============================================================================

class FilterHighRisk(beam.DoFn):
    """Filter high-risk patients (predicted diabetes = 1)."""
    
    def process(self, record):
        if record['predicted_diabetes'] == 1:
            yield record

In [None]:
# ============================================================================
# TRANSFORM 5: Calculate Metrics
# ============================================================================

class CalculateMetrics(beam.DoFn):
    """Calculate prediction metrics and statistics."""
    
    def process(self, records):
        """Calculate metrics from a list of prediction records."""
        records_list = list(records)
        
        if not records_list:
            return
        
        total = len(records_list)
        predicted_positive = sum(1 for r in records_list if r['predicted_diabetes'] == 1)
        predicted_negative = total - predicted_positive
        
        # Check if actual labels are available
        has_labels = records_list[0]['actual_diabetes'] is not None
        
        if has_labels:
            # Calculate confusion matrix components
            tp = sum(1 for r in records_list 
                    if r['predicted_diabetes'] == 1 and r['actual_diabetes'] == 1)
            tn = sum(1 for r in records_list 
                    if r['predicted_diabetes'] == 0 and r['actual_diabetes'] == 0)
            fp = sum(1 for r in records_list 
                    if r['predicted_diabetes'] == 1 and r['actual_diabetes'] == 0)
            fn = sum(1 for r in records_list 
                    if r['predicted_diabetes'] == 0 and r['actual_diabetes'] == 1)
            
            # Calculate performance metrics
            accuracy = (tp + tn) / total if total > 0 else 0
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            
            actual_positive = sum(1 for r in records_list if r['actual_diabetes'] == 1)
            
            metrics = {
                'total_predictions': total,
                'predicted_positive': predicted_positive,
                'predicted_negative': predicted_negative,
                'actual_positive': actual_positive,
                'actual_negative': total - actual_positive,
                'true_positives': tp,
                'true_negatives': tn,
                'false_positives': fp,
                'false_negatives': fn,
                'accuracy': round(accuracy, 4),
                'precision': round(precision, 4),
                'recall': round(recall, 4),
                'f1_score': round(f1, 4),
                'high_risk_percentage': round((predicted_positive / total) * 100, 2),
                'timestamp': datetime.now().isoformat()
            }
        else:
            # Metrics without ground truth labels
            metrics = {
                'total_predictions': total,
                'predicted_positive': predicted_positive,
                'predicted_negative': predicted_negative,
                'high_risk_percentage': round((predicted_positive / total) * 100, 2),
                'timestamp': datetime.now().isoformat()
            }
        
        yield metrics

In [None]:
# ============================================================================
# TRANSFORM 6: Filter High-Risk Patients
# ============================================================================

class FilterHighRisk(beam.DoFn):
    """Filter and flag high-risk patients for follow-up."""
    
    def process(self, record):
        """Filter high-risk patients and assign priority."""
        if record['predicted_diabetes'] == 1:
            # Determine priority based on multiple factors
            priority_score = 0
            
            # High HbA1c
            if record['HbA1c_level'] >= 6.5:
                priority_score += 3
            
            # High blood glucose
            if record['blood_glucose_level'] >= 200:
                priority_score += 3
            
            # Obesity
            if record['bmi'] >= 30:
                priority_score += 2
            
            # Comorbidities
            if record['hypertension']:
                priority_score += 1
            if record['heart_disease']:
                priority_score += 1
            
            # Age factor
            if record['age'] >= 60:
                priority_score += 1
            
            # Assign priority level
            if priority_score >= 7:
                priority = 'CRITICAL'
            elif priority_score >= 5:
                priority = 'HIGH'
            else:
                priority = 'MEDIUM'
            
            # Add priority information
            record['priority'] = priority
            record['priority_score'] = priority_score
            record['recommended_action'] = 'Immediate medical consultation required'
            
            yield record

### Apache Beam Batch Prediction Pipeline

This pipeline shows how to perform offline/batch inference with Apache Beam using the trained model artifacts.
Steps include reading and parsing the CSV batch file, preprocessing records in parallel, running model inference via the RunInference API, and writing both full prediction results and a CSV of high-risk patients.

Outputs are written under `output/batch_predictions/` and include a JSONL file with all predictions and a CSV file listing high-risk patients for follow-up. The pipeline runs locally with the DirectRunner by default but can be adapted to other Beam runners.

In [None]:
# ============================================================================
# CUSTOM MODEL HANDLER
# ============================================================================

class DiabetesModelHandler(ModelHandler):
    """Custom model handler for diabetes prediction."""
    
    def __init__(self, model_path):
        self.model_path = model_path
        self._model = None
    
    def load_model(self):
        """Load the model once per worker."""
        self._model = joblib.load(self.model_path)
        return self._model
    
    def run_inference(self, batch: Iterable[np.ndarray], model: Any, inference_args=None) -> Iterable[PredictionResult]:
        """Run inference on a batch."""
        predictions = []
        for example in batch:
            if len(example.shape) == 1:
                example = example.reshape(1, -1)
            pred = model.predict(example)
            predictions.append(PredictionResult(example=example, inference=pred))
        return predictions

In [45]:
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def combine_results(preprocessed_item, predictions):
    """Combine preprocessed record with prediction."""
    record, features = preprocessed_item
    
    # Find corresponding prediction (same index)
    idx = 0  # Simplified: assumes order is preserved
    if idx < len(predictions):
        pred = predictions[idx]
        predicted_class = int(pred.inference[0])
    else:
        predicted_class = 0
    
    return {
        'patient_id': f"P{hash(str(record)) % 1000000:06d}",
        'gender': record['gender'],
        'age': int(record['age']),
        'bmi': round(record['bmi'], 2),
        'HbA1c_level': round(record['HbA1c_level'], 2),
        'blood_glucose_level': int(record['blood_glucose_level']),
        'smoking_history': record['smoking_history'],
        'hypertension': bool(record['hypertension']),
        'heart_disease': bool(record['heart_disease']),
        'predicted_diabetes': predicted_class,
        'risk_category': 'HIGH' if predicted_class == 1 else 'LOW',
        'actual_diabetes': record.get('diabetes', None),
    }


def format_csv(record):
    """Format record as CSV line."""
    return f"{record['patient_id']}," \
           f"{record['gender']}," \
           f"{record['age']}," \
           f"{record['bmi']}," \
           f"{record['HbA1c_level']}," \
           f"{record['blood_glucose_level']}," \
           f"{record['predicted_diabetes']}," \
           f"{record.get('actual_diabetes', 'N/A')}"


def count_lines(filepath):
    """Count lines in file."""
    try:
        with open(filepath, 'r') as f:
            return sum(1 for _ in f)
    except:
        return 0

In [46]:
# ============================================================================
# BATCH PIPELINE
# ============================================================================

def run_batch_pipeline(
    input_file='data/diabetes_batch.csv',
    output_dir='output/batch_predictions',
    model_path='models/diabetes_model.pkl',
    encoders_path='models/label_encoders.pkl',
    scaler_path='models/scaler.pkl'
):
    """
    Batch prediction pipeline.
    
    Steps:
    1. Read and parse CSV
    2. Preprocess (encode + scale)
    3. Run ML inference
    4. Store all predictions
    5. Filter high-risk patients
    """
    
    print("=" * 70)
    print("APACHE BEAM BATCH PREDICTION PIPELINE")
    print("=" * 70)
    print(f"\nInput:  {input_file}")
    print(f"Output: {output_dir}/")
    print("\nExecuting pipeline...")
    print("=" * 70 + "\n")
    
    # CSV columns
    columns = [
        'gender', 'age', 'hypertension', 'heart_disease', 
        'smoking_history', 'bmi', 'HbA1c_level', 
        'blood_glucose_level', 'diabetes'
    ]
    
    # Model handler
    model_handler = DiabetesModelHandler(model_path)
    
    # Pipeline options
    options = PipelineOptions(['--runner=DirectRunner'])
    
    with beam.Pipeline(options=options) as pipeline:
        
        # Step 1: Read and parse
        parsed = (
            pipeline
            | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)
            | 'Parse' >> beam.ParDo(ParseCSV(columns))
        )
        
        # Step 2: Preprocess
        preprocessed = (
            parsed
            | 'Preprocess' >> beam.ParDo(
                PreprocessForInference(encoders_path, scaler_path)
            )
        )
        
        # Step 3: Run inference
        predictions = (
            preprocessed
            | 'Format Features' >> beam.ParDo(FormatForInference())
            | 'Predict' >> RunInference(model_handler)
        )
        
        # Step 4: Combine and create results
        results = (
            preprocessed
            | 'Combine with Predictions' >> beam.Map(
                combine_results,
                predictions=beam.pvalue.AsList(predictions)
            )
        )
        
        # Step 5: Store all predictions
        (
            results
            | 'Format JSON' >> beam.Map(lambda r: json.dumps(r))
            | 'Write All Predictions' >> WriteToText(
                f'{output_dir}/predictions',
                file_name_suffix='.jsonl',
                shard_name_template=''
            )
        )
        
        # Step 6: Filter and store high-risk patients
        (
            results
            | 'Filter High Risk' >> beam.ParDo(FilterHighRisk())
            | 'Format CSV' >> beam.Map(format_csv)
            | 'Write High Risk' >> WriteToText(
                f'{output_dir}/high_risk_patients',
                file_name_suffix='.csv',
                header='patient_id,gender,age,bmi,HbA1c_level,blood_glucose_level,predicted_diabetes,actual_diabetes',
                shard_name_template=''
            )
        )
    
    print("\n" + "=" * 70)
    print("PIPELINE COMPLETED!")
    print("=" * 70)
    print(f"\nOutput Files:")
    print(f"  {output_dir}/predictions.jsonl")
    print(f"    - All {count_lines(f'{output_dir}/predictions.jsonl')} predictions")
    print(f"  ")
    print(f"  {output_dir}/high_risk_patients.csv")
    print(f"    - High-risk patients (predicted diabetes = 1)")
    print("=" * 70 + "\n")

run_batch_pipeline()

APACHE BEAM BATCH PREDICTION PIPELINE

Input:  data/diabetes_batch.csv
Output: output/batch_predictions/

Executing pipeline...






PIPELINE COMPLETED!

Output Files:
  output/batch_predictions/predictions.jsonl
    - All 20100 predictions
  
  output/batch_predictions/high_risk_patients.csv
    - High-risk patients (predicted diabetes = 1)



### Streaming Pipeline

This section simulates a near-real-time inference pipeline using Apache Beam primitives suited for streaming workloads.
The pipeline reads records (simulated from a CSV file), attaches timestamps, windows the stream, preprocesses each record, runs inference, and emits continuous outputs and alerts for high-risk patients.

Streaming outputs are written to `output/streaming_predictions/` and include a rolling JSONL of predictions, a JSONL of real-time high-risk alerts, and a periodic summary file with windowed metrics.

In [None]:
# ============================================================================
# STREAMING-SPECIFIC TRANSFORMS
# ============================================================================

class AddTimestamp(beam.DoFn):
    """Add timestamp to each record for windowing."""
    
    def process(self, element):
        # Add current timestamp
        yield beam.window.TimestampedValue(element, time.time())


class FormatStreamingOutput(beam.DoFn):
    """Format output with streaming metadata."""
    
    def process(self, element):
        record, features = element
        
        result = {
            'stream_id': f"S{int(time.time() * 1000000) % 1000000:06d}",
            'patient_id': f"P{hash(str(record)) % 1000000:06d}",
            'gender': record['gender'],
            'age': int(record['age']),
            'bmi': round(record['bmi'], 2),
            'HbA1c_level': round(record['HbA1c_level'], 2),
            'blood_glucose_level': int(record['blood_glucose_level']),
            'smoking_history': record['smoking_history'],
            'processed_time': datetime.now().isoformat(),
            'window': 'streaming'
        }
        
        yield result


class AddPredictionToRecord(beam.DoFn):
    """Add prediction to streaming record."""
    
    def process(self, element, predictions):
        if not predictions:
            return
        
        # Get first prediction
        pred = predictions[0]
        predicted_class = int(pred.inference[0])
        
        element['predicted_diabetes'] = predicted_class
        element['risk_category'] = 'HIGH' if predicted_class == 1 else 'LOW'
        element['prediction_time'] = datetime.now().isoformat()
        
        yield element


class CountPredictions(beam.DoFn):
    """Count predictions in each window."""
    
    def process(self, elements):
        elements_list = list(elements)
        total = len(elements_list)
        high_risk = sum(1 for e in elements_list if e.get('predicted_diabetes') == 1)
        
        summary = {
            'window_summary': True,
            'total_processed': total,
            'high_risk_count': high_risk,
            'low_risk_count': total - high_risk,
            'high_risk_percentage': round((high_risk / total * 100), 2) if total > 0 else 0,
            'timestamp': datetime.now().isoformat()
        }
        
        yield summary

In [48]:
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def combine_streaming_results(preprocessed_item, predictions):
    """Combine preprocessed record with prediction."""
    record, features = preprocessed_item
    
    # Get prediction
    idx = 0
    if idx < len(predictions):
        pred = predictions[idx]
        predicted_class = int(pred.inference[0])
    else:
        predicted_class = 0
    
    return {
        'stream_id': f"S{int(time.time() * 1000000) % 1000000:06d}",
        'patient_id': f"P{hash(str(record)) % 1000000:06d}",
        'gender': record['gender'],
        'age': int(record['age']),
        'bmi': round(record['bmi'], 2),
        'HbA1c_level': round(record['HbA1c_level'], 2),
        'blood_glucose_level': int(record['blood_glucose_level']),
        'smoking_history': record['smoking_history'],
        'hypertension': bool(record['hypertension']),
        'heart_disease': bool(record['heart_disease']),
        'predicted_diabetes': predicted_class,
        'risk_category': 'HIGH' if predicted_class == 1 else 'LOW',
        'actual_diabetes': record.get('diabetes', None),
        'processed_time': datetime.now().isoformat()
    }

In [None]:
# ============================================================================
# STREAMING PIPELINE
# ============================================================================

def run_streaming_pipeline(
    input_file='data/diabetes_streaming.csv',
    output_dir='output/streaming_predictions',
    model_path='models/diabetes_model.pkl',
    encoders_path='models/label_encoders.pkl',
    scaler_path='models/scaler.pkl',
    window_size_seconds=10
):
    """Run streaming prediction pipeline."""
    
    print("=" * 70)
    print("APACHE BEAM STREAMING PREDICTION PIPELINE")
    print("=" * 70)
    print(f"\nInput:  {input_file}")
    print(f"Output: {output_dir}/")
    print(f"Window: {window_size_seconds} seconds")
    print("\nSimulating real-time streaming...")
    print("=" * 70 + "\n")
    
    # CSV columns
    columns = [
        'gender', 'age', 'hypertension', 'heart_disease', 
        'smoking_history', 'bmi', 'HbA1c_level', 
        'blood_glucose_level', 'diabetes'
    ]
    
    # Model handler
    model_handler = DiabetesModelHandler(model_path)
    
    # Pipeline options for streaming
    options = PipelineOptions([
        '--runner=DirectRunner'
    ])
    
    with beam.Pipeline(options=options) as pipeline:
        
        # Step 1: Read and parse (simulates incoming stream)
        parsed = (
            pipeline
            | 'Read Stream' >> ReadFromText(input_file, skip_header_lines=1)
            | 'Parse' >> beam.ParDo(ParseCSV(columns))
        )
        
        # Step 2: Preprocess
        preprocessed = (
            parsed
            | 'Preprocess' >> beam.ParDo(
                PreprocessForInference(encoders_path, scaler_path)
            )
        )
        
        # Step 3: Run inference
        predictions = (
            preprocessed
            | 'Format Features' >> beam.ParDo(FormatForInference())
            | 'Predict' >> RunInference(model_handler)
        )
        
        # Step 4: Combine results
        results = (
            preprocessed
            | 'Combine Results' >> beam.Map(
                combine_streaming_results,
                predictions=beam.pvalue.AsList(predictions)
            )
        )
        
        # Step 5: Write all streaming predictions
        (
            results
            | 'Format JSON' >> beam.Map(lambda r: json.dumps(r))
            | 'Write Stream Predictions' >> WriteToText(
                f'{output_dir}/stream_predictions',
                file_name_suffix='.jsonl',
                shard_name_template=''
            )
        )
        
        # Step 6: Filter and write high-risk alerts
        (
            results
            | 'Filter High Risk Stream' >> beam.ParDo(FilterHighRisk())
            | 'Format Alert' >> beam.Map(
                lambda r: json.dumps({
                    'ALERT': 'HIGH RISK PATIENT',
                    'patient_id': r['patient_id'],
                    'age': r['age'],
                    'HbA1c_level': r['HbA1c_level'],
                    'blood_glucose_level': r['blood_glucose_level'],
                    'alert_time': datetime.now().isoformat()
                })
            )
            | 'Write Alerts' >> WriteToText(
                f'{output_dir}/high_risk_alerts',
                file_name_suffix='.jsonl',
                shard_name_template=''
            )
        )
        
        # Step 7: Create summary statistics
        (
            results
            | 'Group for Summary' >> beam.combiners.ToList()
            | 'Calculate Summary' >> beam.ParDo(CountPredictions())
            | 'Format Summary' >> beam.Map(lambda s: json.dumps(s, indent=2))
            | 'Write Summary' >> WriteToText(
                f'{output_dir}/stream_summary',
                file_name_suffix='.json',
                shard_name_template=''
            )
        )
    
    print("\n" + "=" * 70)
    print("STREAMING PIPELINE COMPLETED!")
    print("=" * 70)
    print(f"\nOutput Files:")
    print(f"  {output_dir}/stream_predictions.jsonl")
    print(f"    - All streaming predictions")
    print(f"  ")
    print(f"  {output_dir}/high_risk_alerts.jsonl")
    print(f"    - Real-time alerts for high-risk patients")
    print(f"  ")
    print(f"  {output_dir}/stream_summary.json")
    print(f"    - Summary statistics")
    print("=" * 70 + "\n")


run_streaming_pipeline()

APACHE BEAM STREAMING PREDICTION PIPELINE

Input:  data/diabetes_streaming.csv
Output: output/streaming_predictions/
Window: 10 seconds

Simulating real-time streaming...






STREAMING PIPELINE COMPLETED!

Output Files:
  output/streaming_predictions/stream_predictions.jsonl
    - All streaming predictions
  
  output/streaming_predictions/high_risk_alerts.jsonl
    - Real-time alerts for high-risk patients
  
  output/streaming_predictions/stream_summary.json
    - Summary statistics

