In [None]:
def mapper(text):
    """
    Map phase of the word-count example.

    Breaks ``text`` on whitespace and lazily yields one ``(word, 1)``
    tuple per token, in the order the tokens appear. Words are emitted
    exactly as written (no case folding or punctuation removal).
    """
    # A no-argument split() already discards leading/trailing whitespace.
    yield from ((token, 1) for token in text.split())


def combiner(mapped_data):
    """
    Local aggregation step (same summing logic as the reducer).

    Args:
        mapped_data: iterable of ``(word, count)`` pairs, e.g. the output
            of the mapper for a single input split.

    Returns:
        A ``dict_items`` view of ``(word, total_count)`` pairs, keyed in
        first-seen order. An empty input yields an empty view.
    """
    # Imported locally so the function works even when the surrounding
    # cell/module has not imported ``defaultdict`` itself.
    from collections import defaultdict

    combined = defaultdict(int)
    for word, count in mapped_data:
        combined[word] += count
    return combined.items()


def reducer(grouped_data):
    """
    Reduce phase of the word-count example.

    Takes a mapping of ``word -> list of partial counts`` and returns a
    plain dict of ``word -> summed count``.
    """
    return {key: sum(partials) for key, partials in grouped_data.items()}


if __name__ == "__main__":
    # Demo driver: runs map -> combine -> shuffle -> reduce over two small
    # input splits and prints the intermediate result of every stage.

    # ``defaultdict`` is needed for the shuffle step below; import it here
    # so this cell runs on its own instead of relying on an earlier import.
    from collections import defaultdict

    input1 = "Hello World Bye World"
    input2 = "Hello Hadoop Goodbye Hadoop"

    # Map: one (word, 1) pair per token, per split.
    mapped1 = list(mapper(input1))
    mapped2 = list(mapper(input2))

    print("Mapper Output (Input1):", mapped1)
    print("Mapper Output (Input2):", mapped2)

    # Combine: pre-aggregate counts within each split.
    combined1 = dict(combiner(mapped1))
    combined2 = dict(combiner(mapped2))

    print("Combiner Output (Input1):", combined1)
    print("Combiner Output (Input2):", combined2)

    # Shuffle: group the partial counts from both splits by word.
    # Iterating the dicts in turn keeps the same order as concatenating
    # their item lists, without building the temporary lists.
    shuffled = defaultdict(list)
    for partial in (combined1, combined2):
        for word, count in partial.items():
            shuffled[word].append(count)

    print("Shuffled Data:", dict(shuffled))

    # Reduce: sum the grouped partial counts.
    final_output = reducer(shuffled)

    print("\nFinal Word Count Output:")
    for word, count in sorted(final_output.items()):
        print(f"<{word},{count}>")


Mapper Output (Input1): [('Hello', 1), ('World', 1), ('Bye', 1), ('World', 1)]
Mapper Output (Input2): [('Hello', 1), ('Hadoop', 1), ('Goodbye', 1), ('Hadoop', 1)]
Combiner Output (Input1): {'Hello': 1, 'World': 2, 'Bye': 1}
Combiner Output (Input2): {'Hello': 1, 'Hadoop': 2, 'Goodbye': 1}
Shuffled Data: {'Hello': [1, 1], 'World': [2], 'Bye': [1], 'Hadoop': [2], 'Goodbye': [1]}

Final Word Count Output:
<Bye,1>
<Goodbye,1>
<Hadoop,2>
<Hello,2>
<World,2>


In [None]:
from collections import defaultdict

def mapper(line):
    """
    Extracts date and temperature from one input record.

    Input format: "YYYY-MM-DD HH:MM,temp"
    Example: "2025-09-01 14:00,35"

    Args:
        line: a single raw record; surrounding whitespace is ignored.

    Returns:
        ``(date, temperature)`` where ``date`` is the text before the first
        space of the timestamp field and ``temperature`` is a float, or
        ``None`` when the record is malformed (not a string, not exactly
        one comma, or a non-numeric temperature).
    """
    try:
        # Unpacking fails with ValueError unless there is exactly one comma.
        timestamp, temp = line.strip().split(",")
        return timestamp.split(" ")[0], float(temp)
    except (AttributeError, TypeError, ValueError):
        # Only swallow parse failures; a bare ``except`` would also hide
        # KeyboardInterrupt/SystemExit and unrelated programming errors.
        return None


def reducer(mapped_data):
    """
    Collects temperatures per date and reports each day's extremes.

    Falsy entries in ``mapped_data`` (e.g. ``None`` from a failed parse)
    are skipped. Returns a list of ``(date, max_temp, min_temp)`` tuples
    ordered by date.
    """
    temps_by_day = defaultdict(list)
    for record in mapped_data:
        if not record:
            continue
        day, reading = record
        temps_by_day[day].append(reading)

    return [
        (day, max(temps_by_day[day]), min(temps_by_day[day]))
        for day in sorted(temps_by_day)
    ]


if __name__ == "__main__":
    # Demo driver: parse a handful of hourly readings, then print the
    # per-day maximum and minimum temperature.

    weather_data = [
        "2025-09-01 14:00,35",
        "2025-09-01 15:00,33",
        "2025-09-01 16:00,37",
        "2025-09-02 14:00,32",
        "2025-09-02 15:00,34"
    ]

    # Map every raw record to a (date, temperature) pair.
    mapped = list(map(mapper, weather_data))
    print("Mapper Output:", mapped)

    # Reduce to one (date, max, min) row per day.
    final_output = reducer(mapped)

    print("\nFinal Weather Report (Daily Max & Min):")
    for row in final_output:
        date, max_temp, min_temp = row
        print(f"{date} max={max_temp} min={min_temp}")


Mapper Output: [('2025-09-01', 35.0), ('2025-09-01', 33.0), ('2025-09-01', 37.0), ('2025-09-02', 32.0), ('2025-09-02', 34.0)]

Final Weather Report (Daily Max & Min):
2025-09-01 max=37.0 min=33.0
2025-09-02 max=34.0 min=32.0
