<a href="https://colab.research.google.com/github/amp77777/MapReduceProject/blob/main/Main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install psutil



In [None]:
# ==========================
# MapReduce Systems Project
# Parallel Sorting and Max-Value Aggregation
# ==========================

import time, os, random, heapq, psutil
import threading
import multiprocessing as mp

# ---------------------------
# Utility helpers
# ---------------------------

def current_rss():
    """Report the resident set size (RSS) of the calling process, in bytes."""
    # Look the process up by its own pid, then read the rss field of the
    # memory info psutil reports for it.
    own_pid = os.getpid()
    mem = psutil.Process(own_pid).memory_info()
    return mem.rss

def human_bytes(n):
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 (or
    the TB unit is reached) and is rendered with two decimals followed by
    the unit, e.g. ``1536`` -> ``"1.50KB"``.

    Negative inputs (for example an RSS delta when memory shrank) are scaled
    by magnitude and keep their sign, e.g. ``-2048`` -> ``"-2.00KB"``.
    """
    for unit in ('B', 'KB', 'MB', 'GB'):
        # Compare the magnitude: a plain ``n < 1024`` is true for every
        # negative number, which would report all negative deltas in bytes.
        if abs(n) < 1024.0:
            return f"{n:.2f}{unit}"
        n /= 1024.0
    return f"{n:.2f}TB"

# ---------------------------
# Helper: Split array indices evenly among workers
# ---------------------------
def chunk_indices(n_total, n_chunks):
    """Partition ``range(n_total)`` into ``n_chunks`` contiguous spans.

    Returns a list of half-open ``(start, end)`` tuples, in order. The first
    ``n_total % n_chunks`` spans are one element longer than the rest, so a
    span may be empty when there are more chunks than elements.
    """
    quotient, leftover = divmod(n_total, n_chunks)
    bounds = []
    cursor = 0
    for idx in range(n_chunks):
        # Leading spans absorb the remainder, one extra element each.
        size = quotient + (1 if idx < leftover else 0)
        bounds.append((cursor, cursor + size))
        cursor += size
    return bounds

# ==========================
# PART 1: Parallel Sorting (MapReduce Style)
# ==========================

def threaded_sort_map(data, n_workers):
    """Sort ``data`` by sorting slices in threads and merging the results.

    Map phase: ``data`` is split into ``n_workers`` index spans and each
    span is sorted by its own thread. Reduce phase: the sorted runs are
    combined with ``heapq.merge``.

    Returns ``(merged, elapsed, rss_before, rss_after)``. ``elapsed`` spans
    thread creation through the last ``join``; the final merge is not timed.
    """
    spans = chunk_indices(len(data), n_workers)
    slots = [None] * n_workers

    def sort_slice(slot, lo, hi):
        # Each thread works on its own slice and writes to its own slot.
        piece = data[lo:hi]
        piece.sort()
        slots[slot] = (slot, piece)

    started = time.perf_counter()
    rss_before = current_rss()

    running = []
    for slot, (lo, hi) in enumerate(spans):
        th = threading.Thread(target=sort_slice, args=(slot, lo, hi))
        th.start()
        running.append(th)
    for th in running:
        th.join()

    finished = time.perf_counter()
    rss_after = current_rss()

    # Order the runs by chunk index before the k-way merge.
    ordered = sorted(slots, key=lambda entry: entry[0])
    runs = [entry[1] for entry in ordered]
    merged = list(heapq.merge(*runs))
    return merged, finished - started, rss_before, rss_after

def process_sort_worker(args):
    """Sort one chunk for the process pool.

    ``args`` is an ``(index, chunk)`` pair. The chunk is sorted in place and
    handed back together with its index.
    """
    chunk_id, chunk = args
    chunk.sort()
    return chunk_id, chunk

def multiprocessing_sort_map(data, n_workers):
    """Sort ``data`` by sorting slices in a process pool and merging them.

    The slices are cut before the timer starts; each ``(index, slice)`` task
    is sorted by ``process_sort_worker`` in a pool of ``n_workers``
    processes, and the sorted runs are combined with ``heapq.merge``.

    Returns ``(merged, elapsed, rss_before, rss_after)``. ``elapsed`` covers
    pool creation, the map call and pool teardown; the final merge is not
    timed. Both RSS readings are taken in the calling process.
    """
    tasks = []
    for idx, (lo, hi) in enumerate(chunk_indices(len(data), n_workers)):
        tasks.append((idx, data[lo:hi]))

    started = time.perf_counter()
    rss_before = current_rss()
    with mp.Pool(n_workers) as pool:
        outcomes = pool.map(process_sort_worker, tasks)
    finished = time.perf_counter()
    rss_after = current_rss()

    # Order the runs by chunk index before the k-way merge.
    outcomes.sort(key=lambda pair: pair[0])
    runs = [pair[1] for pair in outcomes]
    merged = list(heapq.merge(*runs))
    return merged, finished - started, rss_before, rss_after

# ==========================
# PART 2: Max-Value Aggregation with Constrained Shared Memory
# ==========================

def threaded_max_map_reduce(data, n_workers):
    """Find the maximum of ``data`` using threads and one shared cell.

    ``data`` is split into ``n_workers`` index spans. Each thread computes
    the maximum of its span and, while holding a lock, stores it in the
    shared cell if it is larger than the value already there.

    Returns ``(value, elapsed, rss_before, rss_after)``. ``value`` is
    ``float('-inf')`` when no span contains any element.
    """
    spans = chunk_indices(len(data), n_workers)
    shared = {'value': float('-inf')}
    guard = threading.Lock()

    def reduce_span(lo, hi):
        # An empty span contributes the identity element for max.
        if hi > lo:
            candidate = max(data[lo:hi])
        else:
            candidate = float('-inf')
        with guard:
            if candidate > shared['value']:
                shared['value'] = candidate

    running = []
    started = time.perf_counter()
    rss_before = current_rss()

    for lo, hi in spans:
        th = threading.Thread(target=reduce_span, args=(lo, hi))
        th.start()
        running.append(th)
    for th in running:
        th.join()

    finished = time.perf_counter()
    rss_after = current_rss()
    return shared['value'], finished - started, rss_before, rss_after

def process_max_worker(local, shared_val, lock):
    """Fold the maximum of one chunk into the shared value.

    Defined at module level (not nested) so it can be used as a
    ``mp.Process`` target under the 'spawn' start method, which has to
    pickle the target; a function local to another function cannot be
    pickled.

    Args:
        local: the chunk of values handled by this worker; may be empty.
        shared_val: object whose ``.value`` holds the running maximum.
        lock: context manager guarding reads and writes of ``shared_val``.
    """
    # An empty chunk contributes the same sentinel the shared value starts at.
    local_max = max(local) if local else -2**31
    with lock:
        if local_max > shared_val.value:
            # The shared cell is an integer, so the maximum is stored as int.
            shared_val.value = int(local_max)


def multiprocessing_max_map_reduce(data, n_workers):
    """Find the maximum of ``data`` using processes and one shared integer.

    ``data`` is split into ``n_workers`` index spans and one process is
    started per span. Each process computes the maximum of its slice and
    updates a shared ``mp.Value('i', ...)`` under an ``mp.Lock``.

    Returns ``(value, elapsed, rss_before, rss_after)``. ``value`` stays at
    the initial ``-2**31`` when no slice contains any element. Both RSS
    readings are taken in the calling process.

    NOTE(review): the shared cell uses typecode 'i' (a C int), so maxima
    outside that range cannot be represented -- confirm inputs stay within
    it.
    """
    pairs = chunk_indices(len(data), n_workers)
    shared_val = mp.Value('i', -2**31)
    lock = mp.Lock()

    t0 = time.perf_counter()
    rss_before = current_rss()
    procs = []
    for (s, e) in pairs:
        # The shared value and lock are handed over as Process arguments so
        # the worker does not rely on closure state.
        p = mp.Process(
            target=process_max_worker,
            args=(data[s:e], shared_val, lock),
        )
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    t1 = time.perf_counter()
    rss_after = current_rss()
    return shared_val.value, t1 - t0, rss_before, rss_after

# ==========================
# EXPERIMENTS
# ==========================

def run_sort_experiment(sizes=(32,131072), workers=(1,2,4,8)):
    """Run and report the threaded and multiprocess sorts.

    For every input size a list of random integers in ``[0, 100000]`` is
    generated (the RNG is seeded with 0), and for every worker count both
    sort implementations are run. Each result is compared against
    ``sorted(data)``, and the timing and RSS delta are printed.

    Args:
        sizes: iterable of input lengths to test.
        workers: iterable of worker counts to test for each size.
    """
    random.seed(0)
    for n in sizes:
        print(f"\n=== Sorting Experiment | Input Size: {n} ===")
        data = [random.randint(0,100000) for _ in range(n)]
        # Reference result, computed once per size instead of per worker count.
        expected = sorted(data)
        for w in workers:
            sorted_thread, t_thread, b1, a1 = threaded_sort_map(data, w)
            sorted_proc, t_proc, b2, a2 = multiprocessing_sort_map(data, w)
            # Label and verify every run, not only the size-32 one, so the
            # timing lines can always be attributed to a worker count.
            correct = sorted_thread == expected and sorted_proc == expected
            print(f"\nWorkers={w} | Correctness: {'PASS' if correct else 'FAIL'}")
            print(f"Threads  | Time={t_thread:.4f}s | RSS Δ={human_bytes(a1-b1)}")
            print(f"Processes| Time={t_proc:.4f}s | RSS Δ={human_bytes(a2-b2)}")

def run_max_experiment(sizes=(32,131072), workers=(1,2,4,8)):
    """Run and report the threaded and multiprocess max aggregations.

    For every input size a list of random integers in
    ``[-1000000, 1000000]`` is generated (the RNG is seeded with 1). For
    every worker count both implementations are run, and their value, their
    agreement with ``max(data)`` and their elapsed time are printed.
    """
    random.seed(1)
    for n in sizes:
        print(f"\n=== Max Aggregation | Input Size: {n} ===")
        data = []
        for _ in range(n):
            data.append(random.randint(-1000000,1000000))
        truth = max(data)
        for w in workers:
            # The RSS readings are returned by both helpers but not reported.
            val_thread, t_thread, _rss_b1, _rss_a1 = threaded_max_map_reduce(data, w)
            val_proc, t_proc, _rss_b2, _rss_a2 = multiprocessing_max_map_reduce(data, w)
            print(f"\nWorkers={w}:")
            print(f"Threads  | Value={val_thread}, OK={val_thread==truth}, Time={t_thread:.6f}s")
            print(f"Processes| Value={val_proc}, OK={val_proc==truth}, Time={t_proc:.6f}s")

if __name__ == "__main__":
    # With 'spawn', child processes must re-import __main__ to find worker
    # functions and must unpickle Process targets. Neither works for code
    # defined in notebook cells or for nested worker functions, so prefer
    # 'fork' where the platform offers it and fall back to 'spawn' otherwise.
    start_method = 'fork' if 'fork' in mp.get_all_start_methods() else 'spawn'
    mp.set_start_method(start_method, force=True)
    print("Running MapReduce-style experiments...\n")
    run_sort_experiment()
    run_max_experiment()
    print("\nAll experiments completed.")

Running MapReduce-style experiments...


=== Sorting Experiment | Input Size: 32 ===
