In [None]:
import numpy as np
import time
import psutil
from collections import defaultdict
import os


BLOCK_SIZE = 500  # block edge length used by distributed_matrix_multiply when splitting
DTYPE = np.float64  # element dtype of the matrices produced by generate_large_matrix


def generate_large_matrix(n, rng=None):
    """
    Generate an ``n x n`` matrix of uniform random values in [0, 1).

    Parameters
    ----------
    n : int
        Number of rows and columns.
    rng : numpy.random.Generator, optional
        Source of randomness. When omitted, NumPy's legacy global RNG
        (``np.random.rand``) is used, as before; pass e.g.
        ``np.random.default_rng(seed)`` for reproducible experiments.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n, n)`` with dtype ``DTYPE``.
    """
    data = np.random.rand(n, n) if rng is None else rng.random((n, n))
    # Both generators already return float64. copy=False skips the redundant
    # full-size copy (8 * n**2 extra bytes at peak) when DTYPE is float64,
    # and still converts when DTYPE is something else.
    return data.astype(DTYPE, copy=False)


def split_into_blocks(matrix, block_size):
    """
    Split a 2-D array into a grid of blocks keyed by block coordinates.

    Block ``(bi, bj)`` covers rows ``bi*block_size`` up to
    ``(bi+1)*block_size`` and the same range of columns. When a dimension is
    not a multiple of ``block_size``, the last block along it is smaller.
    Blocks are basic slices of ``matrix`` (views for NumPy arrays, no copy).

    Parameters
    ----------
    matrix : numpy.ndarray
        2-D array to split; it need not be square.
    block_size : int
        Edge length of a full block; must be >= 1.

    Returns
    -------
    dict[tuple[int, int], numpy.ndarray]
        Mapping from ``(block_row, block_col)`` to the block, in row-major order.

    Raises
    ------
    ValueError
        If ``block_size`` is smaller than 1.
    """
    if block_size < 1:
        raise ValueError(f"block_size must be >= 1, got {block_size}")

    # Walk rows and columns over their own extents so non-square inputs
    # keep every column block.
    n_rows, n_cols = matrix.shape[0], matrix.shape[1]
    return {
        (i // block_size, j // block_size): matrix[i:i + block_size, j:j + block_size]
        for i in range(0, n_rows, block_size)
        for j in range(0, n_cols, block_size)
    }


def map_matrix_A(blocks_A, num_blocks):
    """
    Map phase for A: emit block A[i, k] once for every output column j.

    Each record is ``((i, j), ("A", k, block))``, keyed by the block of C
    that the A block contributes to. Records come out in dict order of
    ``blocks_A``, then with j ascending.
    """
    return [
        ((row, col), ("A", inner, block))
        for (row, inner), block in blocks_A.items()
        for col in range(num_blocks)
    ]

def map_matrix_B(blocks_B, num_blocks):
    """
    Map phase for B: emit block B[k, j] once for every output row i.

    Each record is ``((i, j), ("B", k, block))``, keyed by the block of C
    that the B block contributes to. Records come out in dict order of
    ``blocks_B``, then with i ascending.
    """
    return [
        ((row, col), ("B", inner, block))
        for (inner, col), block in blocks_B.items()
        for row in range(num_blocks)
    ]


def reduce_block(key, values):
    """
    Reduce phase: compute block C_ij = sum over k of A_ik @ B_kj.

    ``values`` holds ``(tag, k, block)`` records. Tag "A" marks an A block;
    any other tag is treated as a B block. A later record with the same tag
    and k replaces an earlier one. Only k indices present on both sides
    contribute.

    Returns ``(key, C_ij)``, where C_ij is None if no k index matched.
    """
    left, right = {}, {}
    for tag, idx, block in values:
        (left if tag == "A" else right)[idx] = block

    total = None
    for idx, a_block in left.items():
        if idx not in right:
            continue
        product = a_block @ right[idx]
        total = product if total is None else total + product

    return key, total


def distributed_matrix_multiply(A, B, block_size=None):
    """
    Executes distributed matrix multiplication using MapReduce.

    The distribution is simulated in one process. Both matrices are split into
    square blocks. The map phase sends every A block (i, k) to all reducer
    keys (i, j) and every B block (k, j) to all keys (i, j). The shuffle then
    groups records by key, and each reducer sums A_ik @ B_kj over k.

    Parameters
    ----------
    A, B : numpy.ndarray
        Square matrices of identical shape (n, n).
    block_size : int, optional
        Block edge length; defaults to the module-level BLOCK_SIZE. n need
        not be a multiple of it (the last block row/column is then smaller).

    Returns
    -------
    tuple
        (reduced_blocks, execution_time, network_bytes, cpu_usage, memory_usage):
        - reduced_blocks: dict mapping (block_row, block_col) to that block of C.
        - execution_time: wall-clock seconds spent in map + shuffle + reduce.
        - network_bytes: total bytes of all A and B blocks, each counted once.
        - cpu_usage: system-wide CPU percent averaged over the timed phases.
        - memory_usage: this process's resident set size afterwards, in MiB.

    Raises
    ------
    ValueError
        If A and B are not square matrices of the same shape, or block_size < 1.
    """
    if block_size is None:
        block_size = BLOCK_SIZE
    if block_size < 1:
        raise ValueError(f"block_size must be >= 1, got {block_size}")
    if A.ndim != 2 or A.shape[0] != A.shape[1] or A.shape != B.shape:
        raise ValueError(
            f"expected two square matrices of the same shape, got {A.shape} and {B.shape}"
        )

    n = A.shape[0]
    # Ceiling division, so the count matches split_into_blocks, which emits a
    # trailing partial block when n is not a multiple of block_size. Floor
    # division would drop C's last block row/column, and every block when
    # n < block_size.
    num_blocks = -(-n // block_size)

    print("\n--- Splitting matrices into blocks ---")
    blocks_A = split_into_blocks(A, block_size)
    blocks_B = split_into_blocks(B, block_size)

    # Network overhead estimation: every A and B block counted once.
    # NOTE(review): the map phase replicates each block to num_blocks reducer
    # keys, so shuffle volume is num_blocks times this figure.
    network_bytes = sum(block.nbytes for block in blocks_A.values()) + \
                    sum(block.nbytes for block in blocks_B.values())

    print(f"Total blocks A: {len(blocks_A)}")
    print(f"Total blocks B: {len(blocks_B)}")
    print(f"Estimated data transferred: {network_bytes / 1e6:.2f} MB")

    # Non-blocking call: sets psutil's CPU-time baseline so the matching call
    # after the reduce phase reports the average over exactly the timed work.
    psutil.cpu_percent(interval=None)
    start_time = time.perf_counter()

    # ---------------- Map ----------------
    mapped = map_matrix_A(blocks_A, num_blocks) + \
             map_matrix_B(blocks_B, num_blocks)

    # ---------------- Shuffle ----------------
    shuffled = defaultdict(list)
    for key, value in mapped:
        shuffled[key].append(value)

    # ---------------- Reduce ----------------
    reduced_blocks = {}
    for key, values in shuffled.items():
        reduced_blocks[key] = reduce_block(key, values)[1]

    end_time = time.perf_counter()

    execution_time = end_time - start_time
    # Usage since the baseline call above. A blocking sample taken here
    # (interval > 0) would only observe the idle period after the work.
    # psutil warns accuracy drops when calls are < 0.1 s apart, so very
    # short runs report a noisy value.
    cpu_usage = psutil.cpu_percent(interval=None)
    memory_usage = psutil.Process().memory_info().rss / (1024**2)

    print(f"\nExecution time: {execution_time:.2f} s")
    print(f"CPU usage: {cpu_usage:.1f} %")
    print(f"Memory usage: {memory_usage:.1f} MB")

    return reduced_blocks, execution_time, network_bytes, cpu_usage, memory_usage


def run_experiments(matrix_sizes):
    """
    Benchmark distributed_matrix_multiply on random n x n inputs for each n.

    Parameters
    ----------
    matrix_sizes : iterable of int
        Matrix dimensions to run, in order.

    Returns
    -------
    list of dict
        One record per size with keys "size", "time" (s), "network_MB",
        "cpu" (%), "memory" (MiB RSS) and "blocks" (number of C blocks).
    """
    results = []
    banner = "==============================="

    for n in matrix_sizes:
        print(f"\n{banner}")
        print(f" Distributed multiplication {n}x{n}")
        print(banner)

        A = generate_large_matrix(n)
        B = generate_large_matrix(n)

        blocks, t, net, cpu, mem = distributed_matrix_multiply(A, B)

        results.append({
            "size": n,
            "time": t,
            "network_MB": net / 1e6,
            "cpu": cpu,
            "memory": mem,
            "blocks": len(blocks),
        })

        # Drop this size's inputs and result now. Otherwise the result blocks
        # stay referenced through the next distributed_matrix_multiply call,
        # and A/B through the next generate_large_matrix calls. That raises
        # peak memory and can inflate the next run's RSS reading.
        del A, B, blocks

    return results

import matplotlib.pyplot as plt
import os


def _save_line_plot(x, y, xlabel, ylabel, title, path, log_axes=False):
    """Draw one marker line chart of y against x, save it to path, close it."""
    plt.figure()
    draw = plt.loglog if log_axes else plt.plot
    draw(x, y, marker="o")
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    if log_axes:
        plt.grid(True, which="both")
    else:
        plt.grid()
    plt.tight_layout()
    plt.savefig(path)
    plt.close()


def plot_distributed_results(results):
    """
    Save one PNG chart per benchmark metric into the plots_distributed/ folder.

    Each entry of ``results`` must provide the keys "size", "time",
    "network_MB", "memory" and "cpu" (as produced by run_experiments).
    Time, network, memory and CPU are plotted against size. A log-log
    time-vs-size chart is also saved.
    """
    os.makedirs("plots_distributed", exist_ok=True)

    def column(name):
        return [row[name] for row in results]

    # Pull every column up front so a missing key fails before any file is written.
    sizes = column("size")
    times = column("time")
    network = column("network_MB")
    memory = column("memory")
    cpu = column("cpu")

    charts = (
        (times, "Matrix size (n)", "Execution time (s)",
         "Scalability of Distributed Matrix Multiplication",
         "plots_distributed/time_vs_size.png", False),
        (network, "Matrix size (n)", "Data transferred (MB)",
         "Network Overhead",
         "plots_distributed/network_vs_size.png", False),
        (memory, "Matrix size (n)", "Memory usage (MB)",
         "Memory Consumption",
         "plots_distributed/memory_vs_size.png", False),
        (cpu, "Matrix size (n)", "CPU usage (%)",
         "CPU Utilization",
         "plots_distributed/cpu_vs_size.png", False),
        # Log-log view: the slope shows the empirical scaling exponent.
        (times, "Matrix size (log scale)", "Execution time (log scale)",
         "Log-Log Scalability Plot",
         "plots_distributed/loglog_time.png", True),
    )
    for y_values, xlabel, ylabel, title, path, log_axes in charts:
        _save_line_plot(sizes, y_values, xlabel, ylabel, title, path, log_axes)

    print("\nDistributed plots saved in folder: plots_distributed/")



def main(matrix_sizes=None):
    """
    Run the benchmark, save its plots, and print a one-line summary per size.

    Parameters
    ----------
    matrix_sizes : iterable of int, optional
        Matrix dimensions n to benchmark. Defaults to 1000, 2500, 5000 and
        10000. Pass smaller sizes for a quick run; the largest default size
        dominates both runtime and memory.
    """
    if matrix_sizes is None:
        matrix_sizes = [1000, 2500, 5000, 10000]

    results = run_experiments(matrix_sizes)
    plot_distributed_results(results)

    print("\n=========== FINAL RESULTS ===========")
    for r in results:
        print(f"Size: {r['size']}x{r['size']} | "
              f"Time: {r['time']:.2f}s | "
              f"Network: {r['network_MB']:.2f}MB | "
              f"CPU: {r['cpu']:.1f}% | "
              f"Mem: {r['memory']:.1f}MB | "
              f"Blocks: {r['blocks']}")

# Run the full benchmark only when executed as a script/notebook, not on import.
if __name__ == "__main__":
    main()


 Distributed multiplication 1000x1000

--- Splitting matrices into blocks ---
Total blocks A: 4
Total blocks B: 4
Estimated data transferred: 16.00 MB

Execution time: 0.05 s
CPU usage: 13.0 %
Memory usage: 96.9 MB

 Distributed multiplication 2500x2500

--- Splitting matrices into blocks ---
Total blocks A: 25
Total blocks B: 25
Estimated data transferred: 100.00 MB

Execution time: 0.71 s
CPU usage: 13.3 %
Memory usage: 224.8 MB

 Distributed multiplication 5000x5000

--- Splitting matrices into blocks ---
Total blocks A: 100
Total blocks B: 100
Estimated data transferred: 400.00 MB

Execution time: 5.22 s
CPU usage: 48.0 %
Memory usage: 694.7 MB

 Distributed multiplication 10000x10000

--- Splitting matrices into blocks ---
Total blocks A: 400
Total blocks B: 400
Estimated data transferred: 1600.00 MB

Execution time: 36.18 s
CPU usage: 10.0 %
Memory usage: 2561.8 MB

Distributed plots saved in folder: plots_distributed/

Size: 1000x1000 | Time: 0.05s | Network: 16.00MB | CPU: 13.