<a href="https://colab.research.google.com/github/Sara300804/BDT/blob/main/Ex_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import multiprocessing as mp
from collections import defaultdict
import random
import io
import sys

def generate_sample_data(num_records=1000):
    """Generate synthetic fixed-width weather records.

    Each record is a 101-character string laid out to match the column
    offsets that ``map_record`` reads:

    * columns 15-19: the year, zero-padded to five digits (e.g. ``01950``)
    * columns 87-91: the temperature in tenths of a degree Celsius, written
      with an explicit sign and zero-padded (e.g. ``+0123`` or ``-0456``)

    All other columns are blank padding.

    Args:
        num_records: Number of records to generate.

    Returns:
        A list of ``num_records`` fixed-width strings.
    """
    years = list(range(1900, 2021))  # 1900-2020 inclusive
    data = []
    for _ in range(num_records):
        year = random.choice(years)
        temp_tenths = random.randint(-500, 500)  # -50.0 °C to 50.0 °C in tenths
        # The fields must start exactly at offsets 15 and 87. Starting them one
        # column earlier makes the mapper's slice drop the temperature's sign
        # character, silently turning every negative reading positive.
        line = (
            ' ' * 15
            + f"{year:05d}"
            + ' ' * 67
            + f"{temp_tenths:+05d}"  # always emit a sign: "+0123" / "-0123"
            + ' ' * 9
        )
        data.append(line)
    return data

# Step 2: Mapper function - Extract (year, temp) pairs
def map_record(line):
    """Parse one fixed-width record into a ``(year, temp_c)`` pair.

    The year is read from columns 15-19 and the temperature (in tenths of a
    degree Celsius) from columns 87-91; surrounding whitespace in either
    field is ignored.

    Args:
        line: A fixed-width record string.

    Returns:
        ``(str(year), temp_c)`` when both fields parse as integers and the
        temperature lies within -90 °C to 60 °C inclusive; otherwise ``None``.
    """
    year_field = line[15:20]
    temp_field = line[87:92]
    try:
        year = int(year_field.strip())
        celsius = int(temp_field.strip()) / 10.0  # tenths of a degree -> °C
    except ValueError:
        # Blank or non-numeric field: the record is unusable.
        return None

    # Discard readings outside the plausible range.
    if celsius < -90 or celsius > 60:
        return None
    return (str(year), celsius)

# Step 3: Map phase - Parallelize over chunks
def map_phase(lines_chunk):
    """Run ``map_record`` over a chunk of lines and keep the successful parses.

    Args:
        lines_chunk: An iterable of fixed-width record strings.

    Returns:
        A list of the truthy results of ``map_record``, in input order.
    """
    return [pair for pair in map(map_record, lines_chunk) if pair]

# Step 4: Reduce phase - Min/max per year
def reduce_temps(year_temps_list):
    """Compute the minimum and maximum temperature for each year.

    Years are converted to ``int`` before grouping, so the report is ordered
    numerically (not lexicographically) and different spellings of the same
    year (e.g. ``"01900"`` and ``"1900"``) share one row.

    Args:
        year_temps_list: An iterable of ``(year, temp)`` pairs, where ``year``
            is anything ``int()`` accepts and ``temp`` is a number.

    Returns:
        A list of ``(year, min_temp, max_temp)`` tuples sorted by ascending
        year, with both temperatures rounded to one decimal place. Empty
        input yields an empty list.

    Raises:
        ValueError: If a year cannot be converted to ``int``.
    """
    # Track only the running (min, max) per year rather than storing every
    # reading; the result is the same and memory stays bounded by the number
    # of distinct years.
    bounds = {}
    for year, temp in year_temps_list:
        key = int(year)
        if key in bounds:
            low, high = bounds[key]
            bounds[key] = (min(low, temp), max(high, temp))
        else:
            bounds[key] = (temp, temp)

    return [
        (year, round(low, 1), round(high, 1))
        for year, (low, high) in sorted(bounds.items())
    ]

# Step 5: Main execution
def run_mapreduce(num_cores=2, num_records=1000):
    """Generate sample data and run the map and reduce phases over it.

    The generated lines are split into at most ``num_cores`` contiguous
    chunks, each chunk is mapped in a worker process, and the combined
    results are reduced to per-year statistics. Progress messages are
    printed to stdout.

    Args:
        num_cores: Number of worker processes (and the target number of
            chunks). Must be at least 1.
        num_records: Number of sample records to generate.

    Returns:
        A ``(report, lines)`` tuple: ``report`` is the output of
        ``reduce_temps`` and ``lines`` is the generated input data.

    Raises:
        ValueError: If ``num_cores`` is less than 1.
    """
    if num_cores < 1:
        raise ValueError("num_cores must be at least 1")

    print("Generating sample weather data...")
    lines = generate_sample_data(num_records)

    print("Map phase: Extracting (year, temp) pairs...")
    # Ceiling division gives at most num_cores chunks. Clamping to 1 keeps the
    # range() step non-zero when there are fewer records than cores, or none;
    # a zero step would raise ValueError.
    chunk_size = max(1, (len(lines) + num_cores - 1) // num_cores)
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

    with mp.Pool(num_cores) as pool:
        mapped_chunks = pool.map(map_phase, chunks)

    # Flatten the per-chunk results into a single list of pairs.
    all_mapped = [item for chunk in mapped_chunks for item in chunk]

    print("Reduce phase: Computing min/max per year...")
    report = reduce_temps(all_mapped)

    return report, lines

# Run the POC
if __name__ == "__main__":
    # Run the pipeline with one worker per available CPU.
    stats, raw_lines = run_mapreduce(num_cores=mp.cpu_count(), num_records=1000)

    # Print the per-year report as a tab-separated table.
    print("\n=== Weather Temperature Statistics Report ===")
    print("Year\tMin Temp (°C)\tMax Temp (°C)")
    print("-" * 35)
    for row in stats:
        yr, lowest, highest = row
        print(f"{yr}\t{lowest}\t\t{highest}")

    # Show the first 50 characters of a few raw records for reference.
    print("\nSample input lines (first 3):")
    for raw in raw_lines[:3]:
        preview = raw[:50] + "..."
        print(repr(preview))

Generating sample weather data...
Map phase: Extracting (year, temp) pairs...
Reduce phase: Computing min/max per year...

=== Weather Temperature Statistics Report ===
Year	Min Temp (°C)	Max Temp (°C)
-----------------------------------
1900	4.7		46.8
1901	1.2		47.7
1902	33.4		37.5
1903	1.1		38.1
1904	2.0		48.5
1905	14.9		36.0
1906	8.0		49.2
1907	10.0		47.6
1908	3.5		49.2
1909	3.2		47.8
1910	1.3		49.5
1911	5.5		48.1
1912	0.2		49.6
1913	13.8		47.3
1914	16.6		31.1
1915	6.8		50.0
1916	12.1		49.3
1917	12.0		47.3
1918	3.3		49.9
1919	2.6		49.4
1920	5.7		49.6
1921	8.5		47.4
1922	2.5		48.6
1923	19.7		44.0
1924	13.3		49.2
1925	3.0		48.0
1926	4.3		48.2
1927	0.2		30.5
1928	5.7		39.6
1929	9.4		45.0
1930	0.4		44.8
1931	8.1		45.4
1932	1.0		46.6
1933	30.1		49.6
1934	4.4		49.9
1935	2.8		38.9
1936	4.5		35.7
1937	0.8		44.4
1938	16.6		36.4
1939	1.0		40.9
1940	9.3		46.2
1941	11.9		46.6
1942	14.0		48.6
1943	3.3		45.5
1944	10.9		48.8
1945	2.2		49.7
1946	3.3		38.7
1947	4.7		47.2
1948	8.1		36.4
1949	1.8		45.