# Kubernetes Tutorial

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ContextLab/clustrix/blob/master/docs/notebooks/kubernetes_tutorial.ipynb)

This tutorial demonstrates how to use Clustrix with Kubernetes clusters for containerized distributed computing.

## Prerequisites

- Access to a Kubernetes cluster
- kubectl configured for your cluster
- Clustrix installed with Kubernetes support: `pip install clustrix[kubernetes]`
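
Before running the cells below, it can help to confirm the prerequisites locally. The following is a minimal sketch, not part of Clustrix: the helper name `check_kubernetes_prerequisites` is hypothetical, and it only checks that `kubectl` is on the `PATH`, that a kubeconfig file exists, and that the `clustrix` package is importable. It does not contact the cluster.

```python
import importlib.util
import os
import shutil


def check_kubernetes_prerequisites(kubeconfig="~/.kube/config"):
    """Return a dict describing which local prerequisites appear to be met."""
    # KUBECONFIG may list several files separated by os.pathsep;
    # fall back to the default path when it is unset.
    env_value = os.environ.get("KUBECONFIG", "")
    candidates = [p for p in env_value.split(os.pathsep) if p] or [kubeconfig]
    return {
        "kubectl_on_path": shutil.which("kubectl") is not None,
        "kubeconfig_found": any(
            os.path.isfile(os.path.expanduser(p)) for p in candidates
        ),
        "clustrix_importable": importlib.util.find_spec("clustrix") is not None,
    }


status = check_kubernetes_prerequisites()
for name, ok in status.items():
    print(f"{name}: {'ok' if ok else 'missing'}")
```

A `missing` entry points at the prerequisite to fix first; actual cluster access is still best verified with `kubectl` itself.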

In [None]:
# Install Clustrix with Kubernetes support (uncomment if needed)
# !pip install clustrix[kubernetes]

import clustrix
from clustrix import cluster, configure
import numpy as np

## Kubernetes Configuration

Configure Clustrix for your Kubernetes cluster:

In [None]:
# Configure for Kubernetes cluster
configure(
    cluster_type="kubernetes",
    
    # Kubernetes-specific settings
    k8s_namespace="default",              # Kubernetes namespace
    k8s_config_file="~/.kube/config",     # Path to kubeconfig
    
    # Default resource requirements
    default_cores=2,
    default_memory="4Gi",                 # Kubernetes format (Gi, Mi)
    default_cpu_limit=4,                  # CPU limit (can be > cores)
    default_memory_limit="8Gi",           # Memory limit
    
    # Container settings
    container_image="python:3.11-slim",  # Base Python image
    image_pull_policy="IfNotPresent",     # Image pull policy
    
    # Job settings
    job_ttl_seconds=3600,                 # Job cleanup after 1 hour
    backoff_limit=3,                      # Retry failed jobs up to 3 times
    
    # Cleanup
    cleanup_on_success=True,
    max_parallel_jobs=20
)

print("Kubernetes cluster configured successfully!")
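
The memory settings above use Kubernetes quantity notation, where `Gi` and `Mi` are powers of two while `G` and `M` are powers of ten. The sketch below illustrates the difference and checks that a request does not exceed its limit. The helpers `memory_to_bytes` and `request_fits_limit` are hypothetical names written for this tutorial, not Clustrix functions, and they cover only a subset of the suffixes Kubernetes accepts.

```python
from decimal import Decimal

# Binary (power-of-two) and decimal (power-of-ten) suffixes.
# Only a subset of the suffixes Kubernetes accepts is handled here.
_SUFFIXES = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def memory_to_bytes(quantity):
    """Convert a memory string such as '4Gi' or '512Mi' to a number of bytes."""
    # Check two-letter suffixes before one-letter ones.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * _SUFFIXES[suffix])
    return int(Decimal(quantity))  # A plain number is interpreted as bytes


def request_fits_limit(request, limit):
    """A memory request should not exceed the corresponding limit."""
    return memory_to_bytes(request) <= memory_to_bytes(limit)


print(memory_to_bytes("4Gi"))            # 4294967296
print(memory_to_bytes("4G"))             # 4000000000
print(request_fits_limit("4Gi", "8Gi"))  # True
```

With the values configured above, the `4Gi` request fits inside the `8Gi` limit. Note that `4G` is roughly 7% smaller than `4Gi`, which matters when sizing memory-bound jobs.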

## Example 1: Containerized Machine Learning

Train machine learning models in Kubernetes pods:

In [None]:
@cluster(
    cores=4,
    memory="8Gi",
    cpu_limit=6,
    memory_limit="12Gi",
    container_image="python:3.11",
    job_name="ml-training"  # Custom job name
)
def distributed_ml_training(model_type="random_forest", n_estimators=200, dataset_size=50000):
    """
    Distributed machine learning training in Kubernetes.
    """
    import numpy as np
    import os
    import json
    from datetime import datetime
    
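    # Install required packages within the container, using the interpreter that runs this function.
    # check_call raises if the installation fails instead of continuing silently.
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "scikit-learn", "pandas", "numpy"])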
    
    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.svm import SVC
    from sklearn.neural_network import MLPClassifier
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    from sklearn.preprocessing import StandardScaler
    import pandas as pd
    
    print(f"Starting ML training: {model_type}, {n_estimators} estimators, {dataset_size:,} samples")
    print(f"Pod started at: {datetime.now()}")
    
    # Generate synthetic dataset
    print("Generating synthetic dataset...")
    X, y = make_classification(
        n_samples=dataset_size,
        n_features=50,
        n_informative=30,
        n_redundant=10,
        n_classes=3,
        n_clusters_per_class=2,
        flip_y=0.05,  # Add some noise
        random_state=42
    )
    
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    # Feature scaling for SVM and MLP
    if model_type in ['svm', 'mlp']:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    
    print(f"Dataset: {X_train.shape[0]:,} training, {X_test.shape[0]:,} test samples")
    
    # Model selection and configuration
    models = {
        'random_forest': {
            'model': RandomForestClassifier,
            'params': {
                'n_estimators': n_estimators,
                'max_depth': 20,
                'min_samples_split': 5,
                'min_samples_leaf': 2,
                'n_jobs': -1,
                'random_state': 42
            },
            'param_grid': {
                'max_depth': [15, 20, 25],
                'min_samples_split': [2, 5, 10]
            }
        },
        'gradient_boosting': {
            'model': GradientBoostingClassifier,
            'params': {
                'n_estimators': n_estimators,
                'learning_rate': 0.1,
                'max_depth': 6,
                'random_state': 42
            },
            'param_grid': {
                'learning_rate': [0.05, 0.1, 0.2],
                'max_depth': [4, 6, 8]
            }
        },
        'svm': {
            'model': SVC,
            'params': {
                'kernel': 'rbf',
                'C': 1.0,
                'gamma': 'scale',
                'random_state': 42
            },
            'param_grid': {
                'C': [0.1, 1.0, 10.0],
                'gamma': ['scale', 'auto']
            }
        },
        'mlp': {
            'model': MLPClassifier,
            'params': {
                'hidden_layer_sizes': (100, 50),
                'activation': 'relu',
                'solver': 'adam',
                'alpha': 0.0001,
                'max_iter': 1000,
                'random_state': 42
            },
            'param_grid': {
                'hidden_layer_sizes': [(50,), (100,), (100, 50)],
                'alpha': [0.0001, 0.001, 0.01]
            }
        }
    }
    
    if model_type not in models:
        print(f"Unknown model_type '{model_type}'; falling back to 'random_forest'")
        model_type = 'random_forest'  # Default fallback
    
    model_config = models[model_type]
    
    # Train base model
    print(f"Training {model_type} model...")
    start_time = datetime.now()
    
    base_model = model_config['model'](**model_config['params'])
    base_model.fit(X_train, y_train)
    
    training_time = (datetime.now() - start_time).total_seconds()
    print(f"Base model training completed in {training_time:.2f} seconds")
    
    # Base model evaluation
    y_pred = base_model.predict(X_test)
    base_accuracy = accuracy_score(y_test, y_pred)
    base_precision = precision_score(y_test, y_pred, average='weighted')
    base_recall = recall_score(y_test, y_pred, average='weighted')
    base_f1 = f1_score(y_test, y_pred, average='weighted')
    
    print(f"Base model performance: Accuracy={base_accuracy:.4f}")
    
    # Cross-validation
    print("Performing cross-validation...")
    cv_scores = cross_val_score(base_model, X_train, y_train, cv=5, n_jobs=-1)
    
    # Hyperparameter optimization
    print("Optimizing hyperparameters...")
    grid_search = GridSearchCV(
        model_config['model'](**model_config['params']),  # Reuse the base settings so the comparison with the base model is fair
        model_config['param_grid'],
        cv=3,
        scoring='accuracy',
        n_jobs=-1,
        verbose=0
    )
    
    grid_search.fit(X_train, y_train)
    best_model = grid_search.best_estimator_
    
    # Best model evaluation
    y_pred_best = best_model.predict(X_test)
    best_accuracy = accuracy_score(y_test, y_pred_best)
    best_precision = precision_score(y_test, y_pred_best, average='weighted')
    best_recall = recall_score(y_test, y_pred_best, average='weighted')
    best_f1 = f1_score(y_test, y_pred_best, average='weighted')
    
    print(f"Optimized model performance: Accuracy={best_accuracy:.4f}")
    
    # Feature importance (if available)
    feature_importance = None
    if hasattr(best_model, 'feature_importances_'):
        feature_importance = best_model.feature_importances_.tolist()
        top_features = sorted(enumerate(feature_importance), 
                            key=lambda x: x[1], reverse=True)[:10]
        print(f"Top 5 features: {[f'Feature_{i}' for i, _ in top_features[:5]]}")
    
    # Model complexity analysis
    def analyze_model_complexity(model, model_type):
        complexity_metrics = {}
        
        if model_type == 'random_forest':
            complexity_metrics = {
                'n_estimators': model.n_estimators,
                'max_depth': model.max_depth,
                'total_nodes': sum(tree.tree_.node_count for tree in model.estimators_),
                'avg_depth': np.mean([tree.tree_.max_depth for tree in model.estimators_])
            }
        elif model_type == 'gradient_boosting':
            complexity_metrics = {
                'n_estimators': model.n_estimators,
                'max_depth': model.max_depth,
                'learning_rate': model.learning_rate,
                'total_nodes': sum(tree.tree_.node_count for tree in model.estimators_.ravel())  # estimators_ holds one tree per class per boosting stage
            }
        elif model_type == 'svm':
            complexity_metrics = {
                'n_support_vectors': model.n_support_.sum(),
                'kernel': model.kernel,
                'C': model.C,
                'gamma': model.gamma
            }
        elif model_type == 'mlp':
            complexity_metrics = {
                'hidden_layers': len(model.hidden_layer_sizes),
                'total_parameters': sum(layer.size for layer in model.coefs_) + 
                                  sum(layer.size for layer in model.intercepts_),
                'n_iterations': model.n_iter_,
                'loss': model.loss_
            }
        
        return complexity_metrics
    
    complexity_metrics = analyze_model_complexity(best_model, model_type)
    
    # Compile results
    training_results = {
        'model_info': {
            'model_type': model_type,
            'dataset_size': dataset_size,
            'n_features': X.shape[1],
            'n_classes': len(np.unique(y)),
            'training_samples': X_train.shape[0],
            'test_samples': X_test.shape[0]
        },
        'training_metrics': {
            'training_time_seconds': training_time,
            'hyperparameter_optimization': True,
            'cross_validation_folds': 5
        },
        'base_model_performance': {
            'accuracy': base_accuracy,
            'precision': base_precision,
            'recall': base_recall,
            'f1_score': base_f1
        },
        'optimized_model_performance': {
            'accuracy': best_accuracy,
            'precision': best_precision,
            'recall': best_recall,
            'f1_score': best_f1,
            'improvement_over_base': best_accuracy - base_accuracy
        },
        'cross_validation': {
            'cv_scores': cv_scores.tolist(),
            'cv_mean': np.mean(cv_scores),
            'cv_std': np.std(cv_scores)
        },
        'best_hyperparameters': grid_search.best_params_,
        'model_complexity': complexity_metrics,
        'feature_importance': feature_importance,
        'kubernetes_info': {
            'pod_name': os.environ.get('HOSTNAME', 'unknown'),
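            # KUBERNETES_NAMESPACE is only available if the job spec injects it; otherwise this reports 'default'
            'namespace': os.environ.get('KUBERNETES_NAMESPACE', 'default'),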
            'completion_time': datetime.now().isoformat()
        }
    }
    
    return training_results

# Run ML training in Kubernetes
ml_results = distributed_ml_training(
    model_type="random_forest", 
    n_estimators=150, 
    dataset_size=30000
)

print(f"\nMACHINE LEARNING TRAINING COMPLETE")
model_info = ml_results['model_info']
print(f"Model: {model_info['model_type']}")
print(f"Dataset: {model_info['dataset_size']:,} samples, {model_info['n_features']} features")

base_perf = ml_results['base_model_performance']
opt_perf = ml_results['optimized_model_performance']
print(f"\nPerformance Comparison:")
print(f"  Base model accuracy: {base_perf['accuracy']:.4f}")
print(f"  Optimized accuracy: {opt_perf['accuracy']:.4f}")
print(f"  Improvement: {opt_perf['improvement_over_base']:+.4f}")

cv = ml_results['cross_validation']
print(f"\nCross-validation: {cv['cv_mean']:.4f} ± {cv['cv_std']:.4f}")

k8s_info = ml_results['kubernetes_info']
print(f"\nKubernetes Info:")
print(f"  Pod: {k8s_info['pod_name']}")
print(f"  Namespace: {k8s_info['namespace']}")
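
The training function above reports the namespace from the `KUBERNETES_NAMESPACE` environment variable, which is typically unset unless the job spec injects it. Pods with a mounted service account can also read the namespace from a well-known file. The sketch below shows a best-effort lookup that tries the file first; the helper name `current_namespace` is hypothetical, and whether the file is present depends on how the pod is configured.

```python
import os

# Standard location of the service-account namespace file inside a pod.
NAMESPACE_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"


def current_namespace(path=NAMESPACE_FILE, env_var="KUBERNETES_NAMESPACE", default="default"):
    """Best-effort lookup of the namespace the current pod runs in."""
    try:
        with open(path, encoding="utf-8") as f:
            namespace = f.read().strip()
        if namespace:
            return namespace
    except OSError:
        pass  # Not running in a pod, or no service account is mounted
    # Fall back to the environment variable, then to the given default.
    return os.environ.get(env_var, default)


print(current_namespace())
```

Outside a cluster this falls through to the environment variable and then to the default, so the same code can run locally during development.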

## Example 2: Distributed Data Processing

Process large datasets using Kubernetes job parallelization:

In [None]:
@cluster(
    cores=6,
    memory="12Gi",
    cpu_limit=8,
    memory_limit="16Gi",
    parallel=True,  # Enable automatic parallelization
    job_name="data-processing",
    parallelism=3,  # Run up to 3 pods simultaneously
    completions=10  # Total number of completions needed
)
def distributed_data_analysis(data_chunks=100, chunk_size=10000):
    """
    Distributed data analysis across multiple Kubernetes pods.
    """
    import numpy as np
    import os
    import json
    from datetime import datetime, timedelta
    import random
    import math
    
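    # Install required packages, using the interpreter that runs this function.
    # check_call raises if the installation fails instead of continuing silently.
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas", "scipy", "numpy"])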
    
    import pandas as pd
    from scipy import stats
    
    print(f"Starting distributed data analysis: {data_chunks} chunks of {chunk_size:,} records each")
    print(f"Total data points: {data_chunks * chunk_size:,}")
    
    def generate_synthetic_timeseries_data(chunk_id, chunk_size):
        """Generate synthetic time-series data for analysis"""
        np.random.seed(chunk_id * 123)  # Reproducible but different per chunk
        
        # Generate hourly timestamps (chunk_size hours, each chunk starting 10 days after the previous one)
        start_date = datetime(2023, 1, 1) + timedelta(days=chunk_id * 10)
        timestamps = [start_date + timedelta(hours=i) for i in range(chunk_size)]
        
        # Generate multiple correlated time series
        base_trend = np.linspace(100, 200, chunk_size)  # Long-term trend
        seasonal = 20 * np.sin(2 * np.pi * np.arange(chunk_size) / (24 * 7))  # Weekly seasonality
        daily = 10 * np.sin(2 * np.pi * np.arange(chunk_size) / 24)  # Daily pattern
        
        # Add different noise patterns
        noise = np.random.normal(0, 5, chunk_size)
        
        # Primary metric (e.g., web traffic, sales, etc.)
        primary_metric = base_trend + seasonal + daily + noise
        primary_metric = np.maximum(0, primary_metric)  # Ensure non-negative
        
        # Secondary metrics correlated with primary
        secondary_metric = primary_metric * 0.7 + np.random.normal(0, 3, chunk_size)
        tertiary_metric = primary_metric * 1.2 + np.random.normal(10, 8, chunk_size)
        
        # Categorical data
        categories = ['A', 'B', 'C', 'D', 'E']
        category_weights = [0.3, 0.25, 0.2, 0.15, 0.1]
        categories_data = np.random.choice(categories, chunk_size, p=category_weights)
        
        # Geographic regions
        regions = ['North', 'South', 'East', 'West', 'Central']
        region_weights = [0.2, 0.2, 0.25, 0.2, 0.15]
        regions_data = np.random.choice(regions, chunk_size, p=region_weights)
        
        # Create DataFrame
        data = pd.DataFrame({
            'timestamp': timestamps,
            'primary_metric': primary_metric,
            'secondary_metric': secondary_metric,
            'tertiary_metric': tertiary_metric,
            'category': categories_data,
            'region': regions_data,
            'chunk_id': chunk_id
        })
        
        return data
    
    def analyze_chunk_statistics(chunk_data):
        """Comprehensive statistical analysis of a data chunk"""
        numeric_cols = ['primary_metric', 'secondary_metric', 'tertiary_metric']
        
        statistics = {}
        
        # Basic descriptive statistics
        for col in numeric_cols:
            series = chunk_data[col]
            statistics[col] = {
                'count': len(series),
                'mean': float(np.mean(series)),
                'median': float(np.median(series)),
                'std': float(np.std(series)),
                'min': float(np.min(series)),
                'max': float(np.max(series)),
                'q25': float(np.percentile(series, 25)),
                'q75': float(np.percentile(series, 75)),
                'skewness': float(stats.skew(series)),
                'kurtosis': float(stats.kurtosis(series))
            }
        
        # Correlation analysis
        correlation_matrix = chunk_data[numeric_cols].corr()
        statistics['correlations'] = {
            'primary_secondary': float(correlation_matrix.loc['primary_metric', 'secondary_metric']),
            'primary_tertiary': float(correlation_matrix.loc['primary_metric', 'tertiary_metric']),
            'secondary_tertiary': float(correlation_matrix.loc['secondary_metric', 'tertiary_metric'])
        }
        
        # Categorical analysis
        category_stats = chunk_data['category'].value_counts()
        region_stats = chunk_data['region'].value_counts()
        
        statistics['categorical'] = {
            'category_distribution': category_stats.to_dict(),
            'region_distribution': region_stats.to_dict(),
            'category_entropy': float(-sum(p * np.log2(p) for p in category_stats / len(chunk_data) if p > 0)),
            'region_entropy': float(-sum(p * np.log2(p) for p in region_stats / len(chunk_data) if p > 0))
        }
        
        # Time-based analysis
        chunk_data['hour'] = chunk_data['timestamp'].dt.hour
        chunk_data['day_of_week'] = chunk_data['timestamp'].dt.dayofweek
        
        hourly_pattern = chunk_data.groupby('hour')['primary_metric'].mean()
        daily_pattern = chunk_data.groupby('day_of_week')['primary_metric'].mean()
        
        statistics['temporal'] = {
            'hourly_peak': int(hourly_pattern.idxmax()),
            'hourly_trough': int(hourly_pattern.idxmin()),
            'hourly_variation': float(hourly_pattern.std()),
            'daily_peak': int(daily_pattern.idxmax()),  # 0=Monday, 6=Sunday
            'daily_variation': float(daily_pattern.std())
        }
        
        # Anomaly detection (simple threshold-based)
        for col in numeric_cols:
            series = chunk_data[col]
            q1, q3 = np.percentile(series, [25, 75])
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            outliers = series[(series < lower_bound) | (series > upper_bound)]
            statistics[col]['outliers'] = {
                'count': len(outliers),
                'percentage': float(len(outliers) / len(series) * 100),
                'lower_bound': float(lower_bound),
                'upper_bound': float(upper_bound)
            }
        
        return statistics
    
    def detect_patterns_and_trends(chunk_data):
        """Advanced pattern detection and trend analysis"""
        patterns = {}
        
        # Trend analysis using linear regression
        time_index = np.arange(len(chunk_data))
        
        for col in ['primary_metric', 'secondary_metric', 'tertiary_metric']:
            slope, intercept, r_value, p_value, std_err = stats.linregress(time_index, chunk_data[col])
            
            patterns[f'{col}_trend'] = {
                'slope': float(slope),
                'r_squared': float(r_value ** 2),
                'p_value': float(p_value),
                'trend_direction': 'increasing' if slope > 0 else 'decreasing',
                'trend_strength': 'strong' if abs(r_value) > 0.7 else 'moderate' if abs(r_value) > 0.3 else 'weak'
            }
        
        # Seasonality detection (simplified)
        primary_hourly = chunk_data.groupby(chunk_data['timestamp'].dt.hour)['primary_metric'].mean()
        hourly_variation = primary_hourly.std() / primary_hourly.mean()
        
        patterns['seasonality'] = {
            'hourly_coefficient_of_variation': float(hourly_variation),
            'has_daily_pattern': hourly_variation > 0.15,  # Threshold for daily seasonality
            'peak_hours': [int(hour) for hour in primary_hourly.nlargest(3).index.tolist()],
            'trough_hours': [int(hour) for hour in primary_hourly.nsmallest(3).index.tolist()]
        }
        
        # Change point detection (simplified)
        def detect_change_points(series, window=100):
            if len(series) < 2 * window:
                return []
            
            change_points = []
            for i in range(window, len(series) - window):
                before = series[i-window:i]
                after = series[i:i+window]
                
                # Statistical test for difference in means
                t_stat, p_val = stats.ttest_ind(before, after)
                if p_val < 0.01:  # Significant change
                    change_points.append(i)
            
            return change_points
        
        change_points = detect_change_points(chunk_data['primary_metric'].values)
        patterns['change_points'] = {
            'detected_points': len(change_points),
            'positions': change_points[:5] if change_points else [],  # First 5
            'has_significant_changes': len(change_points) > 0
        }
        
        return patterns
    
    # Process chunks (this loop will be automatically parallelized)
    chunk_results = []
    
    for chunk_id in range(data_chunks):
        if chunk_id % 10 == 0:
            print(f"Processing chunk {chunk_id + 1}/{data_chunks}...")
        
        # Generate data for this chunk
        chunk_data = generate_synthetic_timeseries_data(chunk_id, chunk_size)
        
        # Analyze the chunk
        chunk_stats = analyze_chunk_statistics(chunk_data)
        chunk_patterns = detect_patterns_and_trends(chunk_data)
        
        chunk_result = {
            'chunk_id': chunk_id,
            'chunk_size': len(chunk_data),
            'statistics': chunk_stats,
            'patterns': chunk_patterns,
            'processing_timestamp': datetime.now().isoformat()
        }
        
        chunk_results.append(chunk_result)
    
    # Aggregate results across all chunks
    def aggregate_chunk_results(chunk_results):
        """Aggregate statistics across all processed chunks"""
        
        total_records = sum(chunk['chunk_size'] for chunk in chunk_results)
        
        # Aggregate basic statistics
        metrics = ['primary_metric', 'secondary_metric', 'tertiary_metric']
        aggregated_stats = {}
        
        for metric in metrics:
            means = [chunk['statistics'][metric]['mean'] for chunk in chunk_results]
            stds = [chunk['statistics'][metric]['std'] for chunk in chunk_results]
            
            aggregated_stats[metric] = {
                'global_mean': float(np.mean(means)),
                'mean_std': float(np.std(means)),
                'avg_within_chunk_std': float(np.mean(stds)),
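                # Law of total variance (exact for equal-sized chunks): within-chunk plus between-chunk variance
                'total_variation': float(np.sqrt(np.mean(np.square(stds)) + np.var(means)))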
            }
        
        # Aggregate patterns
        trend_directions = {}
        for metric in metrics:
            directions = [chunk['patterns'][f'{metric}_trend']['trend_direction'] 
                         for chunk in chunk_results]
            trend_directions[metric] = {
                'increasing_chunks': directions.count('increasing'),
                'decreasing_chunks': directions.count('decreasing'),
                'dominant_trend': 'increasing' if directions.count('increasing') > directions.count('decreasing') else 'decreasing'
            }
        
        # Aggregate seasonality
        seasonal_chunks = sum(1 for chunk in chunk_results 
                            if chunk['patterns']['seasonality']['has_daily_pattern'])
        
        # Aggregate change points
        total_change_points = sum(chunk['patterns']['change_points']['detected_points'] 
                                for chunk in chunk_results)
        
        aggregated_results = {
            'processing_summary': {
                'total_chunks': len(chunk_results),
                'total_records': total_records,
                'avg_records_per_chunk': total_records / len(chunk_results),
                'processing_completed': datetime.now().isoformat()
            },
            'aggregated_statistics': aggregated_stats,
            'global_patterns': {
                'trend_analysis': trend_directions,
                'seasonality': {
                    'chunks_with_daily_patterns': seasonal_chunks,
                    'percentage_seasonal': float(seasonal_chunks / len(chunk_results) * 100)
                },
                'change_points': {
                    'total_detected': total_change_points,
                    'avg_per_chunk': float(total_change_points / len(chunk_results))
                }
            },
            'data_quality': {
                'chunks_processed': len(chunk_results),
                'processing_success_rate': 100.0,  # All chunks processed successfully
                'data_consistency_score': float(np.mean([chunk['statistics']['primary_metric']['std'] 
                                                       for chunk in chunk_results]) / 
                                               np.std([chunk['statistics']['primary_metric']['mean'] 
                                                      for chunk in chunk_results])) if len(chunk_results) > 1 else 1.0
            },
            'kubernetes_execution': {
                'pod_hostname': os.environ.get('HOSTNAME', 'unknown'),
                'parallel_execution': True,
                'chunk_distribution': 'automatic_parallelization'
            }
        }
        
        return aggregated_results
    
    final_results = aggregate_chunk_results(chunk_results)
    final_results['individual_chunks'] = chunk_results[:5]  # Include first 5 for inspection
    
    return final_results

# Run distributed data analysis
data_results = distributed_data_analysis(data_chunks=50, chunk_size=5000)

print(f"\nDISTRIBUTED DATA ANALYSIS COMPLETE")
summary = data_results['processing_summary']
print(f"Chunks processed: {summary['total_chunks']}")
print(f"Total records: {summary['total_records']:,}")
print(f"Avg records per chunk: {summary['avg_records_per_chunk']:,.0f}")

patterns = data_results['global_patterns']
print(f"\nGlobal Patterns:")
print(f"  Chunks with daily seasonality: {patterns['seasonality']['chunks_with_daily_patterns']} ({patterns['seasonality']['percentage_seasonal']:.1f}%)")
print(f"  Total change points detected: {patterns['change_points']['total_detected']}")
print(f"  Average change points per chunk: {patterns['change_points']['avg_per_chunk']:.2f}")

quality = data_results['data_quality']
print(f"\nData Quality:")
print(f"  Processing success rate: {quality['processing_success_rate']:.1f}%")
print(f"  Data consistency score: {quality['data_consistency_score']:.3f}")

k8s_exec = data_results['kubernetes_execution']
print(f"\nKubernetes Execution:")
print(f"  Pod hostname: {k8s_exec['pod_hostname']}")
print(f"  Parallel execution: {k8s_exec['parallel_execution']}")
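
The aggregation step above summarizes each chunk by its mean and standard deviation. When all chunks hold the same number of records, those two values are enough to recover the global standard deviation through the law of total variance: the total variance is the average within-chunk variance plus the variance of the chunk means. The sketch below checks this on toy data using only the standard library; `combine_chunk_stats` is a hypothetical helper, and the equal-size assumption is essential.

```python
import math
import statistics


def combine_chunk_stats(means, stds):
    """Combine per-chunk means and population stds into global values.

    Assumes every chunk holds the same number of records.
    """
    global_mean = statistics.fmean(means)
    within = statistics.fmean([s * s for s in stds])  # Average within-chunk variance
    between = statistics.pvariance(means)             # Variance of the chunk means
    return global_mean, math.sqrt(within + between)


# Compare against a direct computation on three equal-sized toy chunks.
chunks = [[1.0, 2.0, 3.0, 4.0], [10.0, 12.0, 14.0, 16.0], [5.0, 5.0, 6.0, 8.0]]
means = [statistics.fmean(c) for c in chunks]
stds = [statistics.pstdev(c) for c in chunks]

combined_mean, combined_std = combine_chunk_stats(means, stds)
all_values = [v for c in chunks for v in c]
print(math.isclose(combined_mean, statistics.fmean(all_values)))  # True
print(math.isclose(combined_std, statistics.pstdev(all_values)))  # True
```

For chunks of unequal size, each chunk's contribution would need to be weighted by its record count.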

## Example 3: Fault-Tolerant Scientific Computing

Demonstrate Kubernetes' fault tolerance and job retry capabilities:

In [None]:
@cluster(
    cores=4,
    memory="8Gi",
    cpu_limit=6,
    memory_limit="12Gi",
    backoff_limit=5,  # Retry up to 5 times on failure
    restart_policy="OnFailure",
    job_name="fault-tolerant-computation"
)
def fault_tolerant_monte_carlo(n_simulations=1000000, failure_probability=0.1, checkpoint_interval=100000):
    """
    Fault-tolerant Monte Carlo simulation with checkpointing.
    """
    import numpy as np
    import os
    import json
    import pickle
    import random
    import time
    from datetime import datetime
    
    print(f"Starting fault-tolerant Monte Carlo: {n_simulations:,} simulations")
    print(f"Failure probability: {failure_probability}, Checkpoint interval: {checkpoint_interval:,}")
    
    # Simulate random failures for demonstration
    def simulate_random_failure():
        if random.random() < failure_probability:
            failure_types = [
                "Simulated network timeout",
                "Simulated memory pressure",
                "Simulated compute node failure",
                "Simulated resource exhaustion"
            ]
            failure_type = random.choice(failure_types)
            print(f"WARNING: {failure_type} - continuing with fault tolerance...")
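            # Simulate recovery time. This runs once per failed sample, so the total
            # delay grows with n_simulations * failure_probability.
            time.sleep(2)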
            return True
        return False
    
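    # Checkpoint management
    # NOTE: /tmp lives in the container's own filesystem, which is recreated when the
    # container restarts. For checkpoints to survive a retry, use a path on a mounted volume.
    checkpoint_file = "/tmp/monte_carlo_checkpoint.pkl"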
    
    def save_checkpoint(iteration, results, random_state):
        """Save current progress to checkpoint"""
        checkpoint_data = {
            'iteration': iteration,
            'results': results,
            'random_state': random_state,
            'timestamp': datetime.now().isoformat()
        }
        
        try:
            with open(checkpoint_file, 'wb') as f:
                pickle.dump(checkpoint_data, f)
            print(f"Checkpoint saved at iteration {iteration:,}")
        except Exception as e:
            print(f"Failed to save checkpoint: {e}")
    
    def load_checkpoint():
        """Load progress from checkpoint if available"""
        if os.path.exists(checkpoint_file):
            try:
                with open(checkpoint_file, 'rb') as f:
                    checkpoint_data = pickle.load(f)
                print(f"Checkpoint loaded from iteration {checkpoint_data['iteration']:,}")
                return checkpoint_data
            except Exception as e:
                print(f"Failed to load checkpoint: {e}")
        return None
    
    # Monte Carlo simulation functions
    def estimate_pi_sample():
        """Single sample for pi estimation"""
        x, y = np.random.random(2)
        return 1 if x*x + y*y <= 1 else 0
    
    def option_pricing_sample(S0=100, K=105, T=1, r=0.05, sigma=0.2):
        """Single Monte Carlo sample for option pricing"""
        # Geometric Brownian Motion
        dt = T
        z = np.random.standard_normal()
        ST = S0 * np.exp((r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z)
        payoff = max(ST - K, 0)  # Call option payoff
        return payoff * np.exp(-r * T)  # Discounted payoff
    
    def portfolio_var_sample(returns_mean=0.08, returns_std=0.2, portfolio_value=1000000):
        """Single sample for portfolio Value at Risk calculation"""
        daily_return = np.random.normal(returns_mean/252, returns_std/np.sqrt(252))
        portfolio_change = portfolio_value * daily_return
        return portfolio_change
    
    def percolation_sample(grid_size=50, p=0.593):
        """Single sample for percolation theory"""
        # Simplified 2D percolation
        grid = np.random.random((grid_size, grid_size)) < p
        # Check if there's a path from top to bottom (simplified)
        # This is a very simplified percolation check
        top_row = grid[0, :]
        bottom_row = grid[-1, :]
        return 1 if np.any(top_row) and np.any(bottom_row) else 0
    
    # Load checkpoint if available
    checkpoint = load_checkpoint()
    if checkpoint:
        start_iteration = checkpoint['iteration']
        pi_samples = checkpoint['results']['pi_samples']
        option_prices = checkpoint['results']['option_prices']
        portfolio_changes = checkpoint['results']['portfolio_changes']
        percolation_samples = checkpoint['results']['percolation_samples']
        # Restore random state
        np.random.set_state(checkpoint['random_state'])
        print(f"Resuming from iteration {start_iteration:,}")
    else:
        start_iteration = 0
        pi_samples = []
        option_prices = []
        portfolio_changes = []
        percolation_samples = []
    
    # Main simulation loop with fault tolerance
    failure_count = 0
    successful_simulations = len(pi_samples)  # Count only collected samples, including any restored from a checkpoint
    
    for i in range(start_iteration, n_simulations):
        if i % max(1, n_simulations // 20) == 0:  # max() avoids a zero modulus when n_simulations < 20
            print(f"Progress: {i:,}/{n_simulations:,} ({100*i/n_simulations:.1f}%)")
        
        # Simulate potential failures
        if simulate_random_failure():
            failure_count += 1
            continue  # Skip this iteration but continue
        
        # Perform Monte Carlo samples
        try:
            pi_sample = estimate_pi_sample()
            option_price = option_pricing_sample()
            portfolio_change = portfolio_var_sample()
            percolation = percolation_sample()
            
            pi_samples.append(pi_sample)
            option_prices.append(option_price)
            portfolio_changes.append(portfolio_change)
            percolation_samples.append(percolation)
            
            successful_simulations += 1
            
        except Exception as e:
            print(f"Simulation error at iteration {i}: {e}")
            failure_count += 1
            continue
        
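        # Checkpoint periodically. Iterations that hit `continue` above never reach
        # this point, so a checkpoint that falls on a failed iteration is skipped.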
        if (i + 1) % checkpoint_interval == 0:
            results = {
                'pi_samples': pi_samples,
                'option_prices': option_prices,
                'portfolio_changes': portfolio_changes,
                'percolation_samples': percolation_samples
            }
            save_checkpoint(i + 1, results, np.random.get_state())
    
    # Final calculations
    print(f"Simulation completed. Successful: {successful_simulations:,}, Failures: {failure_count}")
    
    # Pi estimation
    pi_estimate = 4 * np.mean(pi_samples) if pi_samples else 0
    pi_error = abs(pi_estimate - np.pi) if pi_samples else 0
    pi_confidence_interval = 1.96 * np.sqrt(np.var(pi_samples) / len(pi_samples)) if len(pi_samples) > 1 else 0
    
    # Option pricing
    option_price_mean = np.mean(option_prices) if option_prices else 0
    option_price_std = np.std(option_prices) if len(option_prices) > 1 else 0
    option_confidence_interval = 1.96 * option_price_std / np.sqrt(len(option_prices)) if len(option_prices) > 1 else 0
    
    # Portfolio VaR (95% confidence)
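    if portfolio_changes:
        portfolio_changes_sorted = sorted(portfolio_changes)
        var_index = int(0.05 * len(portfolio_changes))  # index of the 5th-percentile outcome
        var_95 = portfolio_changes_sorted[var_index]
        # Average the worst 5% of outcomes; keep at least one sample so the slice is never empty
        expected_shortfall = np.mean(portfolio_changes_sorted[:max(1, var_index)])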
    else:
        var_95 = 0
        expected_shortfall = 0
    
    # Percolation probability
    percolation_probability = np.mean(percolation_samples) if percolation_samples else 0
    
    # Cleanup checkpoint file
    try:
        os.remove(checkpoint_file)
        print("Checkpoint file cleaned up")
    except OSError:  # file missing or not removable; nothing to clean up
        pass
    
    fault_tolerant_results = {
        'simulation_parameters': {
            'total_simulations_requested': n_simulations,
            'successful_simulations': successful_simulations,
            'simulated_failures': failure_count,
            'success_rate': successful_simulations / n_simulations if n_simulations > 0 else 0,
            'checkpoint_interval': checkpoint_interval
        },
        'pi_estimation': {
            'estimate': pi_estimate,
            'true_value': float(np.pi),
            'absolute_error': pi_error,
            'relative_error_percent': (pi_error / np.pi) * 100,
            'confidence_interval_95': pi_confidence_interval * 4,  # Scale for pi
            'samples_used': len(pi_samples)
        },
        'option_pricing': {
            'estimated_price': option_price_mean,
            'price_std_dev': option_price_std,
            'confidence_interval_95': option_confidence_interval,
            'samples_used': len(option_prices)
        },
        'portfolio_risk': {
            'value_at_risk_95': var_95,
            'expected_shortfall': expected_shortfall,
            'daily_volatility': np.std(portfolio_changes) if len(portfolio_changes) > 1 else 0,
            'samples_used': len(portfolio_changes)
        },
        'percolation_analysis': {
            'percolation_probability': percolation_probability,
            'theoretical_threshold': 0.593,  # 2D percolation threshold
            'samples_used': len(percolation_samples)
        },
        'fault_tolerance': {
            'checkpoint_saves': successful_simulations // checkpoint_interval,
            'recovery_successful': checkpoint is not None,
            'resilience_score': (successful_simulations / (successful_simulations + failure_count)) if (successful_simulations + failure_count) > 0 else 0
        },
        'kubernetes_info': {
            'pod_name': os.environ.get('HOSTNAME', 'unknown'),
            # Kubernetes does not set RESTART_COUNT itself; this stays 0 unless the pod spec injects it
            'restart_count': int(os.environ.get('RESTART_COUNT', '0')),
            'completion_time': datetime.now().isoformat()
        }
    }
    
    return fault_tolerant_results

# Run fault-tolerant Monte Carlo simulation
mc_results = fault_tolerant_monte_carlo(
    n_simulations=500000, 
    failure_probability=0.05,  # 5% chance of simulated failure
    checkpoint_interval=50000
)

print("\nFAULT-TOLERANT MONTE CARLO COMPLETE")
sim_params = mc_results['simulation_parameters']
print(f"Requested simulations: {sim_params['total_simulations_requested']:,}")
print(f"Successful simulations: {sim_params['successful_simulations']:,}")
print(f"Simulated failures: {sim_params['simulated_failures']}")
print(f"Success rate: {sim_params['success_rate']*100:.1f}%")

pi_est = mc_results['pi_estimation']
print("\nPi Estimation:")
print(f"  Estimate: {pi_est['estimate']:.6f}")
print(f"  True value: {pi_est['true_value']:.6f}")
print(f"  Error: {pi_est['relative_error_percent']:.4f}%")

option = mc_results['option_pricing']
print("\nOption Pricing:")
print(f"  Estimated price: ${option['estimated_price']:.2f}")
print(f"  Standard deviation: ${option['price_std_dev']:.2f}")

risk = mc_results['portfolio_risk']
print("\nPortfolio Risk:")
print(f"  VaR (95%): ${risk['value_at_risk_95']:,.0f}")
print(f"  Expected shortfall: ${risk['expected_shortfall']:,.0f}")

fault_tol = mc_results['fault_tolerance']
print("\nFault Tolerance:")
print(f"  Checkpoints saved: {fault_tol['checkpoint_saves']}")
print(f"  Resilience score: {fault_tol['resilience_score']:.3f}")

k8s_info = mc_results['kubernetes_info']
print("\nKubernetes Info:")
print(f"  Pod: {k8s_info['pod_name']}")
print(f"  Restart count: {k8s_info['restart_count']}")
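
The `percolation_sample` helper in the simulation above only checks that the top and bottom rows each contain an open site, so it overstates how often the grid actually percolates. A minimal sketch of a true spanning check, assuming 4-neighbour connectivity, could look like the following. The names `spans_top_to_bottom` and `percolation_sample_bfs` are hypothetical and are not part of the tutorial's code or of Clustrix.

```python
from collections import deque
import random

def spans_top_to_bottom(grid):
    """Return True if open (truthy) sites connect the top row to the bottom row.

    Uses breadth-first search with 4-neighbour moves (up, down, left, right).
    """
    n_rows = len(grid)
    n_cols = len(grid[0]) if n_rows else 0
    if n_rows == 0 or n_cols == 0:
        return False

    # Seed the search with every open site in the top row.
    queue = deque((0, c) for c in range(n_cols) if grid[0][c])
    visited = set(queue)

    while queue:
        r, c = queue.popleft()
        if r == n_rows - 1:
            return True  # reached the bottom row through open sites
        for dr, dc in ((1, 0), (-1, 0), (0, 1), (0, -1)):
            nr, nc = r + dr, c + dc
            if 0 <= nr < n_rows and 0 <= nc < n_cols:
                if grid[nr][nc] and (nr, nc) not in visited:
                    visited.add((nr, nc))
                    queue.append((nr, nc))
    return False

def percolation_sample_bfs(grid_size=50, p=0.593, rng=random):
    """Hypothetical replacement for percolation_sample: 1 if the grid spans, else 0."""
    grid = [[rng.random() < p for _ in range(grid_size)] for _ in range(grid_size)]
    return 1 if spans_top_to_bottom(grid) else 0

# Open sites in both the top and bottom rows, but no connecting path:
blocked = [[1, 1, 1],
           [0, 0, 0],
           [1, 1, 1]]
print(spans_top_to_bottom(blocked))  # False
```

The simplified check in the tutorial would count the `blocked` grid as percolating, while the search above does not. The cost is one pass over the open sites per sample, which is slower than the two `np.any` calls it replaces.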

## Kubernetes Resource Management and Best Practices

In [None]:
# Example Kubernetes resource configurations
def get_kubernetes_resource_examples():
    """
    Examples of resource configurations for different workload types.
    """
    
    examples = {
        'cpu_intensive': {
            'cores': 8,
            'memory': '16Gi',
            'cpu_limit': 8,
            'memory_limit': '20Gi'
        },
        'memory_intensive': {
            'cores': 4,
            'memory': '32Gi',
            'cpu_limit': 6,
            'memory_limit': '40Gi'
        },
        'ml_training': {
            'cores': 6,
            'memory': '24Gi',
            'cpu_limit': 8,
            'memory_limit': '32Gi'
        }
    }
    
    return examples

# Example usage:
# resources = get_kubernetes_resource_examples()
# print(f"Available resource patterns: {list(resources.keys())}")

## Clustrix Kubernetes Configuration Examples

### Basic Computation
**Use case**: Simple mathematical computations

```python
@cluster(
    cores=2,
    memory="4Gi",
    cpu_limit=3,
    memory_limit="6Gi",
    container_image="python:3.11-slim"
)
```

### ML Training
**Use case**: Machine learning model training with fault tolerance

```python
@cluster(
    cores=8,
    memory="32Gi",
    cpu_limit=12,
    memory_limit="40Gi",
    container_image="python:3.11",
    job_name="ml-training",
    backoff_limit=3
)
```

### Parallel Processing
**Use case**: Embarrassingly parallel data processing

```python
@cluster(
    cores=4,
    memory="16Gi",
    parallel=True,
    parallelism=5,
    completions=20,
    job_name="parallel-processing"
)
```

### Fault Tolerant
**Use case**: Long-running computations with automatic retry

```python
@cluster(
    cores=6,
    memory="24Gi",
    backoff_limit=5,
    restart_policy="OnFailure",
    job_ttl_seconds=7200,
    active_deadline_seconds=3600
)
```
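
To make the Kubernetes-specific parameters more concrete, the sketch below maps keyword arguments like the ones above onto the fields of a `batch/v1` Job manifest. The field names (`backoffLimit`, `ttlSecondsAfterFinished`, `activeDeadlineSeconds`, `restartPolicy`) come from the Kubernetes Job API. The helper `build_job_manifest` is hypothetical, and whether Clustrix performs exactly this translation internally is an assumption.

```python
import json

def build_job_manifest(name, image, cores, memory, cpu_limit=None, memory_limit=None,
                       backoff_limit=3, restart_policy="OnFailure",
                       job_ttl_seconds=None, active_deadline_seconds=None,
                       parallelism=1, completions=1):
    """Build a batch/v1 Job manifest (as a dict) from Clustrix-style arguments."""
    resources = {"requests": {"cpu": str(cores), "memory": memory}}
    limits = {}
    if cpu_limit is not None:
        limits["cpu"] = str(cpu_limit)
    if memory_limit is not None:
        limits["memory"] = memory_limit
    if limits:
        resources["limits"] = limits

    spec = {
        "backoffLimit": backoff_limit,   # retries before the Job is marked failed
        "parallelism": parallelism,      # pods running at the same time
        "completions": completions,      # successful pods required
        "template": {
            "spec": {
                "restartPolicy": restart_policy,
                # A real manifest also needs a command or entrypoint for the container.
                "containers": [{"name": name, "image": image, "resources": resources}],
            }
        },
    }
    if job_ttl_seconds is not None:
        spec["ttlSecondsAfterFinished"] = job_ttl_seconds       # cleanup delay
    if active_deadline_seconds is not None:
        spec["activeDeadlineSeconds"] = active_deadline_seconds  # hard runtime cap

    return {"apiVersion": "batch/v1", "kind": "Job",
            "metadata": {"name": name}, "spec": spec}

# Values taken from the "Fault Tolerant" example; the job name is made up.
job = build_job_manifest(
    name="fault-tolerant", image="python:3.11-slim",
    cores=6, memory="24Gi", backoff_limit=5, restart_policy="OnFailure",
    job_ttl_seconds=7200, active_deadline_seconds=3600,
)
print(json.dumps(job["spec"], indent=2))
```

In Kubernetes, `activeDeadlineSeconds` takes precedence over `backoffLimit`, so a Job configured like the "Fault Tolerant" example is terminated after one hour even if retries remain.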

## Kubernetes Job Patterns

### Single Job
- **Description**: Single pod, run-to-completion
- **Best for**: One-off computations, small datasets
- **Parameters**:
  - completions: 1
  - parallelism: 1
  - backoff_limit: 3

### Parallel Job
- **Description**: Multiple pods running simultaneously
- **Best for**: Independent parallel tasks, embarrassingly parallel problems
- **Parameters**:
  - completions: 10
  - parallelism: 5
  - backoff_limit: 2

### Queue Job
- **Description**: Work queue pattern with multiple workers
- **Best for**: Dynamic workloads, task queues, streaming data
- **Parameters**:
  - completions: None (no fixed completion count)
  - parallelism: 3
  - backoff_limit: 5

### Indexed Job
- **Description**: Jobs with completion index for task assignment
- **Best for**: Parameter sweeps, data partitioning, batch processing
- **Parameters**:
  - completion_mode: Indexed
  - completions: 20
  - parallelism: 4
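
For the Indexed pattern, Kubernetes exposes each pod's completion index through the `JOB_COMPLETION_INDEX` environment variable. The sketch below shows one way a worker might use that index to pick its share of a parameter sweep. The helper names and the parameter grid are hypothetical, and how Clustrix surfaces the index to decorated functions is not covered here.

```python
import os

def completion_index(default=0):
    """Read the index Kubernetes assigns to each pod of an Indexed Job."""
    return int(os.environ.get("JOB_COMPLETION_INDEX", default))

def tasks_for_index(tasks, index, completions):
    """Return the tasks owned by one completion index (round-robin split)."""
    if not 0 <= index < completions:
        raise ValueError(f"index {index} outside 0..{completions - 1}")
    return tasks[index::completions]

# Hypothetical sweep: 3 learning rates x 4 tree depths = 12 settings.
parameter_grid = [(lr, depth) for lr in (0.01, 0.1, 0.3) for depth in (2, 4, 6, 8)]
completions = 4

index = completion_index()  # falls back to 0 outside an Indexed Job
my_tasks = tasks_for_index(parameter_grid, index, completions)
print(f"index {index} handles {len(my_tasks)} of {len(parameter_grid)} settings")
```

The round-robin slice keeps the shares within one task of each other and needs no coordination between pods, because every pod derives its share from the index alone.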

## Kubernetes Resource Management Guidelines

### CPU Intensive Workloads
- **Description**: Mathematical computations, simulations, optimization
- **Resource ratio**: cores ≈ cpu_limit, memory moderate
- **Example configuration**:
  - cores: 8
  - memory: 16Gi
  - cpu_limit: 8
  - memory_limit: 20Gi
- **Use cases**: Monte Carlo simulations, Genetic algorithms, Scientific computing

### Memory Intensive Workloads
- **Description**: Large dataset processing, in-memory analytics
- **Resource ratio**: memory >> cores, higher memory limits
- **Example configuration**:
  - cores: 4
  - memory: 32Gi
  - cpu_limit: 6
  - memory_limit: 40Gi
- **Use cases**: Big data processing, Large ML models, Genomics analysis

### I/O Intensive Workloads
- **Description**: File processing, database operations, network I/O
- **Resource ratio**: moderate cores and memory, focus on concurrency
- **Example configuration**:
  - cores: 2
  - memory: 8Gi
  - cpu_limit: 4
  - memory_limit: 12Gi
- **Use cases**: Data ingestion, ETL pipelines, Web scraping

### ML Training Workloads
- **Description**: Machine learning model training
- **Resource ratio**: balanced cores and memory, burst capacity
- **Example configuration**:
  - cores: 6
  - memory: 24Gi
  - cpu_limit: 8
  - memory_limit: 32Gi
- **Use cases**: Deep learning, Model hyperparameter tuning, Feature engineering
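
Every example configuration in this section keeps each request at or below its limit, which Kubernetes requires: a pod whose request exceeds its limit is rejected. A minimal sketch of a pre-submission check, assuming memory is written with binary suffixes (`Ki`, `Mi`, `Gi`, `Ti`) as in this tutorial, is shown below. The helpers `memory_to_bytes` and `check_resources` are hypothetical and not part of Clustrix.

```python
# Binary (power-of-two) suffixes used in this tutorial's memory quantities.
_BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}

def memory_to_bytes(quantity):
    """Convert a quantity such as '16Gi' or '512Mi' to bytes (binary suffixes only)."""
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(float(quantity[:-len(suffix)]) * factor)
    return int(quantity)  # no suffix: a plain number of bytes

def check_resources(config):
    """Return a list of problems in a cores/memory/cpu_limit/memory_limit dict."""
    problems = []
    if config["cores"] > config["cpu_limit"]:
        problems.append("cores exceeds cpu_limit")
    if memory_to_bytes(config["memory"]) > memory_to_bytes(config["memory_limit"]):
        problems.append("memory exceeds memory_limit")
    return problems

# The I/O-intensive example from this section passes the check.
io_intensive = {"cores": 2, "memory": "8Gi", "cpu_limit": 4, "memory_limit": "12Gi"}
print(check_resources(io_intensive))  # []

# A deliberately inconsistent configuration (made up for illustration).
bad = {"cores": 8, "memory": "16Gi", "cpu_limit": 4, "memory_limit": "8192Mi"}
print(check_resources(bad))  # ['cores exceeds cpu_limit', 'memory exceeds memory_limit']
```

Decimal suffixes (`M`, `G`) and millicore CPU values (`500m`) are valid in Kubernetes but deliberately left out of this sketch.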

## Kubernetes Cluster Monitoring

In [None]:
def check_kubernetes_cluster_status():
    """
    Check Kubernetes cluster status and resources.
    Note: This requires kubectl to be configured properly.
    """
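    import shlex
    import subprocess
    
    def run_kubectl_command(cmd):
        """Run a kubectl command and return its output, or an 'Error: ...' string"""
        try:
            result = subprocess.run(
                ["kubectl", *shlex.split(cmd)],  # argument list, so no shell is involved
                capture_output=True, 
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                # kubectl reports "No resources found" on stderr, so fall back to it
                return result.stdout.strip() or result.stderr.strip()
            else:
                return f"Error: {result.stderr.strip()}"
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except Exception as e:  # includes FileNotFoundError when kubectl is not installed
            return f"Error: {str(e)}"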
    
    print("Kubernetes Cluster Status Check:")
    print("=" * 40)
    
    # Check cluster info
    print("\n1. Cluster Info:")
    cluster_info = run_kubectl_command("cluster-info")
    if not cluster_info.startswith("Error:"):
        lines = cluster_info.split('\n')[:3]  # First 3 lines
        for line in lines:
            print(f"   {line}")
    else:
        print(f"   {cluster_info}")
    
    # Check nodes
    print("\n2. Node Status:")
    nodes = run_kubectl_command("get nodes -o wide")
    if not nodes.startswith("Error:"):
        lines = nodes.split('\n')[:6]  # Header + first 5 nodes
        for line in lines:
            print(f"   {line}")
    else:
        print(f"   {nodes}")
    
    # Check namespaces
    print("\n3. Namespaces:")
    namespaces = run_kubectl_command("get namespaces")
    if not namespaces.startswith("Error:"):
        lines = namespaces.split('\n')[:8]  # Header + first 7 namespaces
        for line in lines:
            print(f"   {line}")
    else:
        print(f"   {namespaces}")
    
    # Check current context
    print("\n4. Current Context:")
    context = run_kubectl_command("config current-context")
    print(f"   {context}")
    
    # Check resource quotas
    print("\n5. Resource Quotas (default namespace):")
    quotas = run_kubectl_command("get resourcequota -n default")
    if "No resources found" in quotas:
        print("   No resource quotas configured")
    else:
        print(f"   {quotas}")
    
    # Check running jobs
    print("\n6. Running Jobs (default namespace):")
    jobs = run_kubectl_command("get jobs -n default")
    if "No resources found" in jobs:
        print("   No jobs currently running")
    else:
        lines = jobs.split('\n')[:6]  # Header + first 5 jobs
        for line in lines:
            print(f"   {line}")
    
    # Check running pods
    print("\n7. Running Pods (default namespace):")
    pods = run_kubectl_command("get pods -n default")
    if "No resources found" in pods:
        print("   No pods currently running")
    else:
        lines = pods.split('\n')[:6]  # Header + first 5 pods
        for line in lines:
            print(f"   {line}")
    
    # Check node resource usage
    print("\n8. Node Resource Usage:")
    top_nodes = run_kubectl_command("top nodes")
    if not top_nodes.startswith("Error:") and "not available" not in top_nodes:
        lines = top_nodes.split('\n')[:6]  # Header + first 5 nodes
        for line in lines:
            print(f"   {line}")
    else:
        print("   Resource metrics not available (metrics-server may not be installed)")

# Check cluster status
try:
    check_kubernetes_cluster_status()
except Exception as e:
    print(f"Failed to check Kubernetes cluster status: {e}")
    print("Make sure kubectl is installed and configured for your cluster")
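
The status check above prints kubectl's table output as text. When the result needs to be inspected programmatically, `kubectl get jobs -o json` returns a list whose items carry `status.active`, `status.succeeded`, and `status.failed` counters. The sketch below parses that structure. The helper `summarize_jobs` is hypothetical, and the sample document is hand-written to show the shape, not real cluster output.

```python
import json

def summarize_jobs(jobs_json):
    """Summarize the JSON text from `kubectl get jobs -o json` as per-job dicts."""
    summary = []
    for item in json.loads(jobs_json).get("items", []):
        status = item.get("status", {})
        summary.append({
            "name": item["metadata"]["name"],
            # Counters are omitted from the JSON when zero, hence the defaults.
            "active": status.get("active", 0),
            "succeeded": status.get("succeeded", 0),
            "failed": status.get("failed", 0),
        })
    return summary

# Hand-written sample in the shape of a Job list; the job names are made up.
sample = json.dumps({
    "kind": "List",
    "items": [
        {"metadata": {"name": "ml-training"}, "status": {"succeeded": 1}},
        {"metadata": {"name": "parallel-processing"}, "status": {"active": 3, "failed": 1}},
    ],
})

for job in summarize_jobs(sample):
    print(job)
```

On a live cluster, the text passed to `summarize_jobs` would be the standard output of `kubectl get jobs -n default -o json`.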

## Summary

This tutorial covered Kubernetes usage with Clustrix:

1. **Kubernetes Configuration** - Setting up Clustrix for container-based computing
2. **Machine Learning Training** - Distributed ML workflows in pods
3. **Data Processing** - Large-scale data analysis with automatic parallelization
4. **Fault Tolerance** - Robust computing with checkpointing and retry mechanisms
5. **Resource Management** - Choosing resource requests and limits for each workload type
6. **Job Patterns** - Different Kubernetes job execution patterns
7. **Cluster Monitoring** - Status checking and resource monitoring

### Key Kubernetes Advantages:

- **Containerization**: Consistent execution environments across clusters
- **Scalability**: Scale out by running more pods in parallel (`parallelism`, `completions`)
- **Fault Tolerance**: Built-in restart and retry mechanisms
- **Resource Management**: Fine-grained CPU and memory control
- **Isolation**: Secure, isolated execution environments
- **Portability**: Run on any Kubernetes cluster (cloud or on-premises)

### Best Practices:

- **Resource Limits**: Always set both requests (used by the scheduler to place pods) and limits (enforced at runtime) for predictable scheduling
- **Container Images**: Use specific, lightweight base images for faster startup
- **Job Patterns**: Choose appropriate job patterns for your workload type
- **Fault Tolerance**: Implement checkpointing for long-running computations
- **Monitoring**: Check cluster health and resource usage regularly
- **Cleanup**: Set TTL for automatic job cleanup to prevent resource buildup

### Kubernetes-Specific Features:

- **`cpu_limit` and `memory_limit`**: Upper bounds on CPU and memory; setting them above `cores` and `memory` leaves headroom for bursts
- **`backoff_limit`**: Automatic retry on failures
- **`parallelism` and `completions`**: Parallel job execution control
- **`job_ttl_seconds`**: Automatic cleanup of completed jobs
- **`restart_policy`**: Pod restart behavior on failure
- **`active_deadline_seconds`**: Maximum job runtime limit

### Next Steps:

- Compare with [SLURM Tutorial](slurm_tutorial.ipynb) for HPC-style clusters
- Explore [PBS Tutorial](pbs_tutorial.ipynb) for traditional batch systems
- Try [SSH Tutorial](ssh_tutorial.ipynb) for simple remote execution
- Check the [Configuration Guide](../api/config.rst) for advanced settings

For more information, visit the [Clustrix Documentation](https://clustrix.readthedocs.io).