In [None]:
#!/usr/bin/env python3
"""
combined_weather_etl_optionB.py

Combines Wunderground scraping + METAR (aviationweather.gov) cache polling.

Behaviour (Option B):
 - Scheduled at :25 and :55 every hour.
 - At scheduled times: high-frequency polling (every 10s) of METAR until expected report arrives (or timeout).
 - Background checks every BACKGROUND_POLL_SECONDS update Wunderground values in memory.
 - Any Wunderground-only change will trigger a DB insert (so database reflects Wunderground changes even between METAR reports).
 - METAR rows (when found) are still authoritative for report_time; when available METAR report_time is used as server_timestamp_parsed.
 - Each DB insert prints the latest rows (n=5 by default) to let you monitor progress.

Notes about deprecation warning:
 - BeautifulSoup `.find(..., text=...)` usage was replaced with `.find_all(string=...)` and other `string=` usages to avoid the DeprecationWarning.

Author: refactor for user's pipeline
"""

from __future__ import annotations
import time
import gzip
import io
import logging
import re
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, Tuple, List

import requests
from bs4 import BeautifulSoup
import pandas as pd

try:
    from zoneinfo import ZoneInfo
except Exception:
    ZoneInfo = None

# ---------------- CONFIG ----------------
# Wunderground page that is scraped for current / high / low temperatures.
WUNDER_URL = "https://www.wunderground.com/weather/gb/london/EGLC"
# Headers sent with every Wunderground request.
WUNDER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Accept-Language": "en-US,en;q=0.9",
}
# Gzipped CSV cache of METAR reports; it is scanned for the EGLC row.
METAR_CACHE_URL = "https://aviationweather.gov/data/cache/metars.cache.csv.gz"
# User-Agent sent with METAR cache requests (the contact part is a placeholder).
METAR_USER_AGENT = "eglc-metar-scheduler/1.0 (+your_email_or_contact)"

# SQLite file that holds the weather_combined table.
SQLITE_DB = "combined_weather_optionB.db"

# Time zone in which the polling schedule is evaluated.
SCHEDULE_TZ = "Europe/London"
# Minutes past each hour at which high-frequency METAR polling is started.
TARGET_MINUTES = [25, 55]

# Upper bound on the sleep between two background Wunderground checks.
BACKGROUND_POLL_SECONDS = 300  # 5 minutes
# Pause between two METAR cache fetches during high-frequency polling.
HIGH_FREQ_POLL_SECONDS = 10
# High-frequency polling gives up after this many minutes.
HIGH_FREQ_TIMEOUT_MINUTES = 10

# Rate-limit handling: number of back-off attempts and the first back-off delay
# (the delay doubles with every attempt).
MAX_RETRIES = 5
INITIAL_BACKOFF = 5  # seconds

# Root logging configuration plus the module-wide logger used by every function below.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("combined_etl_optionB")


# ---------------- Time / tz helpers ----------------

def get_zone(tz_name: str):
    """Resolve ``tz_name`` to a tzinfo object.

    UTC is returned instead when the zoneinfo module could not be imported
    or when looking up ``tz_name`` raises.
    """
    if not ZoneInfo:
        return timezone.utc
    try:
        zone = ZoneInfo(tz_name)
    except Exception:
        zone = timezone.utc
    return zone


# tzinfo shared by scheduling and timestamp conversion; this is UTC whenever
# get_zone() had to fall back.
LONDON = get_zone(SCHEDULE_TZ)


def now_in_london() -> datetime:
    """Current time as an aware datetime in the schedule zone (UTC when zoneinfo is unavailable)."""
    tz = LONDON if ZoneInfo else timezone.utc
    return datetime.now(tz)


# ---------------- Wunderground scraping ----------------

def fetch_wunderground_html(timeout=10) -> Optional[str]:
    """
    Download the Wunderground page configured in WUNDER_URL.

    Returns the response body as text. Any failure (network error, non-2xx
    status, ...) is logged as a warning and reported as None.
    """
    try:
        response = requests.get(WUNDER_URL, headers=WUNDER_HEADERS, timeout=timeout)
        response.raise_for_status()
        html = response.text
    except Exception as e:
        logger.warning("Wunderground fetch error: %s", e)
        return None
    return html


def parse_wunder_timestamp(ts_str: str) -> Optional[datetime]:
    """
    Parse Wunderground timestamp strings such as:
      "1:15 PM BST on September 21, 2025"

    The BST/GMT token in the string decides the UTC offset (BST = +1h,
    GMT = +0h), so the returned instant is right even when no time-zone
    database is available. The result is then expressed in LONDON.

    When the string carries no BST/GMT token the wall-clock time is tagged
    with LONDON (or a fixed +1h offset if zoneinfo is missing).

    Returns None when ts_str is empty or cannot be parsed.
    """
    if not ts_str:
        return None
    try:
        zone_token = re.search(r"\b(BST|GMT)\b", ts_str)
        # Drop the zone token and the standalone word "on"; word boundaries keep
        # the substitution from touching letters inside other words.
        ts_clean = re.sub(r"\b(?:BST|GMT|on)\b", " ", ts_str).strip()
        parsed = datetime.strptime(ts_clean, "%I:%M %p %B %d, %Y")
        if zone_token:
            hours = 1 if zone_token.group(1) == "BST" else 0
            fixed = timezone(timedelta(hours=hours))
            return parsed.replace(tzinfo=fixed).astimezone(LONDON)
        if ZoneInfo:
            # no token: let the zone rules pick BST/GMT for that date
            return parsed.replace(tzinfo=LONDON)
        # no token and no zoneinfo: assume +1 hour (BST)
        return parsed.replace(tzinfo=timezone(timedelta(hours=1)))
    except (TypeError, ValueError, OverflowError):
        logger.debug("Failed to parse Wunderground timestamp: %s", ts_str)
        return None


def extract_wunder_data(html: str) -> Dict[str, Any]:
    """
    Pull the temperature readings and the server timestamp out of a
    Wunderground page.

    The returned dict always has these keys (values are None when not found):
      - wunder_current_temp            text of the first matching "current" selector
      - wunder_high_temp               text of a matching "high" selector
      - wunder_low_temp                text of a matching "low" selector
      - wunder_server_timestamp_raw    first text fragment matching the timestamp pattern
      - wunder_server_timestamp_parsed that fragment run through parse_wunder_timestamp
    """
    soup = BeautifulSoup(html, "html.parser")

    def text_of(css: str) -> Optional[str]:
        """Stripped text of the first element matching ``css``, or None if absent/empty."""
        node = soup.select_one(css)
        if not node:
            return None
        return node.get_text(strip=True) or None

    # Current temperature: several selectors are tried because the markup varies.
    current_sel_candidates = [
        ".current-temp .wu-value",
        "div.current-temp span.wu-value",
        "span.wu-value.wu-value-to",
        "span.current-temp",
        "div.current-temp",
        "span.wx-value",
        "p.current-temp",
    ]
    current_temp = next(
        (found for found in map(text_of, current_sel_candidates) if found),
        None,
    )

    # High / low: later selector pairs may overwrite an earlier partial hit;
    # the search stops as soon as both values are known.
    high_low_selectors = [
        (".hi-lo .hi", ".hi-lo .low"),
        (".temperature-hi", ".temperature-low"),
        (".wx-temperature .wx-value.high", ".wx-temperature .wx-value.low"),
    ]
    high_temp = None
    low_temp = None
    for high_sel, low_sel in high_low_selectors:
        high_temp = text_of(high_sel) or high_temp
        low_temp = text_of(low_sel) or low_temp
        if high_temp and low_temp:
            break

    # Server timestamp: scan every text node for the first pattern match.
    timestamp_pattern = re.compile(r"\d{1,2}:\d{2} [AP]M (?:BST|GMT) on [A-Za-z]+ \d{1,2}, \d{4}")
    searches = (
        timestamp_pattern.search(piece)
        for piece in soup.find_all(string=True)
        if piece and isinstance(piece, str)
    )
    hit = next((m for m in searches if m), None)
    server_ts_text = hit.group() if hit else None

    parsed = parse_wunder_timestamp(server_ts_text) if server_ts_text else None

    return {
        "wunder_current_temp": current_temp,
        "wunder_high_temp": high_temp,
        "wunder_low_temp": low_temp,
        "wunder_server_timestamp_raw": server_ts_text,
        "wunder_server_timestamp_parsed": parsed,
    }


# ---------------- METAR cache parsing ----------------

def parse_gz_csv_bytes(gz_bytes: bytes):
    """
    Yield the rows of a gzipped CSV document as dicts mapping header->value.

    The first CSV record is taken as the header. Fields are split with the
    csv module, so quoted values may contain commas. Records shorter than the
    header are padded with "" and surplus values are ignored. Undecodable
    bytes are replaced rather than raising.
    """
    import csv  # local import: csv is not among this module's top-level imports

    with gzip.GzipFile(fileobj=io.BytesIO(gz_bytes), mode="rb") as gf:
        text = gf.read().decode("utf-8", errors="replace")

    # newline="" lets the csv module do its own line-ending handling
    reader = csv.reader(io.StringIO(text, newline=""))
    header = next(reader, None)
    if header is None:
        return
    for vals in reader:
        yield {
            name: (vals[i] if i < len(vals) else "")
            for i, name in enumerate(header)
        }


class RateLimitException(Exception):
    """Signals that the METAR cache answered with HTTP 429 (Too Many Requests)."""


def fetch_metar_cache(timeout=30) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """
    Fetch the gzipped METAR cache and look for the EGLC row.

    Returns (row, status_code):
      - row is the EGLC row dict, or None when the station is absent or the
        fetch/parse failed;
      - status_code is the HTTP status, or None when no response was obtained.

    Raises RateLimitException on HTTP 429 so the caller can back off. Every
    other error is logged as a warning and reported through the return value.
    """
    headers = {"User-Agent": METAR_USER_AGENT}
    try:
        r = requests.get(METAR_CACHE_URL, headers=headers, timeout=timeout)
        status_code = r.status_code
        if status_code == 429:
            raise RateLimitException("429 Too Many Requests from METAR cache")
        r.raise_for_status()
        for row in parse_gz_csv_bytes(r.content):
            # the station column name differs between cache formats
            station = (row.get("icaoId") or row.get("station") or row.get("station_id") or row.get("icao") or "").upper()
            if station == "EGLC":
                return row, status_code
        return None, status_code
    except RateLimitException:
        raise
    except Exception as e:
        logger.warning("METAR cache fetch error: %s", e)
        # Do not test the response for truthiness: a requests Response is falsy
        # for 4xx/5xx, which would hand back the Response object instead of
        # its status code.
        response = getattr(e, "response", None)
        return None, getattr(response, "status_code", None)


def extract_time_from_metar_text(raw_text: str) -> Optional[datetime]:
    """
    Extract the ddhhmmZ token from raw METAR text and return it as a tz-aware
    datetime expressed in LONDON.

    The token has no month or year, so they are guessed from the current UTC
    date: a day-of-month greater than today's is taken to belong to the
    previous month.

    Returns None when raw_text is empty, has no ddhhmmZ token, or the token
    does not form a valid date/time.
    """
    if not raw_text:
        return None
    m = re.search(r'\s(\d{2})(\d{2})(\d{2})Z', raw_text)
    if not m:
        return None
    day, hour, minute = (int(part) for part in m.groups())

    now_utc = datetime.now(timezone.utc)
    year, month = now_utc.year, now_utc.month
    if day > now_utc.day:
        # the report is from the previous month (wrapping over the year at January)
        if month == 1:
            year, month = year - 1, 12
        else:
            month -= 1

    try:
        obs_utc = datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
    except ValueError:
        # e.g. day 31 in a 30-day month, or hour/minute out of range
        return None
    # Always convert rather than adding a fixed hour: adding a timedelta to a
    # UTC-tagged value would move the instant itself. LONDON is UTC when no
    # zone data is available, so this stays correct in the fallback case too.
    return obs_utc.astimezone(LONDON)


def normalize_metar_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """
    Reduce a METAR cache row to the fields the pipeline uses.

    Returned keys:
      - raw_text     first non-empty of rawOb / raw_text / rawMetar / raw / METAR, else ""
      - temp_c       float from the temp / temperature column, else taken from the
                     "TT/DD" group of the raw text ("M" prefix = negative), else None
      - report_time  reportTime / report_time column, else the ISO form of the
                     time extracted from the raw text
      - raw_row      the input dict, untouched
    """
    # Raw report text: first populated column wins.
    raw_text = ""
    for key in ("rawOb", "raw_text", "rawMetar", "raw", "METAR"):
        candidate = row.get(key)
        if candidate:
            raw_text = candidate
            break

    # Temperature from the dedicated column, if it holds a number.
    temp_c = None
    temp_val = row.get("temp") or row.get("temperature")
    if temp_val:
        try:
            temp_c = float(temp_val)
        except Exception:
            temp_c = None

    # Otherwise fall back to the temperature/dew-point group such as "12/08" or "M01/M02".
    if temp_c is None and raw_text:
        group = re.search(r'\s(M?\d{1,2})/(M?\d{1,2})\s', raw_text)
        if group:
            token = group.group(1)
            temp_c = -float(token[1:]) if token.startswith("M") else float(token)

    # Report time from the column, else derived from the raw text.
    report_time_iso = row.get("reportTime") or row.get("report_time")
    if not report_time_iso and raw_text:
        derived = extract_time_from_metar_text(raw_text)
        if derived:
            report_time_iso = derived.isoformat()

    return {
        "raw_text": raw_text,
        "temp_c": temp_c,
        "report_time": report_time_iso,
        "raw_row": row
    }


def c_to_f(c: Optional[float]) -> Optional[float]:
    """Celsius -> Fahrenheit, rounded to one decimal place; None is passed through."""
    if c is not None:
        return round(c * 9.0 / 5.0 + 32.0, 1)
    return None


# ---------------- SQLite DB helpers ----------------

def setup_database(conn: sqlite3.Connection):
    """
    Make sure the weather_combined table exists, then commit.

    Columns: id, wunder_current_temp, wunder_high_temp, wunder_low_temp,
    weather_30m_temp_F, weather_30m_temp_C, server_timestamp_raw,
    server_timestamp_parsed, recorded_at. Calling this again on an existing
    database is a no-op.
    """
    ddl = '''
        CREATE TABLE IF NOT EXISTS weather_combined (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            wunder_current_temp TEXT,
            wunder_high_temp TEXT,
            wunder_low_temp TEXT,
            weather_30m_temp_F REAL,
            weather_30m_temp_C REAL,
            server_timestamp_raw TEXT,
            server_timestamp_parsed TEXT,
            recorded_at TEXT
        )
    '''
    conn.cursor().execute(ddl)
    conn.commit()


def get_latest_row(conn: sqlite3.Connection) -> Optional[Dict[str, Any]]:
    """Fetch the row with the highest id as a dict; None when the table is empty or the query fails."""
    query = "SELECT * FROM weather_combined ORDER BY id DESC LIMIT 1"
    try:
        frame = pd.read_sql_query(query, conn)
        return None if frame.empty else frame.iloc[0].to_dict()
    except Exception:
        return None


def latest_server_raw(conn: sqlite3.Connection) -> Optional[str]:
    """server_timestamp_raw of the most recent DB row, or None when there is no row."""
    latest = get_latest_row(conn)
    if not latest:
        return None
    return latest.get("server_timestamp_raw")


def should_save_based_on_parsed(conn: sqlite3.Connection, parsed_dt: datetime) -> bool:
    """
    Tell whether parsed_dt is newer than the most recently stored
    server_timestamp_parsed.

    Returns True when the table holds no parsed timestamp yet, and also when
    the stored text cannot be interpreted (saving twice is preferred over
    missing a save).
    """
    found = conn.cursor().execute('''
        SELECT server_timestamp_parsed FROM weather_combined
        WHERE server_timestamp_parsed IS NOT NULL
        ORDER BY id DESC LIMIT 1
    ''').fetchone()
    if not found or not found[0]:
        return True
    stored = found[0]

    # Known layouts first: dd/mm/YYYY HH:MM, then ISO with seconds.
    for layout in ("%d/%m/%Y %H:%M", "%Y-%m-%dT%H:%M:%S"):
        try:
            previous = datetime.strptime(stored, layout)
            # naive values are taken to be in the schedule zone
            if previous.tzinfo is None:
                previous = previous.replace(tzinfo=LONDON)
            newer = parsed_dt > previous
        except Exception:
            continue
        return newer

    # Last resort: let pandas guess the layout.
    try:
        previous_ts = pd.to_datetime(stored)
        if previous_ts.tzinfo is None:
            previous_ts = previous_ts.tz_localize(LONDON)
        newer = parsed_dt > previous_ts.to_pydatetime()
    except Exception:
        # cannot compare -> save
        return True
    return newer


def save_combined_row(conn: sqlite3.Connection, wunder_data: Dict[str, Any], metar_row: Dict[str, Any]) -> int:
    """
    Insert one combined row into weather_combined and return its row id.

    Inputs:
      - wunder_data: dict produced by extract_wunder_data (may be partial)
      - metar_row: dict produced by normalize_metar_row; empty (or None) for
        Wunderground-only saves

    Column sources:
      - server_timestamp_raw: METAR raw_text when present, else the
        Wunderground raw timestamp.
      - server_timestamp_parsed: METAR report_time when present, else the
        Wunderground parsed timestamp, both formatted dd/mm/YYYY HH:MM in
        LONDON. An unparseable report_time is stored verbatim; a missing
        Wunderground parsed value falls back to its raw string.
      - weather_30m_temp_C / _F: METAR temp_c and its Fahrenheit conversion.
      - recorded_at: current time, dd/mm/YYYY HH:MM.

    After committing, the latest rows are logged for monitoring.
    """
    metar_row = metar_row or {}

    # raw timestamp: METAR text wins over the Wunderground string
    server_raw = metar_row.get("raw_text") or wunder_data.get("wunder_server_timestamp_raw")

    report_time = metar_row.get("report_time")
    if report_time:
        try:
            iso_text = report_time
            # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11
            # on; rewrite it so the stored format does not depend on the
            # interpreter version.
            if isinstance(iso_text, str) and iso_text.endswith("Z"):
                iso_text = iso_text[:-1] + "+00:00"
            dt = datetime.fromisoformat(iso_text)
            if dt.tzinfo is None:
                # naive METAR times are taken as UTC
                dt = dt.replace(tzinfo=timezone.utc)
            server_parsed = dt.astimezone(LONDON).strftime("%d/%m/%Y %H:%M")
        except (TypeError, ValueError, OverflowError):
            server_parsed = report_time
    else:
        wsp = wunder_data.get("wunder_server_timestamp_parsed")
        if isinstance(wsp, datetime):
            wsp_local = wsp.astimezone(LONDON) if wsp.tzinfo else wsp.replace(tzinfo=LONDON)
            server_parsed = wsp_local.strftime("%d/%m/%Y %H:%M")
        else:
            # no parsed time available: keep the raw string (possibly None)
            server_parsed = wunder_data.get("wunder_server_timestamp_raw")

    recorded_at = now_in_london().strftime("%d/%m/%Y %H:%M")

    temp_c = metar_row.get("temp_c")
    temp_f = c_to_f(temp_c)  # c_to_f passes None through

    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO weather_combined (
            wunder_current_temp,
            wunder_high_temp,
            wunder_low_temp,
            weather_30m_temp_F,
            weather_30m_temp_C,
            server_timestamp_raw,
            server_timestamp_parsed,
            recorded_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        wunder_data.get("wunder_current_temp"),
        wunder_data.get("wunder_high_temp"),
        wunder_data.get("wunder_low_temp"),
        temp_f,
        temp_c,
        server_raw,
        server_parsed,
        recorded_at
    ))
    conn.commit()
    rowid = cursor.lastrowid

    # log the latest rows after every insert so progress can be monitored
    print_latest_rows(conn, n=5)
    return rowid


def print_latest_rows(conn: sqlite3.Connection, n=5):
    """
    Log the `n` most recent rows of weather_combined as a text table.

    An empty table is reported as such; a failing query is logged as a
    warning instead of being raised.
    """
    try:
        recent = pd.read_sql_query(f"SELECT * FROM weather_combined ORDER BY id DESC LIMIT {n}", conn)
        if not recent.empty:
            # goes through the logger so it lines up with the other log output
            logger.info("Latest %d rows:\n%s", n, recent.to_string(index=False))
        else:
            logger.info("DB is empty.")
    except Exception as e:
        logger.warning("Could not print latest rows: %s", e)


# ---------------- Scheduling ----------------

def generate_schedule(start_time: datetime, hours_ahead: int = 24) -> List[datetime]:
    """
    Build the sorted list of polling times for the `hours_ahead` hours that
    start with start_time's hour.

    For each of those hours one entry per TARGET_MINUTES value is produced
    (seconds and microseconds zeroed); entries earlier than start_time
    truncated to the minute are dropped.
    """
    base = start_time.replace(second=0, microsecond=0)
    candidates = (
        (base + timedelta(hours=offset)).replace(minute=minute)
        for offset in range(hours_ahead)
        for minute in TARGET_MINUTES
    )
    return sorted(slot for slot in candidates if slot >= base)


def get_next_scheduled_time(schedule: List[datetime], current_time: datetime) -> Optional[datetime]:
    """First entry of `schedule` (in list order) strictly later than current_time, else None."""
    return next((slot for slot in schedule if slot > current_time), None)


def get_expected_report_time(scheduled_time: datetime) -> str:
    """
    Render scheduled_time as the string the METAR comparison expects:
    its own wall-clock date and time as YYYY-MM-DDTHH:MM:00 (seconds forced
    to 00, no UTC offset).
    """
    return format(scheduled_time, "%Y-%m-%dT%H:%M:00")


# ---------------- Polling / Backoff logic ----------------

def handle_rate_limit(attempt: int) -> bool:
    """
    Back off after a rate-limit response.

    Sleeps INITIAL_BACKOFF * 2**attempt seconds and returns True (retry), or
    returns False without sleeping once `attempt` has reached MAX_RETRIES.
    """
    if attempt < MAX_RETRIES:
        delay = INITIAL_BACKOFF * (2 ** attempt)
        logger.warning("Rate limited. Sleeping %d seconds (attempt %d/%d)", delay, attempt + 1, MAX_RETRIES)
        time.sleep(delay)
        return True
    logger.error("Max retries exceeded for rate limiting.")
    return False


def _iso_to_london(value: Any, naive_tz) -> Optional[datetime]:
    """Parse an ISO-8601 string into a datetime expressed in LONDON; None if it cannot be parsed.

    `naive_tz` is the zone assumed for strings that carry no UTC offset.
    """
    if not value:
        return None
    try:
        # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on
        text = value[:-1] + "+00:00" if value.endswith("Z") else value
        parsed = datetime.fromisoformat(text)
    except (AttributeError, TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=naive_tz)
    return parsed.astimezone(LONDON)


def high_freq_poll(expected_report_time: str, conn: sqlite3.Connection, latest_wunder: Dict[str, Any]) -> bool:
    """
    High-frequency polling loop.

    Polls the METAR cache every HIGH_FREQ_POLL_SECONDS until a report whose
    time equals expected_report_time shows up, or HIGH_FREQ_TIMEOUT_MINUTES
    have passed.

    Args:
      expected_report_time: ISO-like string; without a UTC offset it is read
        as LONDON wall-clock time, which is how get_expected_report_time
        renders it.
      conn: database connection used for de-duplication and inserts.
      latest_wunder: most recent Wunderground values, stored alongside each
        METAR row.

    Each METAR row found is saved when it is newer than the last stored
    parsed timestamp (or, when it has no parseable time, when its raw text
    differs from the last stored raw value).

    Returns True as soon as the expected report is seen. Returns False on
    timeout or when rate limiting persists beyond MAX_RETRIES.
    """
    logger.info("Starting high-frequency polling for expected report: %s", expected_report_time)
    start = time.time()
    timeout = HIGH_FREQ_TIMEOUT_MINUTES * 60
    rate_limit_attempts = 0
    # Parsed once: the expected time does not change while polling.
    expected_dt = _iso_to_london(expected_report_time, LONDON)

    while time.time() - start < timeout:
        try:
            row, status = fetch_metar_cache()
            if status and status != 200:
                logger.info("METAR cache status: %s", status)
            if row:
                normalized = normalize_metar_row(row)
                report_iso = normalized.get("report_time")
                # METAR times without an offset are UTC
                report_dt = _iso_to_london(report_iso, timezone.utc)

                if report_dt is not None:
                    if should_save_based_on_parsed(conn, report_dt):
                        saved_id = save_combined_row(conn, latest_wunder, normalized)
                        logger.info("Saved METAR-based row id=%d report_time=%s", saved_id, report_iso)
                    else:
                        logger.debug("METAR report not newer than last saved; skipping save.")
                else:
                    # No parsed time available: save if raw_text differs from last server_raw
                    last_raw = latest_server_raw(conn)
                    candidate_raw = normalized.get("raw_text")
                    if candidate_raw and candidate_raw != last_raw:
                        saved_id = save_combined_row(conn, latest_wunder, normalized)
                        logger.info("Saved METAR-based row (no parsed time) id=%d", saved_id)

                # Expected-report check: compare instants when both sides parse,
                # otherwise fall back to comparing the strings.
                if report_dt is not None and expected_dt is not None:
                    if report_dt == expected_dt:
                        logger.info("Received expected METAR report for scheduled time.")
                        return True
                elif report_iso and report_iso == expected_report_time:
                    logger.info("Received expected METAR report (string match).")
                    return True

            # a fetch that did not hit the rate limit resets the back-off
            rate_limit_attempts = 0
        except RateLimitException:
            if not handle_rate_limit(rate_limit_attempts):
                logger.error("Aborting high-frequency polling due to persistent rate limiting.")
                return False
            rate_limit_attempts += 1
            continue
        except Exception as e:
            logger.warning("High-frequency poll error: %s", e)

        time.sleep(HIGH_FREQ_POLL_SECONDS)

    logger.warning("High-frequency polling timed out after %d minutes", HIGH_FREQ_TIMEOUT_MINUTES)
    return False


# ---------------- Background Wunderground updates (Option B) ----------------

def background_check_and_update_wunder(latest_wunder: Dict[str, Any], conn: sqlite3.Connection) -> Dict[str, Any]:
    """
    Refresh the in-memory Wunderground values and persist them when they are new.

    Steps:
      1. Fetch and parse the Wunderground page; on fetch failure the previous
         values are kept and returned unchanged.
      2. Copy every non-None field that differs into latest_wunder (the dict
         is updated in place).
      3. If anything changed, insert a Wunderground-only row when the raw
         timestamp differs from the last stored server_timestamp_raw, or
         else when the parsed timestamp is newer than the last stored one.

    Returns latest_wunder.
    """
    html = fetch_wunderground_html()
    if not html:
        logger.info("Background Wunderground fetch failed; keeping last-known values.")
        return latest_wunder

    fresh = extract_wunder_data(html)
    tracked_fields = (
        "wunder_current_temp",
        "wunder_high_temp",
        "wunder_low_temp",
        "wunder_server_timestamp_raw",
        "wunder_server_timestamp_parsed",
    )
    changed = False
    for field in tracked_fields:
        value = fresh.get(field)
        # None means "not found on the page" and never overwrites a known value
        if value is not None and value != latest_wunder.get(field):
            latest_wunder[field] = value
            changed = True

    if not changed:
        return latest_wunder

    logger.info("Wunderground values changed: current=%s high=%s ts=%s",
                latest_wunder.get("wunder_current_temp"),
                latest_wunder.get("wunder_high_temp"),
                latest_wunder.get("wunder_server_timestamp_raw"))

    # Compare against the last DB row to decide whether an insert is due.
    last_raw = latest_server_raw(conn)
    wunder_raw = latest_wunder.get("wunder_server_timestamp_raw")
    to_save = bool(wunder_raw and wunder_raw != last_raw)
    if not to_save:
        wsp = latest_wunder.get("wunder_server_timestamp_parsed")
        if isinstance(wsp, datetime):
            try:
                wsp_local = wsp.astimezone(LONDON) if wsp.tzinfo else wsp.replace(tzinfo=LONDON)
                if should_save_based_on_parsed(conn, wsp_local):
                    to_save = True
            except Exception:
                to_save = True  # when in doubt, save

    if to_save:
        # no METAR data at this point: the row carries Wunderground fields only
        saved_id = save_combined_row(conn, latest_wunder, {})
        logger.info("Saved new row due to Wunderground-only change (id=%s)", saved_id)
    else:
        logger.info("Wunderground changed but not newer than last DB record; skipping save.")

    return latest_wunder


# ---------------- Main ----------------

def _run_pipeline(conn: sqlite3.Connection) -> None:
    """Create the table if needed, then loop forever alternating background Wunderground checks and scheduled METAR polling."""
    setup_database(conn)

    # last-known Wunderground values, shared by every save
    latest_wunder: Dict[str, Any] = {
        "wunder_current_temp": None,
        "wunder_high_temp": None,
        "wunder_low_temp": None,
        "wunder_server_timestamp_raw": None,
        "wunder_server_timestamp_parsed": None
    }

    # initial background fetch and print
    latest_wunder = background_check_and_update_wunder(latest_wunder, conn)
    print_latest_rows(conn)

    # scheduling
    current_time = now_in_london()
    schedule = generate_schedule(current_time, hours_ahead=24)
    logger.info("Schedule generated for next 24 hours: %s", [t.strftime("%Y-%m-%d %H:%M") for t in schedule])
    next_sched = get_next_scheduled_time(schedule, current_time)

    while True:
        current_time = now_in_london()

        if next_sched and current_time >= next_sched:
            expected_report_time = get_expected_report_time(next_sched)
            logger.info("Scheduled time reached: %s -> expected_report=%s", next_sched.strftime("%Y-%m-%d %H:%M"), expected_report_time)

            # refresh Wunderground before high-frequency polling
            latest_wunder = background_check_and_update_wunder(latest_wunder, conn)

            success = high_freq_poll(expected_report_time, conn, latest_wunder)
            if success:
                logger.info("High-frequency polling succeeded for scheduled time %s", next_sched.strftime("%Y-%m-%d %H:%M"))
            else:
                logger.warning("Did not receive expected METAR report for scheduled time %s", next_sched.strftime("%Y-%m-%d %H:%M"))

            # advance schedule; build a new one once the current list is used up
            current_time = now_in_london()
            next_sched = get_next_scheduled_time(schedule, current_time)
            if not next_sched:
                logger.info("Regenerating schedule for next 24 hours")
                schedule = generate_schedule(current_time, hours_ahead=24)
                next_sched = get_next_scheduled_time(schedule, current_time)

        # background check, then sleep until the next scheduled time or the
        # background interval, whichever comes first
        if next_sched:
            time_until_next = (next_sched - current_time).total_seconds()
            sleep_time = min(BACKGROUND_POLL_SECONDS, max(1, time_until_next))
            latest_wunder = background_check_and_update_wunder(latest_wunder, conn)
            logger.info("Sleeping %.1f seconds until next scheduled time %s", sleep_time, next_sched.strftime("%Y-%m-%d %H:%M"))
            time.sleep(sleep_time)
        else:
            latest_wunder = background_check_and_update_wunder(latest_wunder, conn)
            logger.info("No scheduled times found; sleeping %d seconds", BACKGROUND_POLL_SECONDS)
            time.sleep(BACKGROUND_POLL_SECONDS)


def main():
    """
    Entry point: open the SQLite database and run the pipeline until interrupted.

    The loop never ends on its own. The connection is closed on the way out
    whenever the loop is left through an exception (including Ctrl-C), and
    the exception then propagates to the caller unchanged.
    """
    logger.info("Starting combined ETL pipeline (Option B)")

    conn = sqlite3.connect(SQLITE_DB, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    try:
        _run_pipeline(conn)
    finally:
        conn.close()


# Start the pipeline only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()


2025-09-21 16:15:30,599 [INFO] Starting combined ETL pipeline (Option B)
2025-09-21 16:15:30,845 [INFO] Wunderground values changed: current=60 high=62° ts=2:06 PM BST on September 21, 2025
2025-09-21 16:15:30,857 [INFO] Latest 5 rows:
 id wunder_current_temp wunder_high_temp wunder_low_temp weather_30m_temp_F weather_30m_temp_C              server_timestamp_raw server_timestamp_parsed      recorded_at
  1                  60              62°            None               None               None 2:06 PM BST on September 21, 2025        21/09/2025 14:06 21/09/2025 14:15
2025-09-21 16:15:30,858 [INFO] Saved new row due to Wunderground-only change (id=1)
2025-09-21 16:15:30,863 [INFO] Latest 5 rows:
 id wunder_current_temp wunder_high_temp wunder_low_temp weather_30m_temp_F weather_30m_temp_C              server_timestamp_raw server_timestamp_parsed      recorded_at
  1                  60              62°            None               None               None 2:06 PM BST on September 21, 