# Polars Tutorial - Part 5: Parallel Processing

In this notebook, we'll explore Polars' parallel processing capabilities:
- Understanding automatic parallelization
- Multi-threading in Polars
- Performance benchmarks
- Batch processing
- Best practices for parallel operations

In [None]:
import polars as pl
import time
import os
import multiprocessing as mp

DATA_DIR = '../data/'
os.makedirs(DATA_DIR, exist_ok=True)  # make sure the output directory exists before writing files

# Check available CPU cores
n_cores = mp.cpu_count()
print(f"Available CPU cores: {n_cores}")
print(f"Polars version: {pl.__version__}")

## 1. Automatic Parallelization in Polars

Polars automatically parallelizes many operations (filters, aggregations, column expressions, file reads) across its thread pool, with no configuration needed.

In [None]:
# Create a larger dataset for demonstration
large_df = pl.DataFrame({
    'id': range(1000000),
    'value': [i * 2.5 for i in range(1000000)],
    'category': ['A', 'B', 'C', 'D'] * 250000,
    'random': pl.Series([i % 100 for i in range(1000000)])  # repeating 0-99, not truly random
})

print(f"Dataset shape: {large_df.shape}")
print(f"Memory usage: {large_df.estimated_size() / 1024 / 1024:.2f} MB")

### 1.1 Parallel Filtering and Selection

In [None]:
# Polars automatically parallelizes filtering operations
start_time = time.time()

result = large_df.filter(
    (pl.col('value') > 50000) & (pl.col('random') > 50)
).select(['id', 'value', 'category'])

elapsed = time.time() - start_time

print(f"Filtered {large_df.height:,} rows to {result.height:,} rows")
print(f"Time taken: {elapsed:.4f} seconds")
print(f"\nFirst few results:")
print(result.head())

### 1.2 Parallel Aggregations

In [None]:
# Group by operations are automatically parallelized
start_time = time.time()

aggregated = large_df.group_by('category').agg([
    pl.len().alias('count'),  # pl.len() supersedes the deprecated pl.count()
    pl.sum('value').alias('total_value'),
    pl.mean('value').alias('avg_value'),
    pl.std('value').alias('std_value'),
    pl.min('value').alias('min_value'),
    pl.max('value').alias('max_value')
])

elapsed = time.time() - start_time

print(f"Aggregation time: {elapsed:.4f} seconds")
print(f"\nAggregated results:")
print(aggregated)

## 2. Understanding Thread Pool Size

Polars runs parallel work on a global thread pool, which by default has one thread per available CPU core. You can inspect its size at runtime:

In [None]:
# Check current thread pool size
print(f"Default thread pool size: {pl.thread_pool_size()}")

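# Note: the pool size is fixed when Polars is first imported. To change it, set the
# POLARS_MAX_THREADS environment variable *before* `import polars`, e.g.
# os.environ["POLARS_MAX_THREADS"] = "4". In most cases the default is the best choice.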
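Polars reads its thread count from the `POLARS_MAX_THREADS` environment variable once, when it is first imported. To see the effect from an already-running notebook, one option is to launch a fresh interpreter with the variable set. A sketch using the standard `subprocess` module (the value 2 is arbitrary):

```python
import os
import subprocess
import sys

# Run a tiny script in a new Python process so Polars is imported fresh
code = "import polars as pl; print(pl.thread_pool_size())"
env = {**os.environ, "POLARS_MAX_THREADS": "2"}

completed = subprocess.run(
    [sys.executable, "-c", code],
    env=env, capture_output=True, text=True, check=True,
)
reported = int(completed.stdout.strip())
print(f"Thread pool size with POLARS_MAX_THREADS=2: {reported}")
```

In a notebook you would normally set the variable in the very first cell, before `import polars`, rather than spawning a subprocess.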

## 3. Parallel Reading of Multiple Files

In [None]:
# Create multiple CSV files for parallel reading demonstration
for i in range(4):
    df_chunk = pl.DataFrame({
        'id': range(i*10000, (i+1)*10000),
        'value': [x * 1.5 for x in range(i*10000, (i+1)*10000)],
        'chunk': [i] * 10000
    })
    df_chunk.write_csv(os.path.join(DATA_DIR, f'chunk_{i}.csv'))

print("Created 4 CSV files")

In [None]:
# Read and process files in parallel using scan_csv
import glob

start_time = time.time()

# Using lazy evaluation for parallel processing
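csv_files = sorted(glob.glob(os.path.join(DATA_DIR, 'chunk_*.csv')))

# Scan all files lazily and concatenate; nothing is read until .collect(),
# and Polars can execute the individual scans in parallel
lazy_frames = [pl.scan_csv(f) for f in csv_files]
combined = pl.concat(lazy_frames)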

# Apply operations and collect
result = combined.filter(
    pl.col('value') > 5000
).select(['id', 'value']).collect()

elapsed = time.time() - start_time

print(f"Processed {len(csv_files)} files in {elapsed:.4f} seconds")
print(f"Result shape: {result.shape}")
print(result.head())

## 4. Parallel Column Operations

In [None]:
# Create a DataFrame with multiple numeric columns
df_multi = pl.DataFrame({
    'col1': range(100000),
    'col2': range(100000, 200000),
    'col3': range(200000, 300000),
    'col4': range(300000, 400000)
})

start_time = time.time()

# Apply transformations to multiple columns in parallel
result = df_multi.with_columns([
    (pl.col('col1') * 2).alias('col1_doubled'),
    (pl.col('col2') ** 2).alias('col2_squared'),
    (pl.col('col3') / 10).alias('col3_divided'),
    (pl.col('col4').log()).alias('col4_log')
])

elapsed = time.time() - start_time

print(f"Parallel column operations time: {elapsed:.4f} seconds")
print(result.head())

## 5. Batch Processing with Polars

### 5.1 Processing Data in Chunks

In [None]:
def process_batch(df_batch, batch_id):
    """
    Process a batch of data
    """
    result = df_batch.with_columns([
        (pl.col('value') * 2).alias('value_doubled'),
        pl.lit(batch_id).alias('batch_id')
    ])
    return result

# Create sample data
full_data = pl.DataFrame({
    'id': range(50000),
    'value': [i * 1.5 for i in range(50000)]
})

batch_size = 10000
batches = []

start_time = time.time()

# Process in batches
for i in range(0, full_data.height, batch_size):
    batch = full_data.slice(i, batch_size)
    processed_batch = process_batch(batch, i // batch_size)
    batches.append(processed_batch)

# Combine all batches
final_result = pl.concat(batches)

elapsed = time.time() - start_time

print(f"Processed {full_data.height:,} rows in {len(batches)} batches")
print(f"Total time: {elapsed:.4f} seconds")
print(f"\nSample output:")
print(final_result.head())

## 6. Performance Comparison: Operations

Let's compare the performance of different operations:

In [None]:
# Create test DataFrame
test_df = pl.DataFrame({
    'a': range(500000),
    'b': [i * 2 for i in range(500000)],
    'c': ['cat_' + str(i % 100) for i in range(500000)]
})

operations = {}

# Test 1: Simple filter
start = time.time()
_ = test_df.filter(pl.col('a') > 250000)
operations['Filter'] = time.time() - start

# Test 2: Group by + aggregation
start = time.time()
_ = test_df.group_by('c').agg([
    pl.sum('a').alias('sum_a'),
    pl.mean('b').alias('mean_b')
])
operations['GroupBy + Agg'] = time.time() - start

# Test 3: Multiple transformations
start = time.time()
_ = test_df.with_columns([
    (pl.col('a') * 2).alias('a_doubled'),
    (pl.col('b') / 10).alias('b_divided'),
    pl.col('c').str.to_uppercase().alias('c_upper')
])
operations['Transformations'] = time.time() - start

# Test 4: Sorting
start = time.time()
_ = test_df.sort(['c', 'a'])
operations['Sort'] = time.time() - start

# Display results
print("Performance Benchmark (500,000 rows):")
print("=" * 40)
for op, duration in operations.items():
    print(f"{op:20s}: {duration:.4f} seconds")

## 7. Lazy Evaluation for Parallel Optimization

Lazy evaluation allows Polars to optimize the execution plan:

In [None]:
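# Load the sales data eagerly (assumes sales_data.csv exists in DATA_DIR),
# then switch to lazy mode with .lazy(); pl.scan_csv would make the read lazy too
df_sales = pl.read_csv(os.path.join(DATA_DIR, 'sales_data.csv'))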

# Define lazy operations
lazy_query = (
    df_sales.lazy()
    .filter(pl.col('revenue') > 500)
    .group_by('category')
    .agg([
        pl.sum('revenue').alias('total_revenue'),
        pl.len().alias('count')
    ])
    .sort('total_revenue', descending=True)
)

# Show the optimized execution plan
print("Optimized Execution Plan:")
print(lazy_query.explain())

# Execute the query
result = lazy_query.collect()
print("\nResult:")
print(result)

## 8. Streaming Processing

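For datasets too large to fit in memory, Polars can execute a lazy query in streaming mode, processing the input in batches instead of loading it all at once: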

In [None]:
# Create a large CSV for streaming demonstration
large_csv = os.path.join(DATA_DIR, 'large_data.csv')
pl.DataFrame({
    'id': range(200000),
    'value': [i * 1.5 for i in range(200000)],
    'category': ['A', 'B', 'C', 'D'] * 50000
}).write_csv(large_csv)

# Stream processing with lazy evaluation
start_time = time.time()

result = (
    pl.scan_csv(large_csv)
    .filter(pl.col('value') > 10000)
    .group_by('category')
    .agg([
        pl.len().alias('count'),
        pl.mean('value').alias('avg_value')
    ])
    .collect(streaming=True)  # Enable streaming; recent Polars releases prefer collect(engine="streaming")
)

elapsed = time.time() - start_time

print(f"Streaming processing time: {elapsed:.4f} seconds")
print(result)

## 9. Best Practices for Parallel Processing

### Key Recommendations:

1. **Let Polars Handle Parallelization**: Don't manually create threads; Polars optimizes automatically
2. **Use Lazy Evaluation**: For complex queries, use `.lazy()` to allow query optimization
3. **Batch Processing**: For very large datasets, process in chunks
4. **Streaming**: Use `streaming=True` for datasets larger than memory
5. **Minimize Data Movement**: Keep data in Polars format as long as possible
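6. **Use Parquet**: A columnar, compressed format that Polars can read in parallel, loading only the columns (and, using file statistics, only the row groups) a query needs
7. **Filter Early**: Apply filters before aggregations to reduce data size; in lazy mode the optimizer also pushes filters down automatically where it can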

In [None]:
# Example of optimal query structure
optimal_query = (
    pl.scan_csv(os.path.join(DATA_DIR, 'sales_data.csv'))
    .filter(pl.col('revenue') > 1000)  # Filter early
    .select(['product', 'category', 'revenue'])  # Select only needed columns
    .group_by(['category', 'product'])
    .agg([pl.sum('revenue').alias('total_revenue')])
    .sort('total_revenue', descending=True)
    .limit(10)  # Limit results if possible
    .collect()
)

print("Top 10 (category, product) pairs by total revenue, counting only sales with revenue > 1000:")
print(optimal_query)

## 10. Monitoring Performance

In [None]:
import psutil  # third-party package: pip install psutil

def monitor_operation(operation_func, *args, **kwargs):
    """
    Monitor memory and time for an operation
    """
    import gc
    gc.collect()
    
    process = psutil.Process()
    mem_before = process.memory_info().rss / 1024 / 1024  # MB
    
    start_time = time.time()
    result = operation_func(*args, **kwargs)
    elapsed = time.time() - start_time
    
    mem_after = process.memory_info().rss / 1024 / 1024  # MB
    mem_used = mem_after - mem_before  # RSS delta: a rough estimate that can even be negative
    
    print(f"Time: {elapsed:.4f} seconds")
    print(f"Memory used: {mem_used:.2f} MB")
    
    return result

# Test with a heavy operation
print("Monitoring heavy aggregation:")
result = monitor_operation(
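    lambda: large_df.group_by('category').agg([
        pl.sum('value').alias('sum_value'),
        pl.mean('value').alias('mean_value'),
        pl.std('value').alias('std_value')  # unique names avoid a duplicate 'value' column error
    ])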
)

## 11. Summary

In this notebook, we explored:
- ✅ Automatic parallelization in Polars
- ✅ Thread pool management
- ✅ Parallel file reading and processing
- ✅ Batch processing techniques
- ✅ Performance benchmarking
- ✅ Lazy evaluation and query optimization
- ✅ Streaming for large datasets
- ✅ Best practices for parallel operations

### Key Takeaways:
1. **Polars parallelizes automatically** - no manual threading needed
2. **Lazy evaluation enables optimization** - use `.lazy()` for complex queries
3. **Streaming handles large data** - use `streaming=True` when data exceeds memory
4. **Filter and select early** - reduce data size before heavy operations
5. **Polars is fast by default** - focus on query logic, not parallelization

### Performance Tips:
- Use Parquet for best I/O performance
- Enable streaming for datasets larger than RAM
- Let Polars manage thread pool automatically
- Chain operations into a single lazy query so the optimizer sees the whole plan

**Next:** In the next notebook, we'll dive deeper into lazy evaluation and query optimization!