# Install the required libraries

In [1]:
# %pip (unlike !pip) installs into the environment of the running kernel.
%pip install --quiet apache-beam

In [2]:
# Quote the requirement: unquoted "[...]" is a shell glob pattern (zsh aborts
# with "no matches found"). %pip targets the running kernel's environment.
%pip install --quiet "apache-beam[interactive]"

# Import the required libraries

In [3]:
import csv
import math
import os
from io import StringIO

import apache_beam as beam

# Create an input directory

In [4]:
# Create the input directory portably (``mkdir -p`` is POSIX-only);
# exist_ok=True keeps the cell idempotent on re-runs.
os.makedirs("weather_data", exist_ok=True)

# Write a sample input file: we generate our own weather data to demonstrate the Apache Beam lab

In [5]:
# Generate the sample dataset: a header row plus one temperature reading per
# city per day. The directory is (re)created here so this cell does not depend
# on any earlier shell cell and can be re-run on its own. The file is written
# as UTF-8, the encoding beam.io.ReadFromText decodes with by default.
os.makedirs('weather_data', exist_ok=True)
with open('weather_data/temps_large.csv', 'w', encoding='utf-8') as csv_file:
    csv_file.write("""city,date,temperature
New York,2025-11-01,13
Boston,2025-11-01,9
Chicago,2025-11-01,6
Denver,2025-11-01,12
San Francisco,2025-11-01,18
Seattle,2025-11-01,11
New York,2025-11-02,15
Boston,2025-11-02,10
Chicago,2025-11-02,5
Denver,2025-11-02,13
San Francisco,2025-11-02,17
Seattle,2025-11-02,10
New York,2025-11-03,14
Boston,2025-11-03,11
Chicago,2025-11-03,7
Denver,2025-11-03,14
San Francisco,2025-11-03,18
Seattle,2025-11-03,12
New York,2025-11-04,13
Boston,2025-11-04,10
Chicago,2025-11-04,6
Denver,2025-11-04,12
San Francisco,2025-11-04,19
Seattle,2025-11-04,11
New York,2025-11-05,16
Boston,2025-11-05,12
Chicago,2025-11-05,8
Denver,2025-11-05,13
San Francisco,2025-11-05,18
Seattle,2025-11-05,12
New York,2025-11-06,15
Boston,2025-11-06,11
Chicago,2025-11-06,7
Denver,2025-11-06,14
San Francisco,2025-11-06,17
Seattle,2025-11-06,13
New York,2025-11-07,14
Boston,2025-11-07,10
Chicago,2025-11-07,6
Denver,2025-11-07,12
San Francisco,2025-11-07,18
Seattle,2025-11-07,11
New York,2025-11-08,13
Boston,2025-11-08,9
Chicago,2025-11-08,5
Denver,2025-11-08,13
San Francisco,2025-11-08,17
Seattle,2025-11-08,12
New York,2025-11-09,15
Boston,2025-11-09,11
Chicago,2025-11-09,7
Denver,2025-11-09,14
San Francisco,2025-11-09,18
Seattle,2025-11-09,13
""")


In [6]:
def parse_csv_line(line):
    """Parse one line of the weather CSV into a ``(city, temperature)`` pair.

    Written for ``beam.FlatMap``: this is a generator that yields exactly one
    tuple for a valid data row and yields nothing otherwise. It yields nothing
    for blank lines, the header row, rows without exactly three fields, and
    rows whose temperature is non-numeric or non-finite (``nan``/``inf``).

    Args:
        line: One text line, e.g. as emitted by ``beam.io.ReadFromText``.

    Yields:
        ``(city, temperature)`` with ``temperature`` as a finite ``float``.
    """
    line = line.strip()
    if not line:
        return
    try:
        # csv.reader (rather than str.split) honours quoted fields such as
        # "Washington, D.C.".
        city, _date, temp = next(csv.reader(StringIO(line)))
        # The header row ("city,date,temperature") fails here because
        # "temperature" is not a number. No name-based check is needed, so
        # real cities whose name starts with "city" are kept.
        temperature = float(temp)
    except (csv.Error, ValueError):
        # Best-effort: drop malformed rows (wrong field count, bad number)
        # but let genuine programming errors surface.
        return
    # NaN/inf would silently poison the per-city mean.
    if not math.isfinite(temperature):
        return
    yield (city, temperature)

# Apache Beam pipeline

In [8]:
# Input glob (every CSV file in weather_data/) and prefix for the output files.
input_pattern = 'weather_data/*.csv'
output_prefix = 'weather_output/avg_temps'

# Compute the average temperature per city.
#
# A mean is not associative, so it must not be passed to CombinePerKey as a
# plain callable. Beam treats such a callable as an associative combiner and may
# apply it to partial buffers of values, then again to those partial results.
# That produces an average of averages once a city has more values than Beam's
# internal buffer holds, or its values span several bundles. The built-in
# Mean combiner keeps a running sum and count and divides only at the end, so
# the result is exact at any data size.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText(input_pattern)
        | 'Parse CSV' >> beam.FlatMap(parse_csv_line)
        | 'Group by city and average' >> beam.combiners.Mean.PerKey()
        | 'Format results' >> beam.MapTuple(
            lambda city, avg_temp: f"{city}: {avg_temp:.1f}°C avg")
        # Force one shard (avg_temps-00000-of-00001) so the inspection cell,
        # which reads the -00000-of-* file, sees every result.
        | 'Write results' >> beam.io.WriteToText(output_prefix, num_shards=1)
    )


In [9]:
# Print every output shard, not just the first. Line order across cities is
# not guaranteed by Beam.
!cat weather_output/avg_temps-*-of-*

New York: 14.2°C avg
Boston: 10.3°C avg
Chicago: 6.3°C avg
Denver: 13.0°C avg
San Francisco: 17.8°C avg
Seattle: 11.7°C avg
