In [3]:
import multiprocessing as mp
from collections import defaultdict
import random
import io
import sys

def generate_sample_data(num_records=1000):
    data = []
    years = list(range(1900, 2021))  # 1900-2020
    for _ in range(num_records):
        year = random.choice(years)
        temp_tenths = random.randint(-500, 500)
        line = ' ' * 14 + f"{year:05d}" + ' ' * 67 + f"{temp_tenths:05d}" + ' ' * 10
        data.append(line)
    return data

def map_record(line):
    try:
        year_str = line[15:20].strip()
        year = int(year_str)
        temp_str = line[87:92].strip()
        temp_tenths = int(temp_str)
        temp_c = temp_tenths / 10.0
        if -90 <= temp_c <= 60:
            return (str(year), temp_c)
    except ValueError:
        pass
    return None

def map_phase(lines_chunk):
    mapped = []
    for line in lines_chunk:
        result = map_record(line)
        if result:
            mapped.append(result)
    return mapped

def reduce_temps(year_temps_list):
    temps_by_year = defaultdict(list)
    for year, temp in year_temps_list:
        temps_by_year[year].append(temp)

    report = []
    for year, temps in sorted(temps_by_year.items()):
        min_temp = min(temps)
        max_temp = max(temps)
        report.append((int(year), round(min_temp, 1), round(max_temp, 1)))
    return report

def run_mapreduce(num_cores=2, num_records=1000):
    print("Generating sample weather data...")
    lines = generate_sample_data(num_records)

    print("Map phase: Extracting (year, temp) pairs...")

    chunk_size = len(lines) // num_cores
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

    with mp.Pool(num_cores) as pool:
        mapped_chunks = pool.map(map_phase, chunks)

    all_mapped = [item for chunk in mapped_chunks for item in chunk]

    print("Reduce phase: Computing min/max per year...")
    report = reduce_temps(all_mapped)

    return report, lines

if __name__ == "__main__":
    report, sample_lines = run_mapreduce(num_cores=mp.cpu_count(), num_records=1000)

    print("\n=== Weather Temperature Statistics Report ===")
    print("Year\tMin Temp (째C)\tMax Temp (째C)")
    print("-" * 35)
    for year, min_t, max_t in report:
        print(f"{year}\t{min_t}\t\t{max_t}")

    print(f"\nSample input lines (first 3):")
    for line in sample_lines[:3]:
        print(repr(line[:50] + "..."))

Generating sample weather data...
Map phase: Extracting (year, temp) pairs...
Reduce phase: Computing min/max per year...

=== Weather Temperature Statistics Report ===
Year	Min Temp (째C)	Max Temp (째C)
-----------------------------------
1900	0.7		45.7
1901	7.7		47.9
1902	12.8		46.7
1903	21.6		48.1
1904	4.8		42.5
1905	5.9		47.3
1906	10.4		45.7
1907	3.8		42.8
1908	1.7		31.2
1909	6.4		48.0
1910	3.5		50.0
1911	6.7		45.1
1912	3.2		42.3
1913	6.4		47.0
1914	14.5		41.8
1915	0.0		44.8
1916	3.8		43.3
1917	3.1		49.4
1918	16.2		47.6
1919	11.4		48.3
1920	13.8		49.5
1921	0.3		46.9
1922	0.7		46.7
1923	10.5		40.0
1924	0.1		39.4
1925	40.3		41.8
1926	4.3		41.2
1927	7.1		40.4
1928	11.1		43.6
1929	13.5		43.2
1930	1.8		37.8
1931	4.3		41.8
1932	6.1		49.2
1933	2.8		49.1
1934	2.1		47.5
1935	2.7		46.9
1936	0.8		29.5
1937	2.2		45.7
1938	11.8		29.6
1939	2.4		48.8
1940	3.1		48.8
1941	4.5		45.0
1942	1.6		47.2
1943	0.4		37.4
1944	4.8		36.9
1945	23.9		43.5
1946	0.6		48.5
1947	8.0		46.8
1948	1.2		42.1
1949	6.4		47.0