# Out-of-core/Parallel Conversion with Dask: Progress, Retries, and Idempotency

In medical data integration, we often work with large datasets that don't fit in memory or require substantial processing time. This notebook demonstrates how to use Dask for out-of-core parallel processing while implementing robust error handling, progress tracking, and idempotent operations for medical data workflows.

## Setting Up the Environment

We'll start by importing the necessary libraries and setting up our Dask environment. Dask provides parallel computing capabilities that are essential for large-scale medical data processing.

In [1]:
import dask
import dask.dataframe as dd
import dask.bag as db
from dask.distributed import Client, as_completed, progress
from dask.delayed import delayed
import pandas as pd
import numpy as np
import time
import os
import hashlib
from pathlib import Path
import logging
from functools import wraps
import json

Let's create a local Dask client to manage our parallel computations. This will give us a dashboard to monitor our tasks in real-time.

In [2]:
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
print(f"Dashboard available at: {client.dashboard_link}")
client

Dashboard available at: http://127.0.0.1:8787/status


LocalCluster (distributed.LocalCluster), connection method: Cluster object
Dashboard: http://127.0.0.1:8787/status
Workers: 4 | Total threads: 8 | Total memory: 7.45 GiB | Using processes: True | Status: running
Per worker: 2 threads, 1.86 GiB memory limit


## Creating Sample Medical Data

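We'll generate synthetic patient records: demographics, vital signs (blood pressure), lab values (cholesterol, glucose), and an ICD-10 diagnosis code. At 50,000 rows (under 4 MB in memory) the dataset easily fits in RAM; it is deliberately small so the notebook runs quickly, but the same code applies to data that does not fit.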

In [3]:
def generate_patient_data(n_patients=10000):
    """Generate synthetic patient data"""
    np.random.seed(42)
    
    data = {
        'patient_id': [f'PAT_{i:06d}' for i in range(n_patients)],
        'age': np.random.normal(45, 15, n_patients).astype(int),
        'gender': np.random.choice(['M', 'F'], n_patients),
        'blood_pressure_systolic': np.random.normal(120, 20, n_patients),
        'blood_pressure_diastolic': np.random.normal(80, 10, n_patients),
        'cholesterol': np.random.normal(200, 40, n_patients),
        'glucose': np.random.normal(100, 25, n_patients),
        'diagnosis_code': np.random.choice(['I10', 'E11', 'J44', 'N18', 'F32'], n_patients)
    }
    
    return pd.DataFrame(data)

# Generate and save sample data
sample_data = generate_patient_data(50000)
sample_data.to_parquet('patient_data.parquet', index=False)
print(f"Generated {len(sample_data)} patient records")
sample_data.head()

Generated 50000 patient records


|   | patient_id | age | gender | blood_pressure_systolic | blood_pressure_diastolic | cholesterol | glucose | diagnosis_code |
|---|---|---|---|---|---|---|---|---|
| 0 | PAT_000000 | 52 | F | 104.670376 | 73.827472 | 241.369531 | 108.155472 | F32 |
| 1 | PAT_000001 | 42 | M | 114.568068 | 64.99531 | 234.934741 | 110.5426 | N18 |
| 2 | PAT_000002 | 54 | F | 134.88076 | 93.0751 | 233.232742 | 92.089789 | F32 |
| 3 | PAT_000003 | 67 | M | 126.92162 | 71.918336 | 160.387754 | 80.155702 | I10 |
| 4 | PAT_000004 | 41 | F | 92.933328 | 84.259369 | 210.757392 | 108.522527 | N18 |


## Implementing Idempotent Operations

Idempotency ensures that operations can be safely repeated without changing the result. This is crucial in medical data processing where partial failures might require rerunning computations.
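A cached result is only safe to reuse if the cache key changes whenever the input changes. Keys built from a truncated text representation break that guarantee: pandas shortens `str(df)` for long frames (governed by the `display.max_rows` option), and the standard library's `reprlib` does the same for long lists, which makes the failure easy to reproduce without pandas. The sketch below compares a repr-based key with one computed over the full serialized content; `repr_key` and `content_key` are illustrative names, not part of any library.

```python
import hashlib
import json
import reprlib

def repr_key(data):
    """Key from a truncated representation (similar to hashing str(df) for a long DataFrame)."""
    return hashlib.md5(reprlib.repr(data).encode()).hexdigest()

def content_key(data):
    """Key from the full content, serialized deterministically."""
    return hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()

a = list(range(100))
b = list(range(100))
b[50] = -1  # differs only in the middle, which reprlib elides

print(reprlib.repr(a))                   # [0, 1, 2, 3, 4, 5, ...]
print(repr_key(a) == repr_key(b))        # True: collision, a cache would return a's result for b
print(content_key(a) == content_key(b))  # False: different inputs get different keys
```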

In [4]:
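import uuid

def create_checksum(data):
    """Return an MD5 fingerprint of `data` (for cache keys and change detection, not security)"""
    if isinstance(data, pd.DataFrame):
        # Hash every row plus the column names; str(df) is truncated for long frames,
        # so two different DataFrames could otherwise produce the same checksum
        row_hashes = pd.util.hash_pandas_object(data, index=True).values
        payload = row_hashes.tobytes() + json.dumps([str(c) for c in data.columns]).encode()
        return hashlib.md5(payload).hexdigest()
    return hashlib.md5(str(data).encode()).hexdigest()

def idempotent_operation(func):
    """Decorator that caches a DataFrame result on disk, keyed by function name and input content"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Fingerprint DataFrame arguments by content and all other arguments by repr
        def fingerprint(value):
            return create_checksum(value) if isinstance(value, pd.DataFrame) else repr(value)
        key_parts = [fingerprint(a) for a in args]
        key_parts += [f"{k}={fingerprint(v)}" for k, v in sorted(kwargs.items())]
        op_id = f"{func.__name__}_{create_checksum('|'.join(key_parts))[:16]}"
        output_file = f"cache/{op_id}.parquet"
        
        # Reuse the cached result if this exact call has already completed
        if os.path.exists(output_file):
            print(f"Loading cached result for {op_id}")
            return pd.read_parquet(output_file)
        
        result = func(*args, **kwargs)
        if isinstance(result, pd.DataFrame):
            os.makedirs('cache', exist_ok=True)
            # Write to a unique temp file, then rename atomically, so an interrupted
            # run never leaves a half-written file that later looks like a cache hit
            tmp_file = f"{output_file}.{uuid.uuid4().hex}.tmp"
            result.to_parquet(tmp_file)  # keep the index so cached and fresh results match
            os.replace(tmp_file, output_file)
            print(f"Cached result for {op_id}")
        return result
    
    return wrapper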

## Implementing Retry Logic

Medical data processing can fail due to network issues, temporary resource constraints, or data quality problems. Implementing robust retry logic helps ensure our pipelines complete successfully.
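Retrying only pays off for transient failures; a malformed record fails identically on every attempt. As a sketch of a common refinement, the hypothetical `retry_transient` decorator below retries only the exception types it is given and adds random "full jitter" to the exponential backoff so that parallel workers do not retry in lockstep. `TransientError` is an illustrative marker class, and `sleep` is injectable so the demo runs instantly.

```python
import random
import time
from functools import wraps

class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, dropped connections)."""

def retry_transient(max_retries=3, base_delay=1.0, retry_on=(TransientError,), sleep=time.sleep):
    """Retry only `retry_on` exceptions, using exponential backoff with full jitter."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries:
                        raise
                    # Full jitter: wait a random time in [0, base_delay * 2**attempt]
                    sleep(random.uniform(0, base_delay * 2 ** attempt))
                # Any other exception type propagates immediately, without retrying
        return wrapper
    return decorator

# A call that fails twice with a transient error, then succeeds
calls = {"n": 0}

@retry_transient(max_retries=3, sleep=lambda s: None)  # skip real sleeping in the demo
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("connection reset")
    return "ok"

print(flaky_fetch(), calls["n"])  # ok 3

# A deterministic data error is raised on the first attempt
attempts = {"n": 0}

@retry_transient(sleep=lambda s: None)
def parse_bad_record():
    attempts["n"] += 1
    raise ValueError("invalid diagnosis code")

try:
    parse_bad_record()
except ValueError as e:
    print(f"Not retried: {e} (attempts={attempts['n']})")  # attempts=1
```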

In [5]:
def retry_on_failure(max_retries=3, delay=1):
    """Decorator that retries a failing call with exponential backoff.

    Waits delay, 2*delay, 4*delay, ... seconds between attempts and re-raises
    the last exception once max_retries retries are exhausted. Note that it
    retries every Exception, including deterministic data errors.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        print(f"All {max_retries + 1} attempts failed")
                        raise  # re-raise the last exception with its traceback
                    wait = delay * (2 ** attempt)  # exponential backoff
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait} seconds...")
                    time.sleep(wait)
        return wrapper
    return decorator

## Defining Medical Data Processing Functions

We'll create several data processing functions that simulate common medical data transformations. These functions will incorporate our idempotency and retry mechanisms.
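The order of stacked decorators matters: the decorator written first (outermost) runs first. With `@idempotent_operation` above `@retry_on_failure`, a cache hit returns immediately, and retries only wrap real computation. The standard-library sketch below uses hypothetical toy stand-ins, `cached` (an in-memory dict instead of the Parquet cache) and `retried`, and counts how often the underlying function actually runs.

```python
from functools import wraps

def cached(func):
    """Toy stand-in for idempotent_operation: memoize results in a dict."""
    store = {}
    @wraps(func)
    def wrapper(x):
        if x not in store:
            store[x] = func(x)
        return store[x]
    return wrapper

def retried(max_retries=2):
    """Toy stand-in for retry_on_failure (no sleeping)."""
    def decorator(func):
        @wraps(func)
        def wrapper(x):
            for attempt in range(max_retries + 1):
                try:
                    return func(x)
                except RuntimeError:
                    if attempt == max_retries:
                        raise
        return wrapper
    return decorator

executions = []

@cached                    # outermost: consulted first
@retried(max_retries=2)    # only reached on a cache miss
def score(x):
    executions.append(x)
    if executions.count(x) == 1:   # the first attempt for each input fails
        raise RuntimeError("transient failure")
    return x * 2

print(score(5), score(5))  # 10 10
print(executions)          # [5, 5]: one failed attempt, one success, then a cache hit
```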

In [6]:
@idempotent_operation
@retry_on_failure(max_retries=2)
def calculate_cardiovascular_risk(df_chunk):
    """Calculate cardiovascular risk score for a chunk of patients"""
    # Simulate processing time
    time.sleep(0.1)
    
    # Calculate risk score based on multiple factors
    risk_score = (
        (df_chunk['age'] * 0.02) +
        (df_chunk['blood_pressure_systolic'] * 0.01) +
        (df_chunk['cholesterol'] * 0.005) +
        (df_chunk['glucose'] * 0.003)
    )
    
    df_result = df_chunk.copy()
    df_result['cv_risk_score'] = risk_score
    df_result['risk_category'] = pd.cut(risk_score, 
                                       bins=[0, 5, 10, 15, float('inf')],
                                       labels=['Low', 'Medium', 'High', 'Very High'])
    
    return df_result

Let's create another processing function that performs data quality checks. This demonstrates how to handle potential data issues in medical datasets.

In [7]:
@idempotent_operation
@retry_on_failure(max_retries=2)
def perform_data_quality_check(df_chunk):
    """Perform data quality checks on patient data"""
    time.sleep(0.05)
    
    df_result = df_chunk.copy()
    
    # Flag abnormal values
    df_result['age_valid'] = df_chunk['age'].between(0, 120)
    df_result['bp_valid'] = (
        df_chunk['blood_pressure_systolic'].between(70, 250) & 
        df_chunk['blood_pressure_diastolic'].between(40, 150)
    )
    df_result['cholesterol_valid'] = df_chunk['cholesterol'].between(100, 400)
    df_result['glucose_valid'] = df_chunk['glucose'].between(50, 300)
    
    # Overall data quality score
    quality_columns = ['age_valid', 'bp_valid', 'cholesterol_valid', 'glucose_valid']
    df_result['data_quality_score'] = df_result[quality_columns].sum(axis=1) / len(quality_columns)
    
    return df_result

## Loading Data with Dask for Out-of-Core Processing

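We'll load the patient data as a Dask DataFrame, which splits the table into partitions (ordinary pandas DataFrames) that can be loaded and processed one at a time, so the full dataset never has to fit in memory. The number of partitions is determined mainly by how the data is stored (Parquet files and their row groups), not by the number of cores. Because we wrote a single small Parquet file, the output below reports just one partition, which means the "parallel" steps that follow actually run as a single task.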

In [8]:
# Load data with Dask
ddf = dd.read_parquet('patient_data.parquet')
print(f"Loaded Dask DataFrame with {len(ddf)} rows and {ddf.npartitions} partitions")
print(f"Memory usage per partition: ~{ddf.memory_usage(deep=True).sum().compute() / ddf.npartitions / 1024**2:.2f} MB")
ddf.head()

Loaded Dask DataFrame with 50000 rows and 1 partitions
Memory usage per partition: ~3.72 MB


|   | patient_id | age | gender | blood_pressure_systolic | blood_pressure_diastolic | cholesterol | glucose | diagnosis_code |
|---|---|---|---|---|---|---|---|---|
| 0 | PAT_000000 | 52 | F | 104.670376 | 73.827472 | 241.369531 | 108.155472 | F32 |
| 1 | PAT_000001 | 42 | M | 114.568068 | 64.99531 | 234.934741 | 110.5426 | N18 |
| 2 | PAT_000002 | 54 | F | 134.88076 | 93.0751 | 233.232742 | 92.089789 | F32 |
| 3 | PAT_000003 | 67 | M | 126.92162 | 71.918336 | 160.387754 | 80.155702 | I10 |
| 4 | PAT_000004 | 41 | F | 92.933328 | 84.259369 | 210.757392 | 108.522527 | N18 |


## Creating Delayed Operations for Progress Tracking

We'll use Dask's delayed operations to create a computation graph that we can monitor. This approach allows us to track progress and handle failures gracefully.
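`dask.delayed` records function calls as a task graph instead of running them; nothing executes until `compute` is called. The minimal sketch below uses an illustrative `slow_square` function to show that building the graph runs no code, and that a downstream task (`sum` over the list) waits for all of its inputs. `scheduler="synchronous"` is used only so the example runs in a single thread without a cluster.

```python
import dask
from dask import delayed

executed = []

def slow_square(x):
    executed.append(x)       # record when the function actually runs
    return x * x

# Building the graph: no function has run yet
tasks = [delayed(slow_square)(i) for i in range(4)]
total = delayed(sum)(tasks)  # depends on all four tasks
calls_before_compute = len(executed)
print(calls_before_compute)  # 0

# Execution happens only here
result = total.compute(scheduler="synchronous")
print(result)                # 14
print(sorted(executed))      # [0, 1, 2, 3]
```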

In [9]:
# Convert partitions to delayed operations
partitions = ddf.to_delayed()
print(f"Created {len(partitions)} delayed partitions")

# Create delayed computation graph for cardiovascular risk calculation
cv_risk_tasks = [delayed(calculate_cardiovascular_risk)(partition) for partition in partitions]
print(f"Created {len(cv_risk_tasks)} cardiovascular risk calculation tasks")

Created 1 delayed partitions
Created 1 cardiovascular risk calculation tasks


## Executing Tasks with Progress Monitoring

We'll execute our delayed tasks while monitoring progress in real-time. This is essential for long-running medical data processing jobs where we need visibility into the processing status.
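A progress bar shows how far along a job is; often you also want to act on each task as it finishes (log it, checkpoint it, or stop early). The same as-completed pattern that `dask.distributed.as_completed` provides can be sketched with the standard library's `concurrent.futures`, which keeps the example runnable without a cluster. `process_chunk` and `run_with_progress` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_chunk(chunk):
    """Hypothetical per-partition work: return the number of records processed."""
    return len(chunk)

def run_with_progress(chunks, func, max_workers=4):
    """Run func over chunks in parallel, reporting progress as each one finishes."""
    total = len(chunks)
    results = [None] * total          # keep results in input order
    log = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its chunk index; completion order is arbitrary
        future_to_idx = {pool.submit(func, chunk): i for i, chunk in enumerate(chunks)}
        for done, future in enumerate(as_completed(future_to_idx), start=1):
            idx = future_to_idx[future]
            results[idx] = future.result()
            log.append((done, total))
            print(f"[{done}/{total}] {100 * done / total:5.1f}% (chunk {idx} done)")
    return results, log

chunks = [list(range(n)) for n in (100, 250, 50, 400)]
results, log = run_with_progress(chunks, process_chunk)
print(results)  # [100, 250, 50, 400]
```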

In [10]:
# Execute with progress tracking
print("Starting cardiovascular risk calculation...")
start_time = time.time()

# Submit the tasks, show a live progress bar (a widget in Jupyter, a text bar in a terminal),
# then block until all results are gathered
futures = client.compute(cv_risk_tasks)
progress(futures)
cv_results = client.gather(futures)

end_time = time.time()
print(f"Completed in {end_time - start_time:.2f} seconds")
print(f"Processed {sum(len(result) for result in cv_results)} patient records")

Starting cardiovascular risk calculation...
Completed in 0.26 seconds
Processed 50000 patient records


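Let's combine the results and examine the cardiovascular risk distribution. This demonstrates how to aggregate results from parallel processing operations. The weights and bin edges in `calculate_cardiovascular_risk` are illustrative, not a validated clinical score; as the output below shows, the scores cluster tightly (mean 3.39, standard deviation 0.42), so almost every synthetic patient falls in the Low bin.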

In [11]:
# Combine results
combined_cv_results = pd.concat(cv_results, ignore_index=True)
print("Cardiovascular Risk Distribution:")
print(combined_cv_results['risk_category'].value_counts())
print(f"\nMean CV Risk Score: {combined_cv_results['cv_risk_score'].mean():.2f}")
print(f"Standard Deviation: {combined_cv_results['cv_risk_score'].std():.2f}")

Cardiovascular Risk Distribution:
risk_category
Low          49999
Medium           1
High             0
Very High        0
Name: count, dtype: int64

Mean CV Risk Score: 3.39
Standard Deviation: 0.42


## Implementing Batch Processing with Error Handling

For more complex scenarios, we'll implement a batch processing system that can handle failures gracefully and provide detailed progress information.
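When a batch ends with some partitions failed, rerunning only those partitions keeps the rerun cheap, and with idempotent steps it cannot duplicate finished work. (Dask can also retry a failed task at the scheduler level through the `retries=` argument of `Client.compute` and `Client.submit`.) The standard-library sketch below shows the resubmission loop; `run_rounds` and `flaky_transform` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def run_rounds(items, func, max_rounds=3):
    """Run func on every item; re-run only the failed items, for up to max_rounds rounds."""
    results = {}
    errors = {}
    pending = list(range(len(items)))   # indices still to process
    for round_no in range(1, max_rounds + 1):
        if not pending:
            break
        # Leaving the `with` block waits for all submitted work to finish
        with ThreadPoolExecutor(max_workers=4) as pool:
            futures = {i: pool.submit(func, items[i]) for i in pending}
        failed = []
        for i, fut in futures.items():
            try:
                results[i] = fut.result()
                errors.pop(i, None)        # clear an error from an earlier round
            except Exception as exc:
                errors[i] = str(exc)
                failed.append(i)
        print(f"Round {round_no}: {len(pending) - len(failed)} succeeded, {len(failed)} failed")
        pending = failed
    return results, errors

# Hypothetical transform that fails the first time it sees each odd item
seen = set()

def flaky_transform(x):
    if x % 2 == 1 and x not in seen:
        seen.add(x)
        raise RuntimeError(f"transient failure on {x}")
    return x * 10

results, errors = run_rounds([1, 2, 3, 4], flaky_transform)
print(dict(sorted(results.items())), errors)  # {0: 10, 1: 20, 2: 30, 3: 40} {}
```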

In [12]:
def process_medical_data_batch(partitions, processing_func, batch_name="Processing"):
    """Process partitions in parallel with progress reporting and per-partition error handling"""
    
    # Create one delayed task per partition
    tasks = [delayed(processing_func)(partition) for partition in partitions]
    
    # Submit all tasks to the cluster (one future per task)
    futures = client.compute(tasks)
    # as_completed yields futures in completion order, so remember each future's partition index
    partition_index = {future.key: i for i, future in enumerate(futures)}
    
    results = {}
    failed_tasks = []
    total = len(futures)
    
    print(f"Starting {batch_name} for {total} partitions...")
    
    # Handle each task as soon as it finishes
    for done, future in enumerate(as_completed(futures), start=1):
        idx = partition_index[future.key]
        try:
            results[idx] = future.result()
            print(f"✓ Completed partition {done}/{total}")
        except Exception as e:
            print(f"✗ Failed partition {idx} ({done}/{total}): {e}")
            failed_tasks.append((idx, str(e)))
    
    print(f"\n{batch_name} Summary:")
    print(f"  Successful: {len(results)}/{total}")
    print(f"  Failed: {len(failed_tasks)}/{total}")
    
    # Successful results in original partition order, plus (partition_index, error) pairs
    return [results[i] for i in sorted(results)], failed_tasks

Now let's use our batch processing function to perform data quality checks. This demonstrates how to handle multiple processing steps in a robust medical data pipeline.

In [13]:
# Perform data quality checks
quality_results, quality_failures = process_medical_data_batch(
    partitions, 
    perform_data_quality_check, 
    "Data Quality Assessment"
)

# Analyze quality results if successful
if quality_results:
    combined_quality_results = pd.concat(quality_results, ignore_index=True)
    print("\nData Quality Summary:")
    print(f"Mean data quality score: {combined_quality_results['data_quality_score'].mean():.3f}")
    print(f"Records with perfect quality: {(combined_quality_results['data_quality_score'] == 1.0).sum()}")
    print(f"Records requiring attention: {(combined_quality_results['data_quality_score'] < 0.8).sum()}")

Starting Data Quality Assessment for 1 partitions...
✓ Completed partition 1/1

Data Quality Assessment Summary:
  Successful: 1/1
  Failed: 0/1

Data Quality Summary:
Mean data quality score: 0.991
Records with perfect quality: 48202
Records requiring attention: 1798


## Monitoring Resource Usage and Performance

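Understanding resource usage is crucial for optimizing medical data processing workflows. The function below summarizes the cluster's configured capacity (workers, threads, memory) and the scheduler's view of running tasks; for per-task timings and bottlenecks, the dashboard linked above gives a more detailed picture.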
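The cluster summary reports configured capacity rather than what a step actually consumed. For a quick per-step measurement inside a single process, `time.perf_counter` and `tracemalloc` from the standard library are enough; the sketch below wraps each step in a hypothetical `measure` context manager. `tracemalloc` only sees allocations that Python tracks in the current process, so treat the numbers as a rough guide; they say nothing about memory used on Dask workers.

```python
import time
import tracemalloc
from contextlib import contextmanager

@contextmanager
def measure(label, stats):
    """Record wall time and peak traced memory for the enclosed block into stats[label]."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        stats[label] = {"seconds": elapsed, "peak_mb": peak / 1024**2}
        print(f"{label}: {elapsed:.3f} s, peak {peak / 1024**2:.2f} MB")

stats = {}
with measure("build_records", stats):
    records = [{"patient_id": i, "glucose": 100 + i % 50} for i in range(200_000)]
with measure("mean_glucose", stats):
    mean_glucose = sum(r["glucose"] for r in records) / len(records)

print(mean_glucose)  # 124.5
```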

In [14]:
# Get performance metrics from Dask client
def get_performance_summary():
    """Get summary of Dask cluster performance"""
    
    # Get worker information
    workers_info = client.scheduler_info()['workers']
    
    total_cores = sum(info['nthreads'] for info in workers_info.values())
    total_memory = sum(info['memory_limit'] for info in workers_info.values())
    
    print("Cluster Performance Summary:")
    print(f"  Workers: {len(workers_info)}")
    print(f"  Total cores: {total_cores}")
    print(f"  Total memory: {total_memory / 1024**3:.2f} GB")
    
    # client.processing() maps each worker address to the task keys it is running,
    # so len() counts workers; summing the lists counts the tasks themselves
    processing_info = client.processing()
    active_tasks = sum(len(keys) for keys in processing_info.values())
    print(f"  Workers reporting task state: {len(processing_info)}")
    
    return {
        'workers': len(workers_info),
        'cores': total_cores,
        'memory_gb': total_memory / 1024**3,
        'active_tasks': active_tasks
    }

performance_summary = get_performance_summary()

Cluster Performance Summary:
  Workers: 4
  Total cores: 8
  Total memory: 7.45 GB
  Workers reporting task state: 4


## Saving Results with Idempotency

Finally, we'll save our processed results in a way that ensures idempotency. This allows us to safely rerun our pipeline without duplicating or corrupting our output data.
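The same idea extends to per-partition checkpoints: if each partition's output is written atomically under a deterministic name, a rerun after a crash only processes the partitions that are missing. A standard-library sketch, with the hypothetical helper `write_partitions_once`; it assumes partition `i` always refers to the same input, so if inputs can change between runs, a content checksum would need to be part of the file name.

```python
import json
import os
import tempfile
from pathlib import Path

def write_partitions_once(partitions, out_dir, transform):
    """Write transform(partition) to out_dir/part-<i>.json, skipping parts already written.

    Returns the indices processed in this run.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    processed = []
    for i, part in enumerate(partitions):
        target = out_dir / f"part-{i:05d}.json"
        if target.exists():          # finished in an earlier run: nothing to do
            continue
        tmp = target.with_name(target.name + ".tmp")
        tmp.write_text(json.dumps(transform(part)))
        os.replace(tmp, target)      # atomic rename: a part file is either complete or absent
        processed.append(i)
    return processed

with tempfile.TemporaryDirectory() as d:
    parts = [[1, 2], [3, 4], [5, 6]]
    double = lambda xs: [2 * x for x in xs]
    first = write_partitions_once(parts, d, double)
    second = write_partitions_once(parts, d, double)  # rerun: every part already exists
    print(first, second)  # [0, 1, 2] []
    final = [json.loads(p.read_text()) for p in sorted(Path(d).glob("part-*.json"))]
    print(final)          # [[2, 4], [6, 8], [10, 12]]
```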

In [15]:
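def save_results_idempotent(data, filename, metadata=None):
    """Save a DataFrame plus a JSON metadata sidecar, skipping the write if identical data is already saved"""
    
    path = Path(filename)
    # e.g. results.parquet -> results_metadata.json
    metadata_file = path.with_name(f"{path.stem}_metadata.json")
    
    # Content fingerprint of the data
    data_checksum = create_checksum(data)
    
    # Skip the write if the file exists and its recorded checksum matches
    if path.exists() and metadata_file.exists():
        with open(metadata_file, 'r') as f:
            existing_metadata = json.load(f)
        
        if existing_metadata.get('checksum') == data_checksum:
            print(f"File {filename} already exists with same data. Skipping save.")
            return
    
    # Remove stale metadata first, then write data and metadata via temp file + atomic rename.
    # If the run dies partway, the metadata is missing and the next run rewrites everything.
    metadata_file.unlink(missing_ok=True)
    tmp_path = path.with_name(path.name + '.tmp')
    data.to_parquet(tmp_path, index=False)
    os.replace(tmp_path, path)
    
    save_metadata = {
        'checksum': data_checksum,
        'rows': len(data),
        'columns': list(data.columns),
        'created_at': time.strftime('%Y-%m-%d %H:%M:%S'),
        **(metadata or {})
    }
    
    tmp_metadata = metadata_file.with_name(metadata_file.name + '.tmp')
    with open(tmp_metadata, 'w') as f:
        json.dump(save_metadata, f, indent=2)
    os.replace(tmp_metadata, metadata_file)
    
    print(f"Saved {filename} with {len(data)} records")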

# Save processed results
save_results_idempotent(
    combined_cv_results, 
    'processed_cv_risk_data.parquet',
    metadata={
        'processing_type': 'cardiovascular_risk_assessment',
        'performance': performance_summary
    }
)

if quality_results:
    save_results_idempotent(
        combined_quality_results,
        'data_quality_results.parquet',
        metadata={
            'processing_type': 'data_quality_assessment',
            'failed_partitions': len(quality_failures)
        }
    )

Saved processed_cv_risk_data.parquet with 50000 records
Saved data_quality_results.parquet with 50000 records


## Cleanup and Resource Management

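Proper cleanup is important in medical data processing environments. We'll report the size of the temporary cache (the line that deletes it is left commented out so the cached results stay available) and close the Dask client, leaving the processed results on disk.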

In [16]:
# Clean up temporary cache files (optional)
import shutil

print("Cleaning up temporary files...")
if os.path.exists('cache'):
    cache_size = sum(os.path.getsize(os.path.join('cache', f)) 
                    for f in os.listdir('cache')) / 1024**2
    print(f"Cache directory size: {cache_size:.2f} MB")
    
    # Uncomment the next line to actually remove cache
    # shutil.rmtree('cache')
    print("Cache preserved for demonstration")

# Close Dask client
client.close()
print("Dask client closed successfully")

Cleaning up temporary files...
Cache directory size: 5.11 MB
Cache preserved for demonstration
Dask client closed successfully


## Summary

In this notebook, we demonstrated key concepts for robust medical data processing:

1. **Out-of-core Processing**: Using Dask's partitioned DataFrames, which let the same code scale to datasets larger than memory
2. **Parallel Computing**: Distributing work across multiple cores, one task per partition
3. **Idempotency**: Ensuring operations can be safely repeated without changing the outcome
4. **Retry Logic**: Handling transient failures with exponential backoff
5. **Progress Monitoring**: Tracking long-running operations in real time
6. **Resource Management**: Inspecting cluster capacity and releasing resources when finished

These techniques are essential for building robust, scalable medical data integration pipelines that can handle the volume and complexity of real-world healthcare data.

## Exercise

Create a robust medical data processing pipeline that:

1. **Generate a larger dataset**: Create 100,000 patient records with additional fields (BMI, smoking status, family history)
2. **Implement a new processing function**: Create a diabetes risk assessment function that incorporates idempotency and retry logic
3. **Add progress tracking**: Implement a custom progress tracker that saves intermediate results every 25% completion
4. **Handle deliberate failures**: Modify one of your processing functions to randomly fail 10% of the time and demonstrate how your retry logic handles these failures
5. **Performance optimization**: Experiment with different numbers of Dask workers and partitions, and measure the impact on processing time
6. **Create a summary report**: Generate a final report that includes processing statistics, data quality metrics, and performance benchmarks

Your solution should demonstrate all the concepts covered in this notebook and be robust enough to handle real-world medical data processing challenges.