In [1]:
# Original input data
data = [
    "hello world",
    "hello map reduce",
    "hello world world",
    "map reduce map",
    "world of map reduce",
    "hello distributed world"
]

num_workers = 4

# Split data into chunks (one per worker)
chunks = [[] for _ in range(num_workers)]

for i, line in enumerate(data):
    worker_id = i % num_workers
    chunks[worker_id].append(line)

chunks


[['hello world', 'world of map reduce'],
 ['hello map reduce', 'hello distributed world'],
 ['hello world world'],
 ['map reduce map']]

In [2]:
def map_function(line):
    return [(word, 1) for word in line.split()]

worker_maps = []

for wid, chunk in enumerate(chunks):
    local_result = []
    for line in chunk:
        local_result.extend(map_function(line))
    worker_maps.append(local_result)
    
    print(f"Worker {wid} map output:")
    print(local_result)
    print()


Worker 0 map output:
[('hello', 1), ('world', 1), ('world', 1), ('of', 1), ('map', 1), ('reduce', 1)]

Worker 1 map output:
[('hello', 1), ('map', 1), ('reduce', 1), ('hello', 1), ('distributed', 1), ('world', 1)]

Worker 2 map output:
[('hello', 1), ('world', 1), ('world', 1)]

Worker 3 map output:
[('map', 1), ('reduce', 1), ('map', 1)]



In [4]:
def assign_reducer(key, num_reducers):
    return hash(key) % num_reducers

num_reducers = 2

In [5]:
from collections import defaultdict

shuffle_buffers = [defaultdict(list) for _ in range(num_reducers)]

for wid, mapped_data in enumerate(worker_maps):
    for key, value in mapped_data:
        rid = assign_reducer(key, num_reducers)
        shuffle_buffers[rid][key].append(value)


In [6]:
for rid, buffer in enumerate(shuffle_buffers):
    print(f"Reducer {rid} received:")
    for k, v in buffer.items():
        print(f"  {k}: {v}")
    print()


Reducer 0 received:
  reduce: [1, 1, 1]

Reducer 1 received:
  hello: [1, 1, 1, 1]
  world: [1, 1, 1, 1, 1]
  of: [1]
  map: [1, 1, 1, 1]
  distributed: [1]



In [7]:
# aggregation
def reduce_function(key, values):
    return (key, sum(values))

final_results = []

for rid, buffer in enumerate(shuffle_buffers):
    for key, values in buffer.items():
        final_results.append(reduce_function(key, values))

final_results


[('reduce', 3),
 ('hello', 4),
 ('world', 5),
 ('of', 1),
 ('map', 4),
 ('distributed', 1)]