In [12]:
from pymongo import MongoClient

In [13]:
# Connect to local Mongo
client = MongoClient("mongodb://localhost:27017")
db = client["weather"]
obs = db["obs"]

In [14]:
print("Connected to DB:", db.name)

Connected to DB: weather


In [18]:
print(client.list_database_names())  # should list 'weather' if created
print(db.list_collection_names()) 

['admin', 'config', 'local', 'sales_weatherinfo_db', 'weather']
['obs']


In [15]:
import requests
from datetime import datetime, timezone

In [25]:
API_KEY = "cb3b7ff5870bf1836fcc688dc62dcb4f"
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

In [26]:
# Example cities 
CITIES = [
    {"id": 1275339, "name": "Mumbai"},
    {"id": 1273294, "name": "Delhi"},
    {"id": 1275004, "name": "Bengaluru"}
]

In [27]:
def celsius_from_kelvin(k):
    return round(k - 273.15, 2) if k is not None else None

def fetch_city_weather(city):
    params = {"appid": API_KEY, "id": city["id"]}
    r = requests.get(BASE_URL, params=params)
    data = r.json()
    return data

In [28]:
def transform(raw):
    city_info = {
        "id": raw.get("id"),
        "name": raw.get("name"),
        "country": raw.get("sys", {}).get("country"),
        "coord": raw.get("coord")
    }
    obs_time = datetime.fromtimestamp(raw["dt"], tz=timezone.utc)
    
    doc = {
        "city": city_info,
        "obs_time": obs_time,
        "main": {
            "temp_c": celsius_from_kelvin(raw["main"]["temp"]),
            "feels_like_c": celsius_from_kelvin(raw["main"]["feels_like"]),
            "pressure_hpa": raw["main"]["pressure"],
            "humidity_pct": raw["main"]["humidity"],
        },
        "wind": raw.get("wind"),
        "clouds_pct": raw.get("clouds", {}).get("all"),
        "visibility_m": raw.get("visibility"),
        "weather": raw.get("weather"),
        "ingest_time": datetime.now(timezone.utc)
    }
    return doc

In [29]:
from pymongo import errors

In [30]:
# Create unique index if not already
obs.create_index([("city.id", 1), ("obs_time", 1)], unique=True)

def load(doc):
    try:
        obs.update_one(
            {"city.id": doc["city"]["id"], "obs_time": doc["obs_time"]},
            {"$setOnInsert": doc},
            upsert=True
        )
        print(f"Inserted {doc['city']['name']} at {doc['obs_time']}")
    except errors.DuplicateKeyError:
        print("Duplicate skipped.")

In [32]:
for city in CITIES:
    raw = fetch_city_weather(city)
    doc = transform(raw)
    load(doc)


Inserted Mumbai at 2025-09-22 16:49:44+00:00
Inserted Delhi at 2025-09-22 16:56:56+00:00
Inserted Kolkata at 2025-09-22 16:55:10+00:00


In [33]:
pipeline = [
    {"$addFields": {
        "day": {"$dateToString": {"date": "$obs_time", "format": "%Y-%m-%d", "timezone": "UTC"}}
    }},
    {"$group": {
        "_id": {"city_id": "$city.id", "city_name": "$city.name", "day": "$day"},
        "min_temp": {"$min": "$main.temp_c"},
        "max_temp": {"$max": "$main.temp_c"},
        "avg_temp": {"$avg": "$main.temp_c"},
        "samples": {"$sum": 1}
    }},
    {"$sort": {"_id.city_id": 1, "_id.day": -1}}
]

results = list(obs.aggregate(pipeline))
results

[{'_id': {'city_id': 1273294, 'city_name': 'Delhi', 'day': '2025-09-22'},
  'min_temp': 31.05,
  'max_temp': 31.05,
  'avg_temp': 31.05,
  'samples': 1},
 {'_id': {'city_id': 1275004, 'city_name': 'Kolkata', 'day': '2025-09-22'},
  'min_temp': 27.97,
  'max_temp': 27.97,
  'avg_temp': 27.97,
  'samples': 1},
 {'_id': {'city_id': 1275339, 'city_name': 'Mumbai', 'day': '2025-09-22'},
  'min_temp': 27.99,
  'max_temp': 27.99,
  'avg_temp': 27.99,
  'samples': 1}]

In [None]:
import time

while True:
    for city in CITIES:
        raw = fetch_city_weather(city)
        doc = transform(raw)
        load(doc)
    print("ETL cycle complete, sleeping 30 min...")
    time.sleep(1800)  # 30 minutes


Inserted Mumbai at 2025-09-22 16:49:44+00:00
Inserted Delhi at 2025-09-22 16:56:56+00:00
Inserted Kolkata at 2025-09-22 16:55:10+00:00
ETL cycle complete, sleeping 30 min...
