# Understanding Parallelism in Python with Dask

## Part 1: Dask Fundamentals

### What is Dask?

Dask is a flexible parallel computing library that:
- Provides parallel DataFrame and Array abstractions
- Builds task graphs dynamically
- Executes these graphs in parallel
- Scales from laptops to clusters

### Installing Dask

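You can install Dask using pip. The `complete` extra also installs the optional dependencies this notebook imports (pandas, NumPy, and `dask.distributed`):

```bash
pip install "dask[complete]"
```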

or conda:

```bash
conda install dask
```

In [None]:
import dask
import dask.dataframe as dd
import dask.array as da
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster

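## Understanding Dask's Architecture

### 1. Task Graphs
Dask represents a computation as a task graph: each node is a function call and each edge is a data dependency. `dask.delayed` builds such a graph lazily instead of running the functions immediately: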

In [None]:
# Simple example of task graph creation
@dask.delayed
def add(a, b):
    return a + b

@dask.delayed
def multiply(a, b):
    return a * b

# Create a simple computation
x = add(1, 2)
y = multiply(x, 3)

# Visualize the task graph (requires the optional Graphviz dependency)
y.visualize()
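
If Graphviz is not available, the graph can also be inspected directly. The sketch below rebuilds the same two delayed functions, lists the tasks in the graph, and only then triggers execution. The key names are generated by Dask and will differ between runs.

```python
import dask

@dask.delayed
def add(a, b):
    return a + b

@dask.delayed
def multiply(a, b):
    return a * b

x = add(1, 2)        # no addition has happened yet
y = multiply(x, 3)   # y depends on x in the task graph

# A Delayed object only describes work; look at the tasks it contains
graph = dict(y.__dask_graph__())
print("Tasks in graph:", len(graph))
for key in graph:
    print("  ", key)

# compute() walks the graph and returns a plain Python value
value = y.compute()
print("Result:", value)  # (1 + 2) * 3
```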

### 2. Schedulers
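Dask offers multiple schedulers:
- Single-threaded (`scheduler="synchronous"`): runs tasks one at a time in the calling thread, which is useful for debugging
- Multi-threaded (`scheduler="threads"`): a thread pool in the current process, the default for arrays, DataFrames, and delayed
- Multi-process (`scheduler="processes"`): a process pool that sidesteps the GIL for pure-Python code at the cost of moving data between processes
- Distributed (`dask.distributed`): a scheduler and workers that run on one machine or across a cluster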
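
As a minimal sketch, a local scheduler can be chosen per call through the `scheduler` keyword of `compute()`. The array here is a small stand-in chosen for illustration; `"processes"` is accepted in the same way but is left out to keep the example light.

```python
import dask.array as da

x = da.arange(1000, chunks=100)   # 10 chunks of 100 integers
total = x.sum()

# Same graph, two different local schedulers
serial = total.compute(scheduler="synchronous")  # single thread, easy to debug
threaded = total.compute(scheduler="threads")    # thread pool in this process

print(serial, threaded)
```

Both calls return the same value; only the way the tasks are executed changes.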

In [None]:
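# Create a local cluster and connect a client to it.
# Adjust n_workers and threads_per_worker to the cores available on your machine.
cluster = LocalCluster(n_workers=10, threads_per_worker=2)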
client = Client(cluster)
print("Cluster Dashboard:", client.dashboard_link)
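
For short experiments, the cluster and client can also be used as context managers so that they are closed automatically. This is a sketch with assumed settings: `processes=False` keeps the single worker inside the current process and `dashboard_address=None` turns the dashboard off, neither of which is tuned for real workloads.

```python
from dask.distributed import Client, LocalCluster

# Small in-process cluster: illustrative settings, not recommendations
with LocalCluster(n_workers=1, threads_per_worker=2,
                  processes=False, dashboard_address=None) as cluster:
    with Client(cluster) as client:
        future = client.submit(sum, [1, 2, 3])  # runs on a worker thread
        total = future.result()                 # block until it finishes

print("Sum computed on the cluster:", total)
```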

## Dask Collections

### 1. Dask Arrays
A Dask array has a NumPy-like interface but is split into chunks, each of which is an ordinary NumPy array:

In [None]:
# Create a large array
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print("Array shape:", x.shape)
print("Chunk shape:", x.chunksize)

# Show how chunks are processed
result = x.mean(axis=0)
result.visualize()
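
The chunk-related attributes are easy to mix up, so here is a small sketch on a deliberately uneven array. The 10 x 10 shape and 4 x 4 chunks are chosen only to make the leftover blocks visible.

```python
import dask.array as da

# 10 x 10 array split into blocks of at most 4 x 4
x = da.ones((10, 10), chunks=(4, 4))

print(x.chunks)       # block sizes along each axis
print(x.chunksize)    # shape of the largest block
print(x.numblocks)    # number of blocks along each axis
print(x.npartitions)  # total number of blocks
```

`chunks` lists every block size per axis, including the smaller blocks at the edges, while `chunksize` reports a single shape.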

### 2. Dask DataFrames
A Dask DataFrame is a collection of pandas DataFrames (partitions) split along the index, exposing a large subset of the pandas API:

In [None]:
# Create a simple DataFrame with guaranteed equal lengths
n_rows = 1000000
n_repeats = (n_rows + 2) // 3  # Ceiling division to ensure we have enough elements
df = pd.DataFrame({
    'value': np.arange(n_rows),  # Using numpy array instead of range
    'category': ((['A'] * n_repeats + ['B'] * n_repeats + ['C'] * n_repeats)[:n_rows])
})

# Convert to Dask DataFrame
ddf = dd.from_pandas(df, npartitions=4)
print("Number of partitions:", ddf.npartitions)
print("Partition sizes:", ddf.map_partitions(len).compute())

## Understanding Lazy Evaluation

In [None]:
# Create a computation without executing it
x = da.random.random((1000, 1000), chunks=(100, 100))
y = x + x.T
z = y.mean(axis=0)

# Nothing computed yet
print("Computation defined but not executed")

# Execute computation
result = z.compute()
print("Computation completed")
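
One way to see the difference between a lazy collection and a computed result is to check the types on either side of `compute()`. This sketch repeats the same computation and prints what each name refers to.

```python
import numpy as np
import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
z = (x + x.T).mean(axis=0)

# Still lazy: a Dask array whose shape is known without computing anything
print(type(z).__name__, z.shape)

# compute() executes the graph and returns a NumPy array
result = z.compute()
print(type(result).__name__, result.shape)
```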

## Memory Management and Optimization

### 1. Chunking Strategies

In [None]:
# Bad chunking (too many small chunks)
bad_chunks = da.random.random((10000, 10000), chunks=(100, 100))

# Better chunking
good_chunks = da.random.random((10000, 10000), chunks=(1000, 1000))

# Compare task graphs
print("Number of tasks (bad chunking):", len(bad_chunks.dask))
print("Number of tasks (good chunking):", len(good_chunks.dask))
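
The trade-off can be worked out from the shapes alone: a 10000 x 10000 float64 array in 100 x 100 chunks has 10,000 blocks of 80 kB each, while 1000 x 1000 chunks give 100 blocks of 8 MB each. The sketch below checks this arithmetic and shows `rechunk()` as one way to merge small chunks in an existing array.

```python
import math
import dask.array as da

small_chunks = da.ones((10000, 10000), chunks=(100, 100))
itemsize = small_chunks.dtype.itemsize  # 8 bytes for float64

small_count = small_chunks.npartitions
small_bytes = math.prod(small_chunks.chunksize) * itemsize
print(small_count, "blocks of", small_bytes, "bytes")

# rechunk() is lazy; it adds tasks that merge the small blocks
merged = small_chunks.rechunk((1000, 1000))
merged_count = merged.npartitions
merged_bytes = math.prod(merged.chunksize) * itemsize
print(merged_count, "blocks of", merged_bytes, "bytes")
```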

### 2. Persist vs Compute

In [None]:
# persist() computes x once and keeps its chunks in memory
x = da.random.random((5000, 5000), chunks=(1000, 1000))
x_persisted = x.persist()

# These two rebuild x from scratch on every call
%time result0 = (x + 1).mean().compute()
%time result1 = (x * 2).mean().compute()
# These two reuse the chunks that persist() already holds in memory
%time result2 = (x_persisted + 1).mean().compute()
%time result3 = (x_persisted * 2).mean().compute()
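
Timings vary from machine to machine, so a call counter can make the effect of `persist()` visible without a clock. In this sketch, `expensive_load` and `calls` are hypothetical names, and the synchronous scheduler is requested explicitly so that the counter lives in the same process as the task.

```python
import dask

calls = []  # grows by one every time the expensive step really runs

def expensive_load():
    calls.append(1)
    return list(range(5))

data = dask.delayed(expensive_load)()

# Without persist, every compute() re-runs the whole graph
data.compute(scheduler="synchronous")
data.compute(scheduler="synchronous")
runs_without_persist = len(calls)

# persist() runs the graph once and keeps the result
held = data.persist(scheduler="synchronous")
held.compute(scheduler="synchronous")
held.compute(scheduler="synchronous")
runs_with_persist = len(calls) - runs_without_persist

print(runs_without_persist, runs_with_persist)
```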

## Part 2: Engineering Applications

### Example 1: Processing Time Series Data

This example shows how Dask partitions a large time series dataset:

In [None]:
def generate_sensor_data(n_points):
    """Generate sample sensor data"""
    time = np.linspace(0, 100, n_points)
    signal = np.sin(0.1 * time) + 0.1 * np.random.random(n_points)
    return pd.DataFrame({
        'time': time,
        'signal': signal
    })

# Create large dataset
df = generate_sensor_data(10000000)
ddf = dd.from_pandas(df, npartitions=4)

# Show how Dask partitions the data
print("Partition info:")
print(ddf.divisions)  # Index values that mark the partition boundaries

### Understanding Parallel Operations

In [None]:
# Define a per-partition operation
@dask.delayed
def process_chunk(data):
    """Return the mean FFT magnitude of one partition (a pandas DataFrame)."""
    # Simulate a complex calculation
    result = np.fft.fft(data['signal'].to_numpy())
    return np.abs(result).mean()

# Apply to partitions: to_delayed() yields one Delayed object per partition
results = []
for chunk in ddf.to_delayed():
    results.append(process_chunk(chunk))

# Compute all results
final_results = dask.compute(*results)

## Advanced Dask Features

### 1. Custom Partitioning

In [None]:
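# Build an hourly time series for 2024
timestamps = pd.date_range('2024-01-01', '2024-12-31', freq='h')
values = np.random.random(len(timestamps))

df = pd.DataFrame({
    'timestamp': timestamps,
    'value': values
})

# Index by timestamp so the partition boundaries (divisions) are points in time.
# from_pandas splits by row count, so the 12 partitions are only roughly monthly.
ddf = dd.from_pandas(df.set_index('timestamp'), npartitions=12)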

### 2. Dashboard and Diagnostics

In [None]:
from dask.distributed import progress

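def complex_calculation():
    # Create large lazy arrays (about 80 GB each if fully materialized)
    x = da.random.random((100000, 100000), chunks=(1000, 1000))
    y = da.random.random((100000, 100000), chunks=(1000, 1000))

    # Multiple operations
    z = (x + y).mean(axis=0)
    w = (x - y).std(axis=1)

    # Start the work on the cluster; progress() tracks running futures,
    # so the lazy arrays must be persisted first
    z, w = dask.persist(z, w)
    progress(z, w)

    return dask.compute(z, w)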

results = complex_calculation()
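
When no distributed client is in use, `dask.diagnostics.ProgressBar` gives similar feedback for the local schedulers. This is a small sketch with array sizes picked to finish quickly; the threaded scheduler is requested explicitly because this progress bar does not report on distributed execution.

```python
import dask.array as da
from dask.diagnostics import ProgressBar

x = da.random.random((2000, 2000), chunks=(500, 500))
y = da.random.random((2000, 2000), chunks=(500, 500))
col_mean = (x + y).mean(axis=0)

# Prints a text progress bar while the local scheduler runs the graph
with ProgressBar():
    result = col_mean.compute(scheduler="threads")

print(result.shape)
```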

### Engineering Example: Vibration Analysis

The final example applies these Dask tools to an engineering problem: finding the dominant vibration frequency for each sensor.

In [None]:
# Generate vibration data
def generate_vibration_data(n_samples, n_sensors):
    time = np.linspace(0, 10, n_samples)
    data = {}
    
    for sensor in range(n_sensors):
        # Base frequency
        f1 = 50 + np.random.random() * 10
        # Harmonic
        f2 = f1 * 2
        
        signal = (np.sin(2 * np.pi * f1 * time) + 
                 0.5 * np.sin(2 * np.pi * f2 * time) +
                 0.1 * np.random.random(n_samples))
        data[f'sensor_{sensor}'] = signal
    
    return pd.DataFrame(data, index=time)

# Create large dataset
df = generate_vibration_data(1000000, 16)
ddf = dd.from_pandas(df, npartitions=8)

In [None]:
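# Define analysis function
@dask.delayed
def analyze_vibration(data):
    """Return the dominant frequency (Hz) of one sensor's signal."""
    values = data.to_numpy()
    # Sample spacing in seconds, taken from the time index
    dt = data.index[1] - data.index[0]
    # FFT magnitude of the real signal; subtract the mean to suppress the DC bin
    magnitude = np.abs(np.fft.rfft(values - values.mean()))
    # Convert the peak bin index to a frequency in Hz
    freqs = np.fft.rfftfreq(len(values), d=dt)
    return freqs[np.argmax(magnitude)]

# Apply analysis to each sensor
results = {}
for column in ddf.columns:
    results[column] = analyze_vibration(ddf[column])

# Compute results; dask.compute returns a tuple, so unpack the single dict
(peak_frequencies,) = dask.compute(results)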

In [None]:
peak_frequencies
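
A quick sanity check for any FFT peak search is a signal whose frequency is known in advance. `np.argmax` on the magnitude spectrum returns a bin index, and `np.fft.rfftfreq` converts that index to hertz using the sample spacing. The sampling rate and tone frequency below are assumed values chosen so that the tone falls exactly on a bin.

```python
import numpy as np

fs = 1000.0                            # sampling rate in Hz (assumed)
t = np.arange(2000) / fs               # exactly 2 seconds of samples
signal = np.sin(2 * np.pi * 50.0 * t)  # pure 50 Hz tone

magnitude = np.abs(np.fft.rfft(signal))
freqs = np.fft.rfftfreq(len(signal), d=1 / fs)  # bin index -> Hz

peak_bin = int(np.argmax(magnitude))
peak_hz = float(freqs[peak_bin])
print(peak_bin, peak_hz)
```

With 2 seconds of data the bins are 0.5 Hz apart, so the peak sits at bin 100 even though the tone is at 50 Hz.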

## Best Practices

1. **Data Organization**
   - Choose appropriate chunk sizes
   - Consider data access patterns
   - Use persist() for frequently accessed data

2. **Performance Optimization**
   - Monitor memory usage
   - Use appropriate number of workers
   - Balance parallelism and overhead

3. **Debugging and Monitoring**
   - Use the dashboard
   - Monitor task progress
   - Check worker logs

## Real-world Considerations

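1. **When to Use Dask**
   - Data doesn't fit in memory
   - Computation is CPU-bound and can be split into independent tasks
   - Parallel processing is needed across cores or machines

2. **When Not to Use Dask**
   - Small datasets that pandas or NumPy already handle comfortably
   - Simple computations where scheduling overhead outweighs the gain
   - I/O-bound operations limited by disk or network rather than CPU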

In [None]:
# Cleanup
client.close()
cluster.close()