# Ray Distributed Computing Demo

This notebook demonstrates how to run distributed computing tasks on a Ray cluster from a Jupyter notebook, using custom IPython magic commands.

## Features:
- **Magic commands** for seamless Ray integration
- **Distributed execution** on remote Ray clusters
- **Automatic output capture** including plots and results
- **Scalable computing** for large datasets and complex algorithms

## Available Commands:
- `%ray_status` - Check Ray cluster status
- `%ray_jobs` - List Ray jobs
- `%%ray_exec` - Execute cell code on Ray cluster
- `%ray_result <job_id>` - Get Ray job result
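
The options used later in this notebook (`--name`, `--timeout`, `--async`) follow a conventional command-line style. As a rough sketch of how such an argument line could be parsed, the snippet below uses only the standard library. The helper `parse_ray_exec_args` and its defaults are hypothetical and are not taken from `utils.notebook_ray_magic`.

```python
import argparse
import shlex


def parse_ray_exec_args(line: str) -> argparse.Namespace:
    """Parse a %%ray_exec-style argument line (hypothetical helper)."""
    parser = argparse.ArgumentParser(prog="ray_exec", add_help=False)
    parser.add_argument("--name", default=None, help="job name")
    parser.add_argument("--timeout", type=int, default=None, help="timeout in seconds")
    # "async" is a reserved word in Python, so store the flag under another name.
    parser.add_argument("--async", dest="run_async", action="store_true")
    # shlex.split honours the quotes around values such as "simple_computation".
    return parser.parse_args(shlex.split(line))


args = parse_ray_exec_args('--name "simple_computation" --timeout 60')
print(args.name, args.timeout, args.run_async)
```

Storing `--async` under `run_async` avoids writing `args.async`, which is a syntax error in Python 3.7 and later.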

In [2]:
# Load Ray magic commands
from utils.notebook_ray_magic import load_ray_magic
load_ray_magic()

print("Ray magic commands loaded successfully!")
print("You can now use:")
print("  %ray_status     - Check Ray cluster status")
print("  %ray_jobs       - List Ray jobs")
print("  %%ray_exec      - Execute cell on Ray cluster")
print("  %ray_result     - Get Ray job result")

ImportError: cannot import name 'parse_argline' from 'IPython.core.magic_arguments' (/opt/conda/lib/python3.11/site-packages/IPython/core/magic_arguments.py)
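
The traceback above says that the installed IPython does not export `parse_argline` from `IPython.core.magic_arguments`. IPython's documented helper for parsing magic arguments is `parse_argstring`, so the import inside `utils/notebook_ray_magic.py` may simply use the wrong name. That is a guess, since the module source is not shown here. Until the import succeeds, none of the magics below are registered. A small check like the following can confirm which names the installed version provides (the helper name `has_attribute` is made up for this example):

```python
import importlib


def has_attribute(module_name: str, attr: str) -> bool:
    """Return True if module_name can be imported and exposes attr."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module itself is missing, so the attribute cannot be available.
        return False
    return hasattr(module, attr)


# Compare the name from the traceback with IPython's documented helper.
for name in ("parse_argline", "parse_argstring"):
    found = has_attribute("IPython.core.magic_arguments", name)
    print(f"{name}: {'available' if found else 'missing'}")
```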

## 1. Check Ray Cluster Status

In [2]:
%ray_status

UsageError: Line magic function `%ray_status` not found.


## 2. List Current Ray Jobs

In [None]:
%ray_jobs

## 3. Execute Code on Ray Cluster

### Simple Computation Example

In [None]:
%%ray_exec --name "simple_computation" --timeout 60
import numpy as np
import matplotlib.pyplot as plt

# Generate sample data
x = np.linspace(0, 2*np.pi, 100)
y = np.sin(x)

# Create plot
plt.figure(figsize=(10, 6))
plt.plot(x, y, 'b-', linewidth=2)
plt.title('Sine Wave Computed on Ray Cluster')
plt.xlabel('X')
plt.ylabel('sin(X)')
plt.grid(True)
plt.show()

print(f"Computed {len(x)} points on Ray cluster!")
print(f"Max value: {np.max(y):.3f}")
print(f"Min value: {np.min(y):.3f}")

### Parallel Monte Carlo Pi Estimation

In [None]:
%%ray_exec --name "monte_carlo_pi" --timeout 120
import ray
import numpy as np
import matplotlib.pyplot as plt
import time

# Initialize Ray
ray.init(ignore_reinit_error=True)

@ray.remote
def monte_carlo_pi_sample(n_samples):
    """Count random points in the unit square that fall inside the quarter circle"""
    import random
    inside_circle = 0
    for _ in range(n_samples):
        x, y = random.random(), random.random()
        if x*x + y*y <= 1:
            inside_circle += 1
    return inside_circle

# Parallel computation settings
total_samples = 1000000
num_workers = 4
samples_per_worker = total_samples // num_workers

print("Starting Monte Carlo π estimation...")
print(f"Total samples: {total_samples}")
print(f"Workers: {num_workers}")
print(f"Samples per worker: {samples_per_worker}")

# Submit parallel tasks
start_time = time.time()
futures = [monte_carlo_pi_sample.remote(samples_per_worker) for _ in range(num_workers)]

# Get results
results = ray.get(futures)
end_time = time.time()

# Calculate π estimate
total_inside = sum(results)
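# Divide by the number of samples actually drawn (the floor division above may drop a remainder)
pi_estimate = 4 * total_inside / (samples_per_worker * num_workers)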
error = abs(pi_estimate - np.pi)
execution_time = end_time - start_time

print("\nResults:")
print(f"π estimate: {pi_estimate:.6f}")
print(f"True π: {np.pi:.6f}")
print(f"Error: {error:.6f}")
print(f"Execution time: {execution_time:.3f} seconds")

# Visualize results
plt.figure(figsize=(12, 5))

# Worker results
plt.subplot(1, 2, 1)
plt.bar(range(num_workers), results, color='skyblue', alpha=0.7)
plt.xlabel('Worker ID')
plt.ylabel('Samples Inside Circle')
plt.title('Samples Inside Circle per Worker')
plt.grid(True, alpha=0.3)

# Comparison with true π
plt.subplot(1, 2, 2)
values = [pi_estimate, np.pi]
labels = ['Estimated π', 'True π']
colors = ['orange', 'green']
plt.bar(labels, values, color=colors, alpha=0.7)
plt.ylabel('Value')
plt.title('π Estimation Comparison')
plt.ylim([3.0, 3.2])
for i, v in enumerate(values):
    plt.text(i, v + 0.01, f'{v:.4f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

ray.shutdown()
print("\nRay shutdown completed.")
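
As a sanity check on the reported error: each sample is a Bernoulli trial that lands inside the quarter circle with probability π/4, so the estimate `4 * inside / N` has standard error `sqrt(π(4 - π) / N)`. The sketch below is an addition, not part of the job above, and evaluates that expression for the sample count used in this example.

```python
import math


def pi_standard_error(n_samples: int) -> float:
    """Standard error of the Monte Carlo estimate 4 * inside / n_samples."""
    p = math.pi / 4  # probability that a uniform point lands in the quarter circle
    # Var(4 * mean) = 16 * p * (1 - p) / n, which simplifies to pi * (4 - pi) / n.
    return 4 * math.sqrt(p * (1 - p) / n_samples)


se = pi_standard_error(1_000_000)
print(f"Expected standard error for 1,000,000 samples: {se:.5f}")
```

An observed error of one or two thousandths is therefore typical at this sample size, and quadrupling the sample count halves the expected error.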

### Matrix Operations

In [None]:
%%ray_exec --name "matrix_operations" --timeout 90
import ray
import numpy as np
import matplotlib.pyplot as plt
import time

# Initialize Ray
ray.init(ignore_reinit_error=True)

@ray.remote
def matrix_multiply(A, B):
    """Multiply one pair of matrices; parallelism comes from running many such tasks"""
    return np.dot(A, B)

@ray.remote
def matrix_eigenvals(matrix):
    """Compute eigenvalues of a matrix"""
    return np.linalg.eigvals(matrix)

# Create test matrices
print("Creating test matrices...")
matrix_size = 300
num_matrices = 6

matrices_A = [np.random.rand(matrix_size, matrix_size) for _ in range(num_matrices)]
matrices_B = [np.random.rand(matrix_size, matrix_size) for _ in range(num_matrices)]

print(f"Created {num_matrices} matrices of size {matrix_size}x{matrix_size}")

# Parallel matrix multiplication
print("\nPerforming parallel matrix multiplication...")
start_time = time.time()
mult_futures = [matrix_multiply.remote(A, B) for A, B in zip(matrices_A, matrices_B)]
mult_results = ray.get(mult_futures)
mult_time = time.time() - start_time

print(f"Completed {len(mult_results)} matrix multiplications in {mult_time:.3f}s")

# Parallel eigenvalue computation
print("\nComputing eigenvalues...")
start_time = time.time()
eigen_futures = [matrix_eigenvals.remote(result) for result in mult_results]
eigenvals_results = ray.get(eigen_futures)
eigen_time = time.time() - start_time

print(f"Computed eigenvalues for {len(eigenvals_results)} matrices in {eigen_time:.3f}s")

# Visualize eigenvalues
plt.figure(figsize=(15, 10))

for i, eigenvals in enumerate(eigenvals_results):
    plt.subplot(2, 3, i+1)
    
    # Plot eigenvalues in complex plane
    real_parts = np.real(eigenvals)
    imag_parts = np.imag(eigenvals)
    
    plt.scatter(real_parts, imag_parts, alpha=0.6, s=10)
    plt.xlabel('Real Part')
    plt.ylabel('Imaginary Part')
    plt.title(f'Eigenvalues Matrix {i+1}')
    plt.grid(True, alpha=0.3)
    
    # Add unit circle for reference
    theta = np.linspace(0, 2*np.pi, 100)
    plt.plot(np.cos(theta), np.sin(theta), 'r--', alpha=0.5, linewidth=1)

plt.tight_layout()
plt.show()

# Performance summary
total_time = mult_time + eigen_time
print("\nPerformance Summary:")
print(f"Matrix multiplication time: {mult_time:.3f}s")
print(f"Eigenvalue computation time: {eigen_time:.3f}s")
print(f"Total computation time: {total_time:.3f}s")
print(f"Average time per matrix: {total_time/num_matrices:.3f}s")

ray.shutdown()
print("\nRay shutdown completed.")

## 4. Async Execution Example

In [None]:
# Placeholder: this cell does not submit anything; it only illustrates the workflow
job_id = "async_example"  # illustrative name, not a real job ID

print(f"Example async job name: {job_id}")
print("With --async, you can continue working while the job runs...")

# To submit for real, put the magic on the first line of its own cell:
# %%ray_exec --async --name "async_computation"
# # Your long-running code here

print("After submitting, use %ray_result <job_id> to check results.")

In [None]:
# Check job result (example - replace with actual job ID)
# %ray_result <your_job_id_here>

print("Replace <your_job_id_here> with the actual job ID from async execution")
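
Checking on an asynchronous job usually comes down to polling until the job reaches a terminal state or a deadline passes. The sketch below shows that pattern with the standard library only. `wait_for_job` and the `get_status` callable are hypothetical stand-ins, not what `%ray_result` does internally, and the status strings are assumed to follow the names Ray uses for job states (`PENDING`, `RUNNING`, `SUCCEEDED`, `FAILED`, `STOPPED`).

```python
import time
from typing import Callable, Optional

# States after which a job will not change again (assumed naming).
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(
    get_status: Callable[[], str],
    timeout_s: float,
    poll_interval_s: float = 0.5,
) -> Optional[str]:
    """Poll get_status until a terminal state is seen; return None on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval_s)


# Simulated job that reports PENDING once and RUNNING twice before succeeding.
fake_statuses = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda: next(fake_statuses), timeout_s=5, poll_interval_s=0.01))
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock is adjusted while the job runs.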

## Summary

This notebook demonstrated the Ray magic commands:

1. **Loading magic commands** with `load_ray_magic()`
2. **Checking cluster status** with `%ray_status`
3. **Listing jobs** with `%ray_jobs`
4. **Executing distributed code** with `%%ray_exec`
5. **Getting job results** with `%ray_result`

### Key Features:
- **Seamless integration** with Jupyter notebooks
- **Automatic output capture** including plots and print statements
- **Synchronous and asynchronous** execution modes
- **Built-in timeout handling**
- **Ray cluster management** integration

### Use Cases:
- **Large-scale data processing**
- **Machine learning training**
- **Scientific simulations**
- **Parallel algorithm development**
- **Performance benchmarking**