In [2]:
import dlt
import requests

# City name -> (latitude, longitude) in decimal degrees.
# Keys are used verbatim as the "city" value attached to each fetched record.
CITIES = dict(
    paris=(48.8566, 2.3522),
    new_york=(40.7128, -74.0060),
    tokyo=(35.6762, 139.6503),
)

def fetch_weather():
    """Yield the current-weather API response for every city in ``CITIES``.

    One GET request is sent to the Open-Meteo forecast endpoint per city. The
    decoded JSON body is yielded as-is, with the city's key from ``CITIES``
    added under ``"city"`` so rows can be told apart after loading.

    Yields:
        dict: The decoded JSON response plus a ``"city"`` entry.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not respond within the timeout.
    """
    for city, (lat, lon) in CITIES.items():
        url = (
            "https://api.open-meteo.com/v1/forecast"
            f"?latitude={lat}&longitude={lon}&current_weather=true"
        )
        # Bound the wait so one stalled connection cannot hang the whole run.
        resp = requests.get(url, timeout=30)
        # Fail loudly on an error status instead of yielding the error body
        # into the destination table as if it were weather data.
        resp.raise_for_status()
        data = resp.json()
        data["city"] = city
        yield data

# Define the pipeline. Only the pipeline name, destination type and dataset
# name are given here; no connection settings appear in this cell, so they are
# presumably resolved by dlt from its own configuration (confirm).
pipeline = dlt.pipeline(
    pipeline_name="weather_pipeline",
    destination="postgres",
    dataset_name="weather_data"
)

# Run the pipeline: feed the fetch_weather() generator into the
# "current_weather" table. write_disposition="replace" asks dlt to replace the
# table's existing data on each run rather than append to it.
load_info = pipeline.run(
    fetch_weather(),
    table_name="current_weather",
    write_disposition="replace"
)

# Print the load summary returned by pipeline.run().
print(load_info)

Pipeline weather_pipeline load step completed in 0.12 seconds
1 load package(s) were loaded to destination postgres and into dataset weather_data
The postgres destination used postgresql://postgres:***@localhost:5432/sandbox location to store data
Load package 1749319631.5142121 is LOADED and contains no failed jobs


In [None]:
import dlt
import requests

# City name -> (latitude, longitude) in decimal degrees.
# NOTE: this rebinds CITIES from the first cell, and the Tokyo coordinates
# here differ from the ones used there.
CITIES = dict(
    paris=(48.8566, 2.3522),
    new_york=(40.7128, -74.0060),
    tokyo=(35.6895, 139.6917),
)

def all_rain_data(start_year=2010, end_year=2011):
    """Yield one daily precipitation record per city and day.

    For every city in ``CITIES`` and every year in
    ``range(start_year, end_year)``, a single request covering January 1st to
    December 31st of that year is sent to the Open-Meteo archive endpoint, and
    the parallel daily series in the response are flattened into one dict per
    day.

    Args:
        start_year: First year to fetch (inclusive).
        end_year: Year to stop at (exclusive); the defaults fetch 2010 only.

    Yields:
        dict: ``date``, ``city``, ``precipitation_sum`` and ``rain_sum`` for
        a single day.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not respond within the timeout.
    """
    for city_name, (lat, lon) in CITIES.items():
        for year in range(start_year, end_year):
            start_date = f"{year}-01-01"
            end_date = f"{year}-12-31"

            url = (
                "https://archive-api.open-meteo.com/v1/archive"
                f"?latitude={lat}&longitude={lon}"
                f"&start_date={start_date}&end_date={end_date}"
                "&daily=precipitation_sum,rain_sum"
                "&timezone=UTC"
            )

            print(f"Fetching {city_name} - {year}")
            print(url)
            # Bound the wait: the backfill issues one request per city and
            # year, so a single stalled connection would otherwise block it.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            daily = response.json()["daily"]

            # The three series are indexed in parallel, one entry per day.
            for i, date in enumerate(daily["time"]):
                yield {
                    "date": date,
                    "city": city_name,
                    "precipitation_sum": daily["precipitation_sum"][i],
                    "rain_sum": daily["rain_sum"][i]
                }

@dlt.source
def open_meteo_source(start_year=2010, end_year=2011):
    """dlt source exposing the historical rain data as a single resource.

    Args:
        start_year: First year to fetch (inclusive), forwarded to
            ``all_rain_data``.
        end_year: Year to stop at (exclusive), forwarded to ``all_rain_data``.

    Yields:
        A dlt resource named ``"all_rain_data"`` wrapping the record generator.
    """
    # dlt documents sources as returning/yielding resources, so wrap the plain
    # generator explicitly; this also pins the resource name instead of
    # leaving it to be inferred.
    yield dlt.resource(
        all_rain_data(start_year=start_year, end_year=end_year),
        name="all_rain_data",
    )

if __name__ == "__main__":
    # Historical backfill pipeline; it loads into the "weather_data" dataset
    # on the postgres destination.
    pipeline = dlt.pipeline(
        pipeline_name="historical_weather_pipeline",
        destination="postgres",
        dataset_name="weather_data"
    )

    # Merge on (date, city) so re-running the backfill updates existing days
    # instead of inserting duplicates. end_year is exclusive, so this requests
    # the years 2010 through 2024.
    load_info = pipeline.run(
        open_meteo_source(start_year=2010, end_year=2025),
        write_disposition="merge",
        primary_key=["date", "city"],
    )

    print(load_info)


Fetching paris - 2010
https://archive-api.open-meteo.com/v1/archive?latitude=48.8566&longitude=2.3522&start_date=2010-01-01&end_date=2010-12-31&daily=precipitation_sum,rain_sum&timezone=UTC
Fetching paris - 2011
https://archive-api.open-meteo.com/v1/archive?latitude=48.8566&longitude=2.3522&start_date=2011-01-01&end_date=2011-12-31&daily=precipitation_sum,rain_sum&timezone=UTC
Fetching paris - 2012
https://archive-api.open-meteo.com/v1/archive?latitude=48.8566&longitude=2.3522&start_date=2012-01-01&end_date=2012-12-31&daily=precipitation_sum,rain_sum&timezone=UTC
Fetching paris - 2013
https://archive-api.open-meteo.com/v1/archive?latitude=48.8566&longitude=2.3522&start_date=2013-01-01&end_date=2013-12-31&daily=precipitation_sum,rain_sum&timezone=UTC
Fetching paris - 2014
https://archive-api.open-meteo.com/v1/archive?latitude=48.8566&longitude=2.3522&start_date=2014-01-01&end_date=2014-12-31&daily=precipitation_sum,rain_sum&timezone=UTC
Fetching paris - 2015
https://archive-api.open-met