In [5]:
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()

DB_CONFIG = {
    "host": os.getenv("DB_HOST"),
    "dbname": os.getenv("DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "port": os.getenv("DB_PORT", 5432)
}

def create_weather_table(cursor):
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS weather_data (
            id SERIAL PRIMARY KEY,
            timestamp TIMESTAMPTZ,
            city TEXT,
            description TEXT,
            temperature REAL,
            humidity INTEGER,
            pressure INTEGER,
            wind_speed REAL
        )
    ''')

def insert_weather_data(cursor, weather):
    cursor.execute('''
        INSERT INTO weather_data (
            timestamp, city, description, temperature,
            humidity, pressure, wind_speed
        ) VALUES (%s, %s, %s, %s, %s, %s, %s)
    ''', (
        weather["timestamp"],
        weather["city"],
        weather["description"],
        weather["temperature"],
        weather["humidity"],
        weather["pressure"],
        weather["wind_speed"]
    ))

def load_to_db(weather):
    if not weather:
        print("[INFO] No weather data to load.")
        return

    try:
        with psycopg2.connect(**DB_CONFIG) as conn:
            with conn.cursor() as cur:
                create_weather_table(cur)
                insert_weather_data(cur, weather)
        print("[INFO] Weather data loaded.")
    except psycopg2.DatabaseError as e:
        print(f"[ERROR] DB error: {e}")


In [3]:
import os
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
import requests
import psycopg2
from psycopg2.extras import execute_batch
from psycopg2 import pool
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Global connection pool
DB_POOL = None

# --- Configuration ---
class Config:
    """Configuration class to centralize settings"""
    # API settings
    OPENWEATHER_API_KEY = os.getenv("OPENWEATHER_API_KEY")
    CITIES = os.getenv("CITIES", "Nairobi,London,New York,Tokyo").split(",")
    WEATHER_UNITS = os.getenv("WEATHER_UNITS", "metric")
    API_TIMEOUT = int(os.getenv("API_TIMEOUT", "10"))
    
    # Database settings  
    DB_CONFIG = {
        "host": os.getenv("DB_HOST", "localhost"),
        "dbname": os.getenv("DB_NAME", "weather_db"),
        "user": os.getenv("DB_USER", "postgres"),
        "password": os.getenv("DB_PASSWORD", ""),
        "port": int(os.getenv("DB_PORT", "5432"))
    }
    
    # Email settings
    EMAIL_ENABLED = os.getenv("EMAIL_ENABLED", "false").lower() == "true"
    EMAIL_SENDER = os.getenv("EMAIL_SENDER", "")
    EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "")
    EMAIL_RECIPIENT = os.getenv("EMAIL_RECIPIENT", "")
    SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com")
    SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
    
    # Schedule settings
    SCHEDULE_INTERVAL_HOURS = int(os.getenv("SCHEDULE_INTERVAL_HOURS", "24"))

# --- Database Functions ---
def initialize_db():
    """Initialize database connection pool and create tables if needed"""
    global DB_POOL
    
    try:
        # Create connection pool
        DB_POOL = pool.SimpleConnectionPool(
            1, 10, **Config.DB_CONFIG
        )
        
        # Create tables if they don't exist
        with DB_POOL.getconn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS weather_data (
                        id SERIAL PRIMARY KEY,
                        timestamp TIMESTAMPTZ,
                        city TEXT,
                        description TEXT,
                        temperature REAL,
                        feels_like REAL,
                        humidity INTEGER,
                        pressure INTEGER,
                        wind_speed REAL,
                        visibility INTEGER,
                        sunrise TIMESTAMPTZ,
                        sunset TIMESTAMPTZ,
                        latitude REAL,
                        longitude REAL
                    )
                """)
                
                # Create index on city and timestamp for faster queries
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_weather_city_timestamp 
                    ON weather_data (city, timestamp)
                """)
                
            conn.commit()
            DB_POOL.putconn(conn)
        
        logger.info("Database initialized successfully")
        return True
    except Exception as e:
        logger.error(f"Database initialization error: {e}")
        return False

# --- Tasks ---
@task(
    retries=3,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def extract_weather_data(city: str) -> Optional[Dict[str, Any]]:
    """Fetch weather data from OpenWeather API
    
    Args:
        city: Name of the city to get weather data for
        
    Returns:
        Dictionary containing weather data or None if retrieval failed
    """
    logger = get_run_logger()
    
    if not Config.OPENWEATHER_API_KEY:
        logger.error("OpenWeather API key not set")
        return None
        
    url = (
        f"https://api.openweathermap.org/data/2.5/weather"
        f"?q={city}&appid={Config.OPENWEATHER_API_KEY}&units={Config.WEATHER_UNITS}"
    )
    
    try:
        logger.info(f"Fetching data for {city}")
        response = requests.get(url, timeout=Config.API_TIMEOUT)
        
        # Handle API errors
        if response.status_code == 404:
            logger.error(f"City not found: {city}")
            return None
        elif response.status_code == 429:
            logger.error("API rate limit exceeded")
            # Wait longer before retry
            raise requests.exceptions.RequestException("Rate limit exceeded")
        elif response.status_code != 200:
            logger.error(f"API error: {response.status_code} - {response.text}")
            return None
            
        data = response.json()
        
        # Extract data with validation
        current_time = datetime.now(timezone.utc)
        
        # Sunrise and sunset times from UTC timestamps
        sunrise = datetime.fromtimestamp(data["sys"].get("sunrise", 0), tz=timezone.utc)
        sunset = datetime.fromtimestamp(data["sys"].get("sunset", 0), tz=timezone.utc)
        
        return {
            "timestamp": current_time,
            "city": data.get("name", city),
            "description": data["weather"][0].get("description", "Unknown"),
            "temperature": data["main"].get("temp"),
            "feels_like": data["main"].get("feels_like"),
            "humidity": data["main"].get("humidity", 0),
            "pressure": data["main"].get("pressure", 0),
            "wind_speed": data["wind"].get("speed", 0),
            "visibility": data.get("visibility", 0),
            "sunrise": sunrise,
            "sunset": sunset,
            "latitude": data["coord"].get("lat", 0),
            "longitude": data["coord"].get("lon", 0)
        }
    except requests.exceptions.Timeout:
        logger.error(f"Request timed out for {city}")
        return None
    except requests.exceptions.ConnectionError:
        logger.error(f"Connection error when fetching data for {city}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error for {city}: {e}")
        return None
    except KeyError as e:
        logger.error(f"Parsing error for {city}: Missing key {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error extracting data for {city}: {e}")
        return None

@task(retries=2, retry_delay_seconds=15)
def load_to_db(weather_data_list: List[Dict[str, Any]]) -> bool:
    """Load weather data into PostgreSQL
    
    Args:
        weather_data_list: List of weather data dictionaries
        
    Returns:
        Boolean indicating success or failure
    """
    logger = get_run_logger()
    
    # Filter out None values (failed extractions)
    valid_data = [data for data in weather_data_list if data is not None]
    
    if not valid_data:
        logger.warning("No valid data to load")
        return False
    
    try:
        conn = DB_POOL.getconn()
        with conn.cursor() as cur:
            # Use batch insert for efficiency
            query = """
                INSERT INTO weather_data (
                    timestamp, city, description, temperature, feels_like,
                    humidity, pressure, wind_speed, visibility, 
                    sunrise, sunset, latitude, longitude
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
                )
                ON CONFLICT DO NOTHING
            """
            
            data_tuples = [
                (
                    data["timestamp"],
                    data["city"],
                    data["description"],
                    data["temperature"],
                    data["feels_like"],
                    data["humidity"],
                    data["pressure"],
                    data["wind_speed"],
                    data["visibility"],
                    data["sunrise"],
                    data["sunset"],
                    data["latitude"],
                    data["longitude"]
                )
                for data in valid_data
            ]
            
            execute_batch(cur, query, data_tuples)
            
        conn.commit()
        DB_POOL.putconn(conn)
        logger.info(f"Successfully loaded {len(valid_data)} records")
        return True
    except Exception as e:
        logger.error(f"Database error: {e}")
        # Try to return connection to pool if it was acquired
        if 'conn' in locals():
            DB_POOL.putconn(conn)
        return False

@task
def generate_weather_report() -> Optional[str]:
    """Generate weekly weather report with statistics
    
    Returns:
        String containing HTML report or None if generation failed
    """
    logger = get_run_logger()
    
    # Skip if not Sunday and not in debug mode
    today = datetime.now()
    is_sunday = today.weekday() == 6
    debug_mode = os.getenv("DEBUG_REPORT", "false").lower() == "true"
    
    if not is_sunday and not debug_mode:
        logger.info("Skipping report (not Sunday)")
        return None
    
    try:
        conn = DB_POOL.getconn()
        with conn.cursor() as cur:
            # Get data for the past week
            one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)
            
            # Query average temperatures by city for the past week
            cur.execute("""
                SELECT 
                    city, 
                    AVG(temperature) as avg_temp,
                    MIN(temperature) as min_temp,
                    MAX(temperature) as max_temp,
                    AVG(humidity) as avg_humidity
                FROM weather_data
                WHERE timestamp > %s
                GROUP BY city
                ORDER BY city
            """, (one_week_ago,))
            
            results = cur.fetchall()
            
        DB_POOL.putconn(conn)
        
        if not results:
            logger.warning("No data for report")
            return None
            
        # Generate HTML report
        html = f"""
        <html>
        <head>
            <style>
                body {{ font-family: Arial, sans-serif; }}
                table {{ border-collapse: collapse; width: 100%; }}
                th, td {{ padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }}
                th {{ background-color: #f2f2f2; }}
                .header {{ background-color: #4CAF50; color: white; padding: 15px; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>Weekly Weather Report</h1>
                <p>Period: {one_week_ago.strftime('%Y-%m-%d')} to {today.strftime('%Y-%m-%d')}</p>
            </div>
            
            <h2>Temperature Summary</h2>
            <table>
                <tr>
                    <th>City</th>
                    <th>Avg Temp (°C)</th>
                    <th>Min Temp (°C)</th>
                    <th>Max Temp (°C)</th>
                    <th>Avg Humidity (%)</th>
                </tr>
        """
        
        for city, avg_temp, min_temp, max_temp, avg_humidity in results:
            html += f"""
                <tr>
                    <td>{city}</td>
                    <td>{avg_temp:.1f}</td>
                    <td>{min_temp:.1f}</td>
                    <td>{max_temp:.1f}</td>
                    <td>{avg_humidity:.0f}</td>
                </tr>
            """
            
        html += """
            </table>
            <p>This report was automatically generated by the Weather ETL Pipeline.</p>
        </body>
        </html>
        """
        
        return html
    except Exception as e:
        logger.error(f"Report generation error: {e}")
        # Try to return connection to pool if it was acquired
        if 'conn' in locals():
            DB_POOL.putconn(conn)
        return None

@task
def send_email_report(report_html: Optional[str]) -> bool:
    """Send email with weather report
    
    Args:
        report_html: HTML content of the report
        
    Returns:
        Boolean indicating success or failure
    """
    logger = get_run_logger()
    
    if not report_html:
        logger.info("No report to send")
        return False
        
    if not Config.EMAIL_ENABLED:
        logger.info("Email sending disabled")
        return False
        
    if not all([Config.EMAIL_SENDER, Config.EMAIL_PASSWORD, Config.EMAIL_RECIPIENT]):
        logger.error("Email configuration incomplete")
        return False
    
    try:
        msg = MIMEMultipart()
        msg['From'] = Config.EMAIL_SENDER
        msg['To'] = Config.EMAIL_RECIPIENT
        msg['Subject'] = f"Weekly Weather Report - {datetime.now().strftime('%Y-%m-%d')}"
        
        msg.attach(MIMEText(report_html, 'html'))
        
        with smtplib.SMTP(Config.SMTP_SERVER, Config.SMTP_PORT) as server:
            server.starttls()
            server.login(Config.EMAIL_SENDER, Config.EMAIL_PASSWORD)
            server.send_message(msg)
            
        logger.info("Email report sent successfully")
        return True
    except Exception as e:
        logger.error(f"Email sending failed: {e}")
        return False

# --- Main Flow ---
@flow(name="Weather Data Pipeline")
def weather_pipeline():
    """Main flow for weather data collection and reporting"""
    logger = get_run_logger()
    logger.info("Starting weather pipeline")
    
    # Initialize database
    if not initialize_db():
        logger.error("Database initialization failed, aborting pipeline")
        return
    
    # 1. Extract data for all cities in parallel
    weather_data_list = []
    for city in Config.CITIES:
        result = extract_weather_data(city)
        if result:
            weather_data_list.append(result)
    
    # 2. Load data to database
    load_success = load_to_db(weather_data_list)
    
    # 3. Generate and send weekly report
    if load_success:
        report_html = generate_weather_report()
        if report_html:
            send_email_report(report_html)
    
    logger.info("Weather pipeline completed")

# --- Run the flow ---
if __name__ == "__main__":
    weather_pipeline()

# --- Create deployment (uncomment to deploy) ---
if __name__ == "__main__":
#     # For Prefect 2.x deployments, use the CLI or the following:
#     # Import needed only if deploying
    from prefect.deployments import Deployment
    from prefect.orion.schemas.schedules import IntervalSchedule
    
    deployment = Deployment.build_from_flow(
        flow=weather_pipeline,
        name="scheduled-weather-pipeline",
        schedule=IntervalSchedule(interval=timedelta(hours=Config.SCHEDULE_INTERVAL_HOURS)),
        work_queue_name="default"
    )
    deployment.apply()

#     # For newer Prefect versions (preferred method):
#     # weather_pipeline.serve(
#     #     name="scheduled-weather-pipeline",
#     #     interval=timedelta(hours=Config.SCHEDULE_INTERVAL_HOURS)
#     # )

PrefectImportError: `prefect.deployments:Deployment` has been removed. Use `flow.serve()`, `flow.deploy()`, or `prefect deploy` instead.