Exercise 2.3.3: In the form of relational algebra implemented in SQL, relations are not sets, but bags; that is, tuples are allowed to appear more than once. There are extended definitions of union, intersection, and difference for bags, which we shall define below. Write MapReduce algorithms for computing the following operations on bags R and S:
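
As a single-machine reference for the MapReduce versions, the bag operations can be checked with Python's `collections.Counter`, which stores each tuple together with its multiplicity. This is only a sketch: the two small bags are made-up illustration data, not part of the exercise.

```python
from collections import Counter

# Hypothetical example bags: each key is a tuple, each value its multiplicity.
R = Counter({"Apple": 1, "Banana": 2})
S = Counter({"Grape": 3, "Apple": 4, "Banana": 5})

# Bag union: a tuple appears the sum of its counts in R and S.
bag_union = R + S          # Apple: 5, Banana: 7, Grape: 3

# Bag intersection: a tuple appears the minimum of its counts in R and S.
bag_intersection = R & S   # Apple: 1, Banana: 2

print(dict(bag_union))
print(dict(bag_intersection))
```

`Counter` addition and `&` drop tuples whose resulting count is not positive, which matches the convention that a tuple with multiplicity zero is simply absent from the bag.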


A. Bag Union: the bag of tuples in which tuple t appears the sum of the numbers of times it appears in R and S.

In [1]:
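def mapper(_, line):
    # Each input line has the form "relation tuple_id count".
    relation, tuple_id, count = line.strip().split()
    # Key on the tuple itself so that its copies from R and S reach the same reducer.
    yield (tuple_id, int(count))

In [2]:
def reducer(tuple_id, counts):
    # In the bag union, a tuple appears as many times as in R and S combined.
    yield (tuple_id, sum(counts))

In [26]:
# Each line represents: relation, tuple_id, count

input_data = [
    "R Apple 1",
    "R Banana 2",
    "S Grape 3",
    "S Apple 4",
    "S Banana 5"
]

In [27]:
# Map phase: apply the mapper to every input line.
mapped_values = []
for line in input_data:
    for result in mapper(None, line):
        mapped_values.append(result)

In [28]:
# Shuffle phase: group the mapped values by key.
shuffled_data = {}
for key, value in mapped_values:
    if key not in shuffled_data:
        shuffled_data[key] = []
    shuffled_data[key].append(value)

In [31]:
# Reduce phase: apply the reducer to each key and its list of values.
final_output = []
for key, values in shuffled_data.items():
    for result in reducer(key, values):
        final_output.append(result)

In [32]:
print("Bag Union result:", final_output)


Bag Union result: [('Apple', 5), ('Banana', 7), ('Grape', 3)]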


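B. Bag Intersection: the bag of tuples in which tuple t appears the minimum of the numbers of times it appears in R and S.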

In [73]:
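def mapper(_, line):
    # Each input line has the form "relation tuple_id count".
    relation, tuple_id, count = line.strip().split()
    # Keep the relation name in the value so the reducer can tell R from S.
    yield (tuple_id, (relation, int(count)))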

In [74]:
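def reducer(tuple_id, values):
    count_R = 0
    count_S = 0

    # Total the multiplicity of this tuple separately for each relation.
    for relation, count in values:
        if relation == 'R':
            count_R += count
        elif relation == 'S':
            count_S += count

    # Emit only tuples present in both bags, with the smaller multiplicity.
    if count_R > 0 and count_S > 0:
        yield (tuple_id, min(count_R, count_S))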

In [75]:
new_data = [
    "R Banana 2",
    "R Grape 5",
    "R Grape 3",
    "S Apple 1",
    "S Banana 4"
]

In [76]:
from collections import defaultdict

def run_mapreduce(input_data):
    # MAP PHASE
    mapped_values = []
    for line in input_data:
        for result in mapper(None, line):
            mapped_values.append(result)

    # SHUFFLE PHASE (Grouping Keys)
    shuffled_data = defaultdict(list)
    for key, value in mapped_values:
        shuffled_data[key].append(value)

    # REDUCE PHASE
    final_output = []
    for key, values in shuffled_data.items():
        for result in reducer(key, values):
            final_output.append(result)

    return final_output

if __name__ == "__main__":
    output = run_mapreduce(new_data)

    print("Bag Intersection Result:", output)


Bag Intersection Result: [('Banana', 2)]
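
The exercise statement also mentions bag difference, which this notebook does not work out. Under the usual definition (tuple t appears in R − S as many times as it appears in R minus the number of times it appears in S, and not at all if that number is not positive), a minimal sketch could follow the same pattern as the intersection. The names `difference_mapper`, `difference_reducer`, and `run_difference` are hypothetical and chosen so that they do not overwrite the functions defined above.

```python
from collections import defaultdict

def difference_mapper(_, line):
    # Each input line has the form "relation tuple_id count".
    relation, tuple_id, count = line.strip().split()
    yield (tuple_id, (relation, int(count)))

def difference_reducer(tuple_id, values):
    # Total the multiplicity of this tuple in each relation.
    count_R = sum(count for relation, count in values if relation == "R")
    count_S = sum(count for relation, count in values if relation == "S")
    # Emit the tuple only if it appears more often in R than in S.
    if count_R > count_S:
        yield (tuple_id, count_R - count_S)

def run_difference(lines):
    # Map and shuffle: group the mapped values by tuple_id.
    grouped = defaultdict(list)
    for line in lines:
        for key, value in difference_mapper(None, line):
            grouped[key].append(value)
    # Reduce: apply the reducer to each group.
    output = []
    for key, values in grouped.items():
        output.extend(difference_reducer(key, values))
    return output

sample = [
    "R Banana 2",
    "R Grape 5",
    "R Grape 3",
    "S Apple 1",
    "S Banana 4",
]
print("Bag Difference Result:", run_difference(sample))
# Bag Difference Result: [('Grape', 8)]
```

On this sample, Banana is dropped because it appears more often in S (4) than in R (2), and Apple is dropped because it does not appear in R at all.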
