In [25]:
import os
import time
import logging
from datetime import datetime, timezone

import requests
import psycopg2
import schedule
from dotenv import load_dotenv
import pandas as pd
import json



In [26]:
# --- CONFIGURATION ---
# Load environment variables from .env file
load_dotenv()

# Set up basic logging (configured once here; later cells only call logging.*)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load credentials and settings from environment variables
OPENWEATHER_API_KEY = os.getenv("OPENWEATHER_API_KEY")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
DB_HOST = os.getenv("DB_HOST") # This will be 'db' from docker-compose

if not OPENWEATHER_API_KEY:
    # Without a key every API call fails; say so once up front instead of
    # discovering it through hundreds of per-city request errors.
    logging.warning("OPENWEATHER_API_KEY is not set; weather API requests will fail.")

# City catalogue file and the country to monitor (ISO 3166 alpha-2 code).
CITY_LIST_PATH = "city.list.json"
COUNTRY_CODE = "NG"

# Load the full city catalogue (a JSON list of city records).
with open(CITY_LIST_PATH, "r", encoding="utf-8") as city_file:
    cities = json.load(city_file)

# Keep only cities in the monitored country; .get() tolerates records that
# lack a "country" field instead of aborting the whole cell with KeyError.
nigeria_cities = [c for c in cities if c.get("country") == COUNTRY_CODE]

# Unique city names in first-seen order. Queries are made by name, so a
# duplicated name would only repeat an identical API call and add a duplicate row.
nigeria_city_names = list(dict.fromkeys(c["name"] for c in nigeria_cities))

# Print some details
print(f"Total Nigerian cities found: {len(nigeria_city_names)}")
print(nigeria_city_names[:30])  # print first 30 cities for preview


Total Nigerian cities found: 521
['Zuru', 'Zungeru', 'Zaria', 'Zango', 'Zalanga', 'Zaki Biam', 'Zadawa', 'Yuli', 'Yola', 'Yenagoa', 'Yelwa', 'Yashikira', 'Yandev', 'Yanda Bayo', 'Yamrat', 'Yajiwa', 'Wuyo', 'Wusasa', 'Wurno', 'Wukari', 'Wudil', 'Wawa', 'Wauro Jabbe', 'Wasagu', 'Warri', 'Wamba', 'Walmayo', 'Wagini', 'Vom', 'Uyo']


In [5]:
def fetch_weather(city: str, country_code: str | None = None) -> dict | None:
    """Fetch current weather for one city from the OpenWeatherMap API.

    Args:
        city: City name, sent as the API's ``q`` parameter.
        country_code: Optional ISO 3166 country code (e.g. ``"NG"``). When
            given, the query becomes ``"<city>,<country_code>"`` so a name
            shared by cities in several countries resolves to the intended one.

    Returns:
        The decoded JSON body on success. Returns ``None`` on any
        ``requests`` failure (connection error, timeout, non-2xx status,
        undecodable body); the failure is logged. No ``units`` parameter is
        sent, so values come back in the API's default units, which the
        processing step treats as Kelvin.
    """
    query = f"{city},{country_code}" if country_code else city
    try:
        # Pass the query via ``params`` so requests URL-encodes the city name
        # (spaces, '&', non-ASCII) rather than splicing it into the URL.
        # HTTPS keeps the API key off the wire in plaintext.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": query, "appid": OPENWEATHER_API_KEY},
            timeout=10,
        )
        response.raise_for_status()  # Raises an HTTPError for 4xx/5xx responses
        return response.json()
    except requests.exceptions.RequestException as e:
        # requests puts the full request URL (including ``appid=<key>``) into
        # its error messages; redact the key before it reaches the logs.
        message = str(e)
        if OPENWEATHER_API_KEY:
            message = message.replace(OPENWEATHER_API_KEY, "***")
        logging.error(f"API request failed for {city}: {message}")
        return None

def process_weather_data(city: str, data: dict) -> dict | None:
    """Processes raw JSON data into a structured dictionary."""
    if not data:
        return None
    try:
        processed = {
            "city_name": city,
            # Convert temperature from Kelvin to Celsius
            "temperature": round(data['main']['temp'] - 273.15, 2),
            "humidity": data['main']['humidity'],
            "pressure": data['main']['pressure'],
            "wind_speed": data['wind']['speed'],
            "weather_main": data['weather'][0]['main'],
            "weather_desc": data['weather'][0]['description'],
            # Convert Unix timestamp to a timezone-aware datetime object
            "reading_timestamp": datetime.fromtimestamp(data['dt'], tz=timezone.utc)
        }
        return processed
    except (KeyError, IndexError) as e:
        logging.error(f"Error processing data for {city}. Missing key: {e}")
        return None


In [None]:
import logging
import pandas as pd
import os

def weather_pipeline_job():
    """Main ETL process for all Nigerian cities."""
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    logging.info("Starting weather data pipeline job...")

    # Ensure data folder exists
    os.makedirs("./data", exist_ok=True)

    all_raw_data = []
    all_processed_data = []

    for city in nigeria_city_names:
        logging.info(f"Fetching weather data for {city}...")

        try:
            raw_data = fetch_weather(city)

            # Skip empty or failed responses
            if not raw_data:
                logging.warning(f"No data returned for {city}, skipping.")
                continue

            # Normalize raw API data safely
            pd_raw_data = pd.json_normalize(raw_data)
            pd_raw_data["city"] = city
            all_raw_data.append(pd_raw_data)

            # Processed data (assumes your function returns dict or list of dicts)
            processed_data = process_weather_data(city, raw_data)
            pd_processed_data = pd.DataFrame([processed_data])
            pd_processed_data["city"] = city
            all_processed_data.append(pd_processed_data)

        except Exception as e:
            logging.error(f"Error processing {city}: {e}")

    # Combine and save once after the loop
    if all_raw_data:
        df_raw = pd.concat(all_raw_data, ignore_index=True)
        df_raw.to_csv("./data/raw.csv", index=False)
        logging.info(f"Saved raw data for {len(df_raw)} cities.")

    if all_processed_data:
        df_processed = pd.concat(all_processed_data, ignore_index=True)
        df_processed.to_csv("./data/processed.csv", index=False)
        logging.info(f"Saved processed data for {len(df_processed)} cities.")

    logging.info("Weather data pipeline job finished.")



In [28]:
# Run the ETL once over every city in nigeria_city_names; results are written
# to ./data/raw.csv and ./data/processed.csv.
weather_pipeline_job()

2025-10-17 12:23:28,348 - INFO - Starting weather data pipeline job...
2025-10-17 12:26:26,931 - INFO - Weather data pipeline job finished.
