# 12_parallel_computing
Multicore tasks, parfor-equivalents

In [None]:
% Content to be added

# File: notebooks/12_parallel_computing.ipynb

# OctaveMasterPro: Parallel Computing

Master parallel and distributed computing! This final notebook covers multicore processing, parallel algorithms, distributed computing concepts, and performance optimization techniques for large-scale computational problems.

**Learning Objectives:**
- Understand parallel computing concepts and paradigms
- Implement multicore algorithms and parallel processing
- Apply distributed computing principles
- Optimize parallel performance and scalability
- Master advanced parallel programming patterns

---

## 1. Parallel Computing Fundamentals

```octave
% Parallel computing concepts and basics
fprintf('=== Parallel Computing Fundamentals ===\n');

fprintf('Note: Octave has limited built-in parallel processing.\n');
fprintf('This notebook demonstrates concepts and simulates parallel execution.\n\n');

% Parallel algorithm simulation framework
fprintf('1. Parallel Execution Simulation:\n');

function obj = ParallelExecutor(num_workers)
    % Simulate parallel execution environment
    if nargin < 1, num_workers = 4; end
    
    obj.num_workers = num_workers;
    obj.task_queue = {};
    obj.results = {};
    obj.execution_times = [];
    obj.class_name = 'ParallelExecutor';
    
    obj.add_task = @(task_func, task_data) add_parallel_task(obj, task_func, task_data);
    obj.execute_all = @() execute_parallel_tasks(obj);
    obj.get_results = @() obj.results;
    obj.get_performance = @() get_parallel_performance(obj);
end

function add_parallel_task(obj, task_func, task_data)
    task = struct('func', task_func, 'data', task_data, 'id', length(obj.task_queue) + 1);
    obj.task_queue{end+1} = task;
    fprintf('   Added task %d to queue\n', task.id);
end

function execute_parallel_tasks(obj)
    fprintf('   Executing %d tasks on %d workers\n', length(obj.task_queue), obj.num_workers);
    
    obj.results = cell(length(obj.task_queue), 1);
    obj.execution_times = zeros(length(obj.task_queue), 1);
    
    % Simulate parallel execution by batching tasks
    batch_size = ceil(length(obj.task_queue) / obj.num_workers);
    
    total_start = tic;
    
    for batch = 1:obj.num_workers
        batch_start = (batch - 1) * batch_size + 1;
        batch_end = min(batch * batch_size, length(obj.task_queue));
        
        if batch_start > length(obj.task_queue)
            break;
        end
        
        fprintf('   Worker %d processing tasks %d-%d\n', batch, batch_start, batch_end);
        
        # Simulate parallel execution (in reality, tasks would run simultaneously)
        for task_idx = batch_start:batch_end
            task = obj.task_queue{task_idx};
            
            task_start = tic;
            obj.results{task_idx} = task.func(task.data);
            obj.execution_times(task_idx) = toc(task_start);
        end
    end
    
    total_time = toc(total_start);
    fprintf('   Total execution time: %.4f seconds\n', total_time);
end

function perf = get_parallel_performance(obj)
    perf = struct();
    perf.total_tasks = length(obj.task_queue);
    perf.workers = obj.num_workers;
    perf.individual_times = obj.execution_times;
    perf.total_time = sum(obj.execution_times);
    perf.average_time = mean(obj.execution_times);
    perf.max_time = max(obj.execution_times);
    perf.min_time = min(obj.execution_times);
    
    % Simulate parallel speedup (actual time would be max of concurrent batches)
    sequential_time = sum(obj.execution_times);
    simulated_parallel_time = max(obj.execution_times) * ceil(perf.total_tasks / perf.workers);
    perf.speedup = sequential_time / simulated_parallel_time;
    perf.efficiency = perf.speedup / perf.workers;
    
    fprintf('   Performance metrics:\n');
    fprintf('     Sequential time: %.4f seconds\n', sequential_time);
    fprintf('     Parallel time (simulated): %.4f seconds\n', simulated_parallel_time);
    fprintf('     Speedup: %.2fx\n', perf.speedup);
    fprintf('     Efficiency: %.1f%%\n', perf.efficiency * 100);
end

% Test parallel execution simulation
parallel_exec = ParallelExecutor(4);

# Define sample computational tasks
compute_pi = @(n) estimate_pi_monte_carlo(n);
matrix_multiply = @(data) data.A * data.B;
numerical_integration = @(data) simpson_integrate(data.func, data.a, data.b, data.n);

function pi_est = estimate_pi_monte_carlo(n)
    % Monte Carlo estimation of π
    x = rand(n, 1);
    y = rand(n, 1);
    inside_circle = sum(x.^2 + y.^2 <= 1);
    pi_est = 4 * inside_circle / n;
end

function integral = simpson_integrate(func, a, b, n)
    % Simpson's rule integration
    if mod(n, 2) == 1, n = n + 1; end  % Ensure even number of intervals
    
    h = (b - a) / n;
    x = a:h:b;
    y = func(x);
    
    integral = h/3 * (y(1) + 4*sum(y(2:2:end-1)) + 2*sum(y(3:2:end-2)) + y(end));
end

% Add tasks to parallel executor
parallel_exec.add_task(compute_pi, 100000);
parallel_exec.add_task(compute_pi, 200000);
parallel_exec.add_task(compute_pi, 150000);

matrix_data1 = struct('A', rand(100, 80), 'B', rand(80, 60));
matrix_data2 = struct('A', rand(120, 90), 'B', rand(90, 70));
parallel_exec.add_task(matrix_multiply, matrix_data1);
parallel_exec.add_task(matrix_multiply, matrix_data2);

integ_data1 = struct('func', @sin, 'a', 0, 'b', pi, 'n', 1000);
integ_data2 = struct('func', @(x) exp(-x.^2), 'a', -2, 'b', 2, 'n', 2000);
parallel_exec.add_task(numerical_integration, integ_data1);
parallel_exec.add_task(numerical_integration, integ_data2);

% Execute all tasks
parallel_exec.execute_all();

results = parallel_exec.get_results();
fprintf('   Sample results:\n');
fprintf('     π estimates: %.6f, %.6f, %.6f\n', results{1}, results{2}, results{3});
fprintf('     Integration results: %.6f, %.6f\n', results{6}, results{7});

parallel_exec.get_performance();
```

## 2. Parallel Algorithms and Patterns

```octave
% Parallel algorithm implementations
fprintf('\n=== Parallel Algorithms and Patterns ===\n');

% Parallel reduction
fprintf('1. Parallel Reduction:\n');

function result = parallel_reduction(data, operation, num_workers)
    % Parallel reduction using divide-and-conquer
    % Input: data - input array, operation - reduction operation, num_workers
    % Output: result - reduced result
    
    if nargin < 3, num_workers = 4; end
    
    n = length(data);
    chunk_size = ceil(n / num_workers);
    
    fprintf('   Parallel reduction: %d elements, %d workers\n', n, num_workers);
    
    % Phase 1: Local reductions
    local_results = zeros(num_workers, 1);
    
    for worker = 1:num_workers
        start_idx = (worker - 1) * chunk_size + 1;
        end_idx = min(worker * chunk_size, n);
        
        if start_idx <= n
            chunk = data(start_idx:end_idx);
            local_results(worker) = apply_reduction(chunk, operation);
            fprintf('     Worker %d: indices %d-%d, result = %.6f\n', ...
                    worker, start_idx, end_idx, local_results(worker));
        end
    end
    
    # Phase 2: Global reduction
    result = apply_reduction(local_results(local_results ~= 0), operation);
    fprintf('   Final result: %.6f\n', result);
end

function result = apply_reduction(data, operation)
    switch lower(operation)
        case 'sum'
            result = sum(data);
        case 'max'
            result = max(data);
        case 'min'
            result = min(data);
        case 'product'
            result = prod(data);
        otherwise
            error('Unknown reduction operation: %s', operation);
    end
end

% Test parallel reduction
test_data = rand(10000, 1) * 100;

serial_sum = sum(test_data);
parallel_sum = parallel_reduction(test_data, 'sum', 4);
fprintf('   Reduction verification: serial=%.6f, parallel=%.6f, diff=%.2e\n', ...
        serial_sum, parallel_sum, abs(serial_sum - parallel_sum));

% Parallel matrix operations
fprintf('\n2. Parallel Matrix Operations:\n');

function C = parallel_matrix_multiply(A, B, block_size)
    % Block-parallel matrix multiplication
    % Input: A, B - matrices to multiply, block_size - block dimensions
    % Output: C - result matrix A*B
    
    [m, k1] = size(A);
    [k2, n] = size(B);
    
    if k1 ~= k2
        error('Matrix dimensions incompatible');
    end
    k = k1;
    
    if nargin < 3
        block_size = min(64, min([m, n, k]));
    end
    
    fprintf('   Parallel matrix multiply: (%dx%d) * (%dx%d), block_size=%d\n', ...
            m, k, k, n, block_size);
    
    C = zeros(m, n);
    
    % Number of block operations
    blocks_i = ceil(m / block_size);
    blocks_j = ceil(n / block_size);
    blocks_k = ceil(k / block_size);
    total_blocks = blocks_i * blocks_j * blocks_k;
    
    fprintf('   Processing %d blocks (%dx%dx%d)\n', total_blocks, blocks_i, blocks_j, blocks_k);
    
    block_count = 0;
    
    for i = 1:block_size:m
        i_end = min(i + block_size - 1, m);
        for j = 1:block_size:n
            j_end = min(j + block_size - 1, n);
            for kk = 1:block_size:k
                kk_end = min(kk + block_size - 1, k);
                
                % This block operation could be executed in parallel
                C(i:i_end, j:j_end) = C(i:i_end, j:j_end) + ...
                    A(i:i_end, kk:kk_end) * B(kk:kk_end, j:j_end);
                
                block_count = block_count + 1;
            end
        end
    end
    
    fprintf('   Completed %d block operations\n', block_count);
end

% Test parallel matrix multiplication
A_par = rand(200, 150);
B_par = rand(150, 180);

tic;
C_parallel = parallel_matrix_multiply(A_par, B_par, 50);
parallel_mm_time = toc;

tic;
C_builtin = A_par * B_par;
builtin_mm_time = toc;

mm_error = max(abs(C_parallel(:) - C_builtin(:)));
fprintf('   Matrix multiply verification: error=%.2e\n', mm_error);
fprintf('   Timing: parallel=%.4fs, builtin=%.4fs\n', parallel_mm_time, builtin_mm_time);

% Parallel sorting algorithms
fprintf('\n3. Parallel Sorting:\n');

function sorted_data = parallel_merge_sort(data, num_workers)
    % Parallel merge sort implementation
    % Input: data - array to sort, num_workers - number of parallel workers
    % Output: sorted_data - sorted array
    
    if nargin < 2, num_workers = 4; end
    
    n = length(data);
    chunk_size = ceil(n / num_workers);
    
    fprintf('   Parallel merge sort: %d elements, %d workers\n', n, num_workers);
    
    % Phase 1: Sort chunks in parallel
    sorted_chunks = cell(num_workers, 1);
    
    for worker = 1:num_workers
        start_idx = (worker - 1) * chunk_size + 1;
        end_idx = min(worker * chunk_size, n);
        
        if start_idx <= n
            chunk = data(start_idx:end_idx);
            sorted_chunks{worker} = sort(chunk);  % Local sort
            fprintf('     Worker %d sorted chunk of size %d\n', worker, length(chunk));
        end
    end
    
    % Phase 2: Merge sorted chunks
    sorted_data = [];
    for worker = 1:num_workers
        if ~isempty(sorted_chunks{worker})
            sorted_data = merge_sorted_arrays(sorted_data, sorted_chunks{worker});
        end
    end
    
    fprintf('   Parallel merge sort completed\n');
end

function merged = merge_sorted_arrays(arr1, arr2)
    % Merge two sorted arrays
    if isempty(arr1), merged = arr2; return; end
    if isempty(arr2), merged = arr1; return; end
    
    merged = zeros(1, length(arr1) + length(arr2));
    i = 1; j = 1; k = 1;
    
    while i <= length(arr1) && j <= length(arr2)
        if arr1(i) <= arr2(j)
            merged(k) = arr1(i);
            i = i + 1;
        else
            merged(k) = arr2(j);
            j = j + 1;
        end
        k = k + 1;
    end
    
    % Copy remaining elements
    while i <= length(arr1)
        merged(k) = arr1(i);
        i = i + 1;
        k = k + 1;
    end
    
    while j <= length(arr2)
        merged(k) = arr2(j);
        j = j + 1;
        k = k + 1;
    end
end

% Test parallel sorting
sort_data = rand(5000, 1) * 1000;

tic;
parallel_sorted = parallel_merge_sort(sort_data, 6);
parallel_sort_time = toc;

tic;
builtin_sorted = sort(sort_data);
builtin_sort_time = toc;

sort_error = max(abs(parallel_sorted' - builtin_sorted));
fprintf('   Sort verification: error=%.2e\n', sort_error);
fprintf('   Timing: parallel=%.4fs, builtin=%.4fs\n', parallel_sort_time, builtin_sort_time);
```

## 3. Load Balancing and Scheduling

```octave
% Load balancing and task scheduling
fprintf('\n=== Load Balancing and Scheduling ===\n');

% Dynamic load balancing
fprintf('1. Dynamic Load Balancing:\n');

function obj = LoadBalancer(num_workers)
    % Dynamic load balancer for parallel tasks
    obj.num_workers = num_workers;
    obj.worker_loads = zeros(num_workers, 1);
    obj.task_assignments = {};
    obj.completion_times = [];
    obj.class_name = 'LoadBalancer';
    
    obj.assign_task = @(task_cost) assign_task_dynamic(obj, task_cost);
    obj.complete_task = @(worker_id, completion_time) complete_worker_task(obj, worker_id, completion_time);
    obj.get_balance_stats = @() get_load_balance_stats(obj);
end

function worker_id = assign_task_dynamic(obj, task_cost)
    % Assign task to worker with minimum current load
    [~, worker_id] = min(obj.worker_loads);
    
    obj.worker_loads(worker_id) = obj.worker_loads(worker_id) + task_cost;
    
    if length(obj.task_assignments) < worker_id
        obj.task_assignments{worker_id} = [];
    end
    obj.task_assignments{worker_id}(end+1) = task_cost;
    
    fprintf('   Assigned task (cost=%.2f) to worker %d (new load: %.2f)\n', ...
            task_cost, worker_id, obj.worker_loads(worker_id));
end

function complete_worker_task(obj, worker_id, completion_time)
    % Mark task completion and update worker load
    if ~isempty(obj.task_assignments{worker_id})
        completed_cost = obj.task_assignments{worker_id}(1);
        obj.task_assignments{worker_id}(1) = [];
        obj.worker_loads(worker_id) = obj.worker_loads(worker_id) - completed_cost;
        obj.completion_times(end+1) = completion_time;
        
        fprintf('   Worker %d completed task (remaining load: %.2f)\n', ...
                worker_id, obj.worker_loads(worker_id));
    end
end

function stats = get_load_balance_stats(obj)
    stats = struct();
    stats.worker_loads = obj.worker_loads;
    stats.total_load = sum(obj.worker_loads);
    stats.average_load = stats.total_load / obj.num_workers;
    stats.load_imbalance = max(obj.worker_loads) - min(obj.worker_loads);
    stats.load_variance = var(obj.worker_loads);
    
    fprintf('   Load balance statistics:\n');
    fprintf('     Total load: %.2f\n', stats.total_load);
    fprintf('     Average load per worker: %.2f\n', stats.average_load);
    fprintf('     Load imbalance: %.2f\n', stats.load_imbalance);
    fprintf('     Load variance: %.4f\n', stats.load_variance);
    
    for i = 1:obj.num_workers
        fprintf('     Worker %d load: %.2f\n', i, obj.worker_loads(i));
    end
end

% Test dynamic load balancing
lb = LoadBalancer(4);

# Simulate tasks with varying computational costs
task_costs = [1.5, 3.2, 0.8, 2.1, 4.0, 1.2, 2.8, 0.5, 3.5, 1.8];

for i = 1:length(task_costs)
    lb.assign_task(task_costs(i));
end

lb.get_balance_stats();

% Work-stealing algorithm simulation
fprintf('\n2. Work-Stealing Algorithm:\n');

function obj = WorkStealingScheduler(num_workers)
    % Work-stealing scheduler simulation
    obj.num_workers = num_workers;
    obj.worker_queues = cell(num_workers, 1);
    obj.worker_busy = false(num_workers, 1);
    obj.steal_attempts = 0;
    obj.successful_steals = 0;
    obj.class_name = 'WorkStealingScheduler';
    
    for i = 1:num_workers
        obj.worker_queues{i} = [];
    end
    
    obj.add_task = @(worker_id, task) add_task_to_worker(obj, worker_id, task);
    obj.execute_step = @() execute_scheduling_step(obj);
    obj.get_queue_sizes = @() get_worker_queue_sizes(obj);
    obj.get_steal_stats = @() get_stealing_statistics(obj);
end

function add_task_to_worker(obj, worker_id, task)
    obj.worker_queues{worker_id}(end+1) = task;
    fprintf('   Added task %.2f to worker %d queue\n', task, worker_id);
end

function execute_scheduling_step(obj)
    % Execute one step of work-stealing scheduling
    
    for worker = 1:obj.num_workers
        if ~obj.worker_busy(worker)
            if ~isempty(obj.worker_queues{worker})
                # Worker has local work
                task = obj.worker_queues{worker}(1);
                obj.worker_queues{worker}(1) = [];
                obj.worker_busy(worker) = true;
                
                fprintf('   Worker %d executing local task %.2f\n', worker, task);
            else
                % Worker is idle - attempt work stealing
                victim = attempt_work_stealing(obj, worker);
                if victim > 0
                    obj.successful_steals = obj.successful_steals + 1;
                    fprintf('   Worker %d stole work from worker %d\n', worker, victim);
                end
                obj.steal_attempts = obj.steal_attempts + 1;
            end
        end
    end
    
    % Simulate task completion (workers become available)
    completion_probability = 0.3;  % 30% chance a busy worker completes
    for worker = 1:obj.num_workers
        if obj.worker_busy(worker) && rand() < completion_probability
            obj.worker_busy(worker) = false;
            fprintf('   Worker %d completed task\n', worker);
        end
    end
end

function victim_id = attempt_work_stealing(obj, thief_id)
    % Attempt to steal work from another worker
    victim_id = 0;
    
    % Randomly select potential victims
    potential_victims = setdiff(1:obj.num_workers, thief_id);
    potential_victims = potential_victims(randperm(length(potential_victims)));
    
    for victim = potential_victims
        if length(obj.worker_queues{victim}) > 1  % Victim has work to steal
            % Steal from the end of victim's queue (work-stealing from tail)
            stolen_task = obj.worker_queues{victim}(end);
            obj.worker_queues{victim}(end) = [];
            obj.worker_queues{thief_id}(end+1) = stolen_task;
            victim_id = victim;
            break;
        end
    end
end

function sizes = get_worker_queue_sizes(obj)
    sizes = zeros(obj.num_workers, 1);
    for i = 1:obj.num_workers
        sizes(i) = length(obj.worker_queues{i});
    end
    
    fprintf('   Queue sizes: ['); fprintf('%d ', sizes'); fprintf(']\n');
end

function stats = get_stealing_statistics(obj)
    stats = struct();
    stats.steal_attempts = obj.steal_attempts;
    stats.successful_steals = obj.successful_steals;
    stats.steal_success_rate = obj.successful_steals / max(1, obj.steal_attempts);
    
    fprintf('   Work-stealing statistics:\n');
    fprintf('     Steal attempts: %d\n', stats.steal_attempts);
    fprintf('     Successful steals: %d\n', stats.successful_steals);
    fprintf('     Success rate: %.1f%%\n', stats.steal_success_rate * 100);
end

% Test work-stealing scheduler
ws = WorkStealingScheduler(3);

% Initially distribute tasks unevenly
ws.add_task(1, 2.5);
ws.add_task(1, 1.8);
ws.add_task(1, 3.2);
ws.add_task(1, 1.5);

ws.add_task(2, 0.8);

% Worker 3 starts with no tasks

fprintf('   Initial distribution:\n');
ws.get_queue_sizes();

% Simulate several scheduling steps
fprintf('   Executing work-stealing steps:\n');
for step = 1:8
    fprintf('   Step %d:\n', step);
    ws.execute_step();
    ws.get_queue_sizes();
end

ws.get_steal_stats();
```

## 4. Distributed Computing Concepts

```octave
% Distributed computing principles
fprintf('\n=== Distributed Computing Concepts ===\n');

% Message passing simulation
fprintf('1. Message Passing Interface Simulation:\n');

function obj = MPISimulator(num_processes)
    % Simulate MPI-style message passing
    obj.num_processes = num_processes;
    obj.message_queues = cell(num_processes, 1);
    obj.process_states = cell(num_processes, 1);
    obj.message_count = 0;
    obj.class_name = 'MPISimulator';
    
    for i = 1:num_processes
        obj.message_queues{i} = {};
        obj.process_states{i} = struct('id', i, 'data', [], 'status', 'ready');
    end
    
    obj.send = @(from, to, data) send_message(obj, from, to, data);
    obj.receive = @(process_id) receive_message(obj, process_id);
    obj.broadcast = @(root, data) broadcast_message(obj, root, data);
    obj.gather = @(root) gather_data(obj, root);
    obj.barrier = @() synchronization_barrier(obj);
    obj.get_status = @() get_mpi_status(obj);
end

function send_message(obj, from, to, data)
    if from < 1 || from > obj.num_processes || to < 1 || to > obj.num_processes
        error('Invalid process ID');
    end
    
    message = struct('from', from, 'data', data, 'timestamp', now());
    obj.message_queues{to}{end+1} = message;
    obj.message_count = obj.message_count + 1;
    
    fprintf('   Process %d -> Process %d: %s\n', from, to, mat2str(data));
end

function [data, from] = receive_message(obj, process_id)
    if isempty(obj.message_queues{process_id})
        data = [];
        from = -1;
        fprintf('   Process %d: No messages in queue\n', process_id);
    else
        message = obj.message_queues{process_id}{1};
        obj.message_queues{process_id}(1) = [];
        data = message.data;
        from = message.from;
        fprintf('   Process %d received from Process %d: %s\n', ...
                process_id, from, mat2str(data));
    end
end

function broadcast_message(obj, root, data)
    fprintf('   Broadcasting from Process %d: %s\n', root, mat2str(data));
    for i = 1:obj.num_processes
        if i ~= root
            obj.send(root, i, data);
        end
    end
end

function gathered_data = gather_data(obj, root)
    fprintf('   Gathering data to Process %d\n', root);
    gathered_data = cell(obj.num_processes, 1);
    
    % Root collects its own data
    gathered_data{root} = obj.process_states{root}.data;
    
    # Other processes send data to root
    for i = 1:obj.num_processes
        if i ~= root
            obj.send(i, root, obj.process_states{i}.data);
        end
    end
    
    # Root receives all data
    for i = 1:obj.num_processes
        if i ~= root
            [data, ~] = obj.receive(root);
            gathered_data{i} = data;
        end
    end
end

function synchronization_barrier(obj)
    fprintf('   Synchronization barrier - all processes wait\n');
    % In real MPI, all processes would wait here until all reach the barrier
    for i = 1:obj.num_processes
        obj.process_states{i}.status = 'synchronized';
    end
end

function status = get_mpi_status(obj)
    status = struct();
    status.num_processes = obj.num_processes;
    status.total_messages = obj.message_count;
    
    queue_lengths = zeros(obj.num_processes, 1);
    for i = 1:obj.num_processes
        queue_lengths(i) = length(obj.message_queues{i});
    end
    status.queue_lengths = queue_lengths;
    
    fprintf('   MPI Status: %d processes, %d messages sent\n', ...
            status.num_processes, status.total_messages);
    fprintf('   Queue lengths: ['); fprintf('%d ', queue_lengths'); fprintf(']\n');
end

% Test MPI simulation
mpi = MPISimulator(4);

% Set some data in processes
for i = 1:4
    mpi.process_states{i}.data = i * 10 + rand();
end

% Test point-to-point communication
mpi.send(1, 2, [1, 2, 3]);
mpi.send(3, 1, 'hello');
mpi.receive(2);
mpi.receive(1);

% Test broadcast
mpi.broadcast(1, [100, 200, 300]);

# Test gather operation
gathered = mpi.gather(1);
fprintf('   Gathered data: ');
for i = 1:length(gathered)
    fprintf('P%d=%.2f ', i, gathered{i});
end
fprintf('\n');

mpi.barrier();
mpi.get_status();

% MapReduce paradigm simulation
fprintf('\n2. MapReduce Paradigm:\n');

function result = mapreduce_simulation(data, map_func, reduce_func, num_mappers, num_reducers)
    % Simulate MapReduce computation
    % Input: data - input data, map_func - mapper function, reduce_func - reducer function
    % Output: result - final result
    
    fprintf('   MapReduce: %d elements, %d mappers, %d reducers\n', ...
            length(data), num_mappers, num_reducers);
    
    % Phase 1: Map
    chunk_size = ceil(length(data) / num_mappers);
    map_results = cell(num_mappers, 1);
    
    fprintf('   Map phase:\n');
    for mapper = 1:num_mappers
        start_idx = (mapper - 1) * chunk_size + 1;
        end_idx = min(mapper * chunk_size, length(data));
        
        if start_idx <= length(data)
            chunk = data(start_idx:end_idx);
            map_results{mapper} = map_func(chunk);
            fprintf('     Mapper %d: processed %d elements\n', mapper, length(chunk));
        else
            map_results{mapper} = [];
        end
    end
    
    # Phase 2: Shuffle and Sort (simplified)
    all_intermediate = [];
    for i = 1:num_mappers
        if ~isempty(map_results{i})
            all_intermediate = [all_intermediate; map_results{i}(:)];
        end
    end
    
    fprintf('   Shuffle phase: %d intermediate results\n', length(all_intermediate));
    
    % Phase 3: Reduce
    reducer_chunk_size = ceil(length(all_intermediate) / num_reducers);
    reduce_results = cell(num_reducers, 1);
    
    fprintf('   Reduce phase:\n');
    for reducer = 1:num_reducers
        start_idx = (reducer - 1) * reducer_chunk_size + 1;
        end_idx = min(reducer * reducer_chunk_size, length(all_intermediate));
        
        if start_idx <= length(all_intermediate)
            chunk = all_intermediate(start_idx:end_idx);
            reduce_results{reducer} = reduce_func(chunk);
            fprintf('     Reducer %d: processed %d elements, result=%.4f\n', ...
                    reducer, length(chunk), reduce_results{reducer});
        else
            reduce_results{reducer} = 0;
        end
    end
    
    # Final aggregation
    result = sum([reduce_results{:}]);
    fprintf('   Final result: %.6f\n', result);
end

% Test MapReduce with word count simulation
word_count_data = randi([1, 100], 10000, 1);  % Simulate word IDs

% Map function: emit (word, 1) pairs
mapper = @(words) ones(size(words));  % Each word gets count of 1

% Reduce function: sum counts
reducer = @(counts) sum(counts);

mr_result = mapreduce_simulation(word_count_data, mapper, reducer, 4, 2);

% Compare with direct computation
direct_result = sum(ones(size(word_count_data)));
fprintf('   MapReduce vs direct: %.6f vs %.6f (diff: %.2e)\n', ...
        mr_result, direct_result, abs(mr_result - direct_result));
```

## 5. Performance Analysis and Optimization

```octave
% Performance analysis for parallel computing
fprintf('\n=== Performance Analysis and Optimization ===\n');

% Scalability analysis
fprintf('1. Scalability Analysis:\n');

function results = scalability_analysis(problem_sizes, worker_counts, algorithm_func)
    % Analyze algorithm scalability
    % Input: problem_sizes - array of problem sizes
    %        worker_counts - array of worker counts
    %        algorithm_func - function to test
    % Output: results - performance results
    
    results = struct();
    results.problem_sizes = problem_sizes;
    results.worker_counts = worker_counts;
    results.execution_times = zeros(length(problem_sizes), length(worker_counts));
    results.speedups = zeros(length(problem_sizes), length(worker_counts));
    results.efficiencies = zeros(length(problem_sizes), length(worker_counts));
    
    fprintf('   Scalability analysis: %d problem sizes, %d worker configurations\n', ...
            length(problem_sizes), length(worker_counts));
    
    for i = 1:length(problem_sizes)
        n = problem_sizes(i);
        fprintf('   Problem size: %d\n', n);
        
        # Generate test data
        test_data = rand(n, 1);
        
        for j = 1:length(worker_counts)
            workers = worker_counts(j);
            
            # Measure execution time
            tic;
            algorithm_func(test_data, workers);
            exec_time = toc;
            
            results.execution_times(i, j) = exec_time;
            
            # Calculate speedup relative to single worker
            if j == 1  % Assuming first entry is single worker
                baseline_time = exec_time;
                speedup = 1;
                efficiency = 1;
            else
                speedup = baseline_time / exec_time;
                efficiency = speedup / workers;
            end
            
            results.speedups(i, j) = speedup;
            results.efficiencies(i, j) = efficiency;
            
            fprintf('     %d workers: %.4fs (speedup: %.2fx, efficiency: %.1f%%)\n', ...
                    workers, exec_time, speedup, efficiency * 100);
        end
    end
    
    # Analysis summary
    fprintf('   Scalability Summary:\n');
    avg_efficiency = mean(results.efficiencies, 1);
    for j = 1:length(worker_counts)
        fprintf('     %d workers: average efficiency %.1f%%\n', ...
                worker_counts(j), avg_efficiency(j) * 100);
    end
end

% Test scalability with parallel sum
test_parallel_sum = @(data, workers) parallel_reduction(data, 'sum', workers);

problem_sizes = [1000, 5000, 10000];
worker_counts = [1, 2, 4, 8];

scale_results = scalability_analysis(problem_sizes, worker_counts, test_parallel_sum);

% Amdahl's Law analysis
fprintf('\n2. Amdahl''s Law Analysis:\n');

function theoretical_speedup = amdahls_law(serial_fraction, num_processors)
    % Calculate theoretical speedup using Amdahl's Law
    % S = 1 / (serial_fraction + (1 - serial_fraction) / num_processors)
    
    theoretical_speedup = 1 ./ (serial_fraction + (1 - serial_fraction) ./ num_processors);
end

function demonstrate_amdahls_law()
    processors = 1:16;
    serial_fractions = [0.01, 0.05, 0.1, 0.25];
    
    fprintf('   Amdahl''s Law theoretical speedups:\n');
    fprintf('   Processors: '); fprintf('%3d ', processors); fprintf('\n');
    
    for i = 1:length(serial_fractions)
        f = serial_fractions(i);
        speedups = amdahls_law(f, processors);
        
        fprintf('   Serial %.0f%%:  ', f * 100);
        fprintf('%5.2f', speedups);
        fprintf('\n');
    end
    
    % Maximum theoretical speedup
    fprintf('   Maximum theoretical speedups:\n');
    for i = 1:length(serial_fractions)
        f = serial_fractions(i);
        max_speedup = 1 / f;
        fprintf('     Serial %.0f%%: %.1fx\n', f * 100, max_speedup);
    end
end

demonstrate_amdahls_law();

% Gustafson's Law (weak scaling)
fprintf('\n3. Gustafson''s Law (Weak Scaling):\n');

function scaled_speedup = gustafsons_law(serial_fraction, num_processors)
    % Calculate speedup using Gustafson's Law (weak scaling)
    % S = serial_fraction + num_processors * (1 - serial_fraction)
    
    scaled_speedup = serial_fraction + num_processors .* (1 - serial_fraction);
end

function demonstrate_gustafsons_law()
    processors = 1:16;
    serial_fractions = [0.01, 0.05, 0.1, 0.25];
    
    fprintf('   Gustafson''s Law scaled speedups:\n');
    fprintf('   Processors: '); fprintf('%3d ', processors); fprintf('\n');
    
    for i = 1:length(serial_fractions)
        f = serial_fractions(i);
        speedups = gustafsons_law(f, processors);
        
        fprintf('   Serial %.0f%%:  ', f * 100);
        fprintf('%5.2f', speedups);
        fprintf('\n');
    end
end

demonstrate_gustafsons_law();

% Cache performance analysis
fprintf('\n4. Cache Performance Considerations:\n');

function cache_analysis = analyze_cache_performance(data_sizes, access_patterns)
    % Analyze cache performance for different access patterns
    cache_analysis = struct();
    cache_analysis.data_sizes = data_sizes;
    cache_analysis.patterns = access_patterns;
    cache_analysis.access_times = zeros(length(data_sizes), length(access_patterns));
    
    fprintf('   Cache performance analysis:\n');
    
    for i = 1:length(data_sizes)
        n = data_sizes(i);
        data = rand(n, n);  % Square matrix
        
        fprintf('   Matrix size: %dx%d (%.1f MB)\n', n, n, n*n*8/(1024^2));
        
        for j = 1:length(access_patterns)
            pattern = access_patterns{j};
            
            tic;
            switch pattern
                case 'row_major'
                    % Row-major access (cache-friendly)
                    sum_val = 0;
                    for row = 1:n
                        for col = 1:n
                            sum_val = sum_val + data(row, col);
                        end
                    end
                    
                case 'column_major'
                    # Column-major access (cache-friendly in Octave/MATLAB)
                    sum_val = 0;
                    for col = 1:n
                        for row = 1:n
                            sum_val = sum_val + data(row, col);
                        end
                    end
                    
                case 'random'
                    # Random access (cache-unfriendly)
                    sum_val = 0;
                    indices = randperm(n*n, min(1000, n*n));  % Sample random indices
                    for k = 1:length(indices)
                        idx = indices(k);
                        sum_val = sum_val + data(idx);
                    end
                    
                case 'blocked'
                    # Block access (cache-optimized)
                    sum_val = 0;
                    block_size = 32;
                    for block_i = 1:block_size:n
                        for block_j = 1:block_size:n
                            end_i = min(block_i + block_size - 1, n);
                            end_j = min(block_j + block_size - 1, n);
                            for row = block_i:end_i
                                for col = block_j:end_j
                                    sum_val = sum_val + data(row, col);
                                end
                            end
                        end
                    end
            end
            
            access_time = toc;
            cache_analysis.access_times(i, j) = access_time;
            
            fprintf('     %s: %.4f seconds\n', pattern, access_time);
        end
    end
    
    % Relative performance analysis
    fprintf('   Relative performance (normalized to column-major):\n');
    col_major_idx = find(strcmp(access_patterns, 'column_major'));
    if ~isempty(col_major_idx)
        for i = 1:length(data_sizes)
            baseline = cache_analysis.access_times(i, col_major_idx);
            fprintf('     Size %dx%d: ', data_sizes(i), data_sizes(i));
            for j = 1:length(access_patterns)
                relative = cache_analysis.access_times(i, j) / baseline;
                fprintf('%s=%.2fx ', access_patterns{j}, relative);
            end
            fprintf('\n');
        end
    end
end

# Test cache performance
test_sizes = [100, 200, 400];
access_patterns = {'column_major', 'row_major', 'blocked', 'random'};

cache_results = analyze_cache_performance(test_sizes, access_patterns);
```

---

# Summary

**Parallel Computing Mastery Completed:**

This comprehensive final notebook covered all essential aspects of parallel and distributed computing:

- ✅ **Parallel Fundamentals**: Parallel execution models, task scheduling, performance simulation
- ✅ **Parallel Algorithms**: Reduction operations, parallel matrix operations, parallel sorting
- ✅ **Load Balancing**: Dynamic load balancing, work-stealing algorithms, task distribution
- ✅ **Distributed Computing**: Message passing, MapReduce paradigm, distributed coordination
- ✅ **Performance Analysis**: Scalability analysis, Amdahl's and Gustafson's laws, cache optimization

**Expert Parallel Computing Skills:**
1. **Algorithm Design**: Implement efficient parallel algorithms with proper load balancing
2. **Performance Modeling**: Apply theoretical models to predict and optimize parallel performance
3. **Distributed Systems**: Design and coordinate distributed computational workflows
4. **Scalability Analysis**: Evaluate and optimize applications for different scales and architectures
5. **Memory Optimization**: Design cache-efficient algorithms for high-performance computing

**Professional HPC Development:**
- Design scalable parallel algorithms for scientific computing applications
- Implement efficient load balancing and task distribution strategies
- Apply performance modeling to optimize resource utilization
- Build distributed computing systems with proper coordination mechanisms
- Analyze and optimize memory access patterns for cache efficiency

**Research and Industry Impact:**
- **Scientific Computing**: Large-scale simulations, climate modeling, computational physics
- **Data Analytics**: Big data processing, machine learning at scale, real-time analytics
- **High-Performance Computing**: Supercomputing applications, grid computing, cloud computing
- **Financial Computing**: Risk analysis, algorithmic trading, portfolio optimization
- **Engineering**: Computational fluid dynamics, structural analysis, optimization problems

**Advanced Techniques Mastered:**
- Parallel algorithm design patterns and implementation strategies
- Performance modeling using Amdahl's and Gustafson's laws
- Load balancing techniques including work-stealing and dynamic scheduling
- Distributed computing paradigms including MapReduce and message passing
- Cache-aware algorithm design and memory optimization techniques

**Next Steps for Continued Growth:**
- Explore GPU computing and CUDA/OpenCL programming
- Study advanced parallel programming models (OpenMP, MPI)
- Investigate machine learning parallelization techniques
- Apply these concepts to real-world HPC systems and clusters
- Contribute to open-source parallel computing libraries and frameworks

**🎉 OctaveMasterPro Journey Complete! 🎉**

You have now mastered the complete spectrum of Octave programming, from basic syntax to advanced parallel computing. Your skills span:

**Foundation to Expert Pipeline:**
- Basic programming → Advanced algorithms
- Simple plotting → Publication-quality visualization  
- Linear algebra → Advanced numerical methods
- Basic I/O → Large-scale data processing
- Sequential code → Parallel and distributed computing

**Professional Readiness:**
Your OctaveMasterPro certification demonstrates expertise in scientific computing, numerical analysis, data visualization, software engineering, and high-performance computing. You're now prepared for:

- **Research Positions**: Computational science, engineering research, academic positions
- **Industry Roles**: Scientific software development, data analysis, quantitative analysis
- **Leadership**: Technical lead for scientific computing projects and teams
- **Innovation**: Contributing to cutting-edge computational methods and tools

**Congratulations on completing OctaveMasterPro!** 🚀🎯

Your journey in computational excellence begins now!