
# Daily Stock Price Retrieval & Storage (with Error Email Notification)




## 0. Install Dependencies (first time only)
Run this in your **terminal**; do not run it inside the notebook if the notebook environment lacks internet access:
```bash
pip install yfinance pandas SQLAlchemy apscheduler pytz
```


In [None]:

# 1. Configuration Section — edit as needed
from __future__ import annotations
import os
import logging

# Ticker symbols to fetch; this list is passed to run_once() and to the scheduler job.
TICKERS = [
    "AAPL", "MSFT", "NVDA", "AMZN", "GOOGL",
]

# Timezone name used by the scheduler, and the daily run time.
# RUN_EVERY_DAY_AT must be "HH:MM" (24h); start_scheduler() splits it on ":".
LOCAL_TZ = "America/Los_Angeles"
RUN_EVERY_DAY_AT = "18:00"  # e.g., 18:00 = 6 PM daily

# Database URI handed to SQLAlchemy's create_engine() (SQLite file by default).
# NOTE(review): the CREATE TABLE statement in this notebook uses SQLite's
# AUTOINCREMENT syntax — presumably it needs adjusting before pointing this
# at PostgreSQL/MySQL; confirm.
DB_URI = "sqlite:///prices.db"

# Email settings (used only for error notifications), read from environment variables.
# send_error_email() skips sending unless SMTP_HOST, SMTP_USER, SMTP_PASS and
# EMAIL_TO are all set. SMTP_PORT defaults to 587; a non-numeric value raises
# ValueError here, when the configuration cell runs.
SMTP_HOST = os.getenv("SMTP_HOST")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
EMAIL_TO  = os.getenv("EMAIL_TO")  # comma-separated allowed

# Retry config for data fetches: number of attempts per ticker, and the
# pause (in seconds) after a failed attempt before retrying.
MAX_RETRIES = 3
RETRY_SLEEP_SECONDS = 3

# Logging: INFO level with timestamp and level prefix; all functions in this
# notebook log through the shared `logger` below.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s | %(message)s")
logger = logging.getLogger("daily_prices_nb")
print("Configuration loaded.")


In [None]:

# 2. Import dependencies
import sys
import time
import traceback
from datetime import datetime
from typing import Iterable, List, Tuple, Dict, Any

import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError, OperationalError
from pytz import timezone

print("Imports successful.")


In [None]:

# 3. Database initialization (SQLite)
# DDL for the `prices` table, executed by init_db().
# - IF NOT EXISTS makes it safe to run on every invocation.
# - UNIQUE (ticker, date) allows one row per ticker per day; store_rows()
#   relies on the resulting IntegrityError to skip duplicates.
# NOTE(review): `INTEGER PRIMARY KEY AUTOINCREMENT` is SQLite syntax —
# presumably this statement must change if DB_URI targets another backend; confirm.
CREATE_TABLE_SQL = '''
CREATE TABLE IF NOT EXISTS prices (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ticker TEXT NOT NULL,
    date TEXT NOT NULL,        -- ISO date (YYYY-MM-DD)
    open REAL,
    high REAL,
    low REAL,
    close REAL,
    adj_close REAL,
    volume INTEGER,
    ts_ingested TEXT NOT NULL, -- ISO timestamp
    UNIQUE (ticker, date)
);
'''

def init_db(engine):
    """Ensure the ``prices`` table exists on *engine*.

    Runs ``CREATE_TABLE_SQL`` inside a transaction opened with
    ``engine.begin()``. The statement uses ``IF NOT EXISTS``, so calling this
    repeatedly is harmless.

    Args:
        engine: SQLAlchemy engine connected to the target database.
    """
    schema_statement = text(CREATE_TABLE_SQL)
    with engine.begin() as connection:
        connection.execute(schema_statement)

print("Database initialization ready.")


In [None]:

# 4. Email sending (errors only)
def send_error_email(subject: str, body: str) -> None:
    """Email a plain-text error report, or do nothing if SMTP is not configured.

    The message is sent from ``SMTP_USER`` to ``EMAIL_TO`` through
    ``SMTP_HOST:SMTP_PORT`` using STARTTLS and a login with
    ``SMTP_USER``/``SMTP_PASS``. Delivery is best-effort: any exception raised
    while connecting or sending is logged and swallowed, so a broken mail
    setup never fails the caller.

    Args:
        subject: Subject line of the email.
        body: Plain-text body of the email.
    """
    smtp_configured = all((SMTP_HOST, SMTP_USER, SMTP_PASS, EMAIL_TO))
    if not smtp_configured:
        logger.warning("SMTP/EMAIL env vars not fully set; skipping email.")
        return

    # Imported lazily so the mail modules are only loaded when an email is sent.
    import smtplib
    from email.message import EmailMessage

    message = EmailMessage()
    headers = (("From", SMTP_USER), ("To", EMAIL_TO), ("Subject", subject))
    for header_name, header_value in headers:
        message[header_name] = header_value
    message.set_content(body)

    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as client:
            client.starttls()
            client.login(SMTP_USER, SMTP_PASS)
            client.send_message(message)
        logger.info("Error email sent.")
    except Exception:
        # Best-effort notification: record the failure, do not propagate it.
        logger.exception("Failed to send error email.")

print("Email function ready.")


In [None]:

# 5. Fetch & store functions
def fetch_latest_for_ticker(ticker: str) -> pd.DataFrame:
    """Fetch the most recent daily bar for one ticker, retrying on failure.

    Requests the last two daily bars from yfinance (``auto_adjust=False``) and
    returns only the final row. An empty or missing result is treated as a
    failure and retried like any other exception.

    Args:
        ticker: Symbol to look up, e.g. ``"AAPL"``.

    Returns:
        A one-row DataFrame holding the latest bar returned by yfinance.

    Raises:
        Exception: The error from the last attempt, once all attempts failed
            (``ValueError`` when yfinance returned no data).
    """
    # Always try at least once, so a misconfigured MAX_RETRIES <= 0 cannot
    # fall through to ``raise None``.
    attempts = max(1, MAX_RETRIES)
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            hist = yf.Ticker(ticker).history(period="2d", interval="1d", auto_adjust=False)
            if hist is None or hist.empty:
                raise ValueError(f"No data returned for {ticker}")
            return hist.tail(1)
        except Exception as e:
            last_exc = e
            logger.warning("Attempt %d/%d failed for %s: %s", attempt, attempts, ticker, e)
            # Only pause when another attempt follows; sleeping after the
            # final failure would just delay the error report.
            if attempt < attempts:
                time.sleep(RETRY_SLEEP_SECONDS)
    raise last_exc

def store_rows(engine, rows: List[Dict[str, Any]]) -> Tuple[int, int]:
    """Write price records into ``prices``, counting duplicates instead of failing.

    All rows are inserted one by one inside a single transaction opened with
    ``engine.begin()``. A row that raises ``IntegrityError`` (e.g. the
    ``UNIQUE (ticker, date)`` constraint) is counted as skipped; any other
    exception propagates to the caller.

    NOTE(review): the loop keeps using the same transaction after an
    ``IntegrityError``. Presumably some backends (PostgreSQL?) abort the whole
    transaction on a failed statement — confirm before changing ``DB_URI``.

    Args:
        engine: SQLAlchemy engine connected to the target database.
        rows: Parameter dicts with the keys named in the INSERT statement.

    Returns:
        ``(inserted, skipped)`` row counts.
    """
    insert_sql = """INSERT INTO prices
                           (ticker, date, open, high, low, close, adj_close, volume, ts_ingested)
                           VALUES (:ticker, :date, :open, :high, :low, :close, :adj_close, :volume, :ts_ingested)"""
    n_inserted = n_skipped = 0
    with engine.begin() as connection:
        for record in rows:
            try:
                connection.execute(text(insert_sql), record)
            except IntegrityError:
                n_skipped += 1
            else:
                n_inserted += 1
    return n_inserted, n_skipped

def _float_or_none(value: Any):
    """Return *value* as a float, or None when it is missing (None or NaN)."""
    if value is None or pd.isna(value):
        return None
    return float(value)


def _bar_to_record(ticker: str, bar_df: pd.DataFrame, ts_ingested: str) -> Dict[str, Any]:
    """Turn the last row of *bar_df* into the parameter dict for the ``prices`` insert."""
    row = bar_df.iloc[-1]
    volume = row.get("Volume", 0)
    return {
        "ticker": ticker,
        # The bar's index label is the trading date; keep only the date part.
        "date": pd.to_datetime(bar_df.index[-1]).date().isoformat(),
        # Missing prices become None (like volume) so they are stored as NULL
        # explicitly, rather than depending on how a driver binds NaN.
        "open": _float_or_none(row.get("Open")),
        "high": _float_or_none(row.get("High")),
        "low": _float_or_none(row.get("Low")),
        "close": _float_or_none(row.get("Close")),
        "adj_close": _float_or_none(row.get("Adj Close")),
        "volume": None if pd.isna(volume) else int(volume),
        "ts_ingested": ts_ingested,
    }


def run_once(tickers: Iterable[str]) -> None:
    """Fetch & insert the latest daily bar for each ticker.

    Creates an engine for ``DB_URI``, ensures the table exists, fetches one
    bar per ticker, and stores all fetched bars with ``store_rows``. Fetch
    failures and an insert failure are collected rather than raised; if any
    occurred, a single summary email is sent via ``send_error_email``.

    Args:
        tickers: Symbols to fetch. Any iterable is accepted, including
            one-shot iterables such as generators.

    Raises:
        Exception: Errors from creating the engine or from ``init_db``
            propagate to the caller; fetch and insert errors do not.
    """
    # Local import under another name: at module level ``timezone`` refers to
    # pytz's factory function, not the stdlib class.
    from datetime import timezone as _stdlib_timezone

    # Materialize once. Counting a generator with len(list(...)) would exhaust
    # it and leave nothing for the fetch loop below.
    ticker_list = list(tickers)
    logger.info("Starting run for %d tickers.", len(ticker_list))

    errors: List[str] = []
    engine = create_engine(DB_URI, future=True)
    try:
        init_db(engine)

        to_insert: List[Dict[str, Any]] = []
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # produces the same "YYYY-MM-DDTHH:MM:SSZ" text.
        now_iso = datetime.now(_stdlib_timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

        for t in ticker_list:
            try:
                df = fetch_latest_for_ticker(t)
                to_insert.append(_bar_to_record(t, df, now_iso))
            except Exception as e:
                # One bad ticker must not stop the others; report it later.
                err = f"{t}: {e}\n{traceback.format_exc(limit=1)}"
                logger.error("Error fetching %s: %s", t, e)
                errors.append(err)

        try:
            ins, skip = store_rows(engine, to_insert)
            logger.info("Inserted %d rows, skipped %d duplicates.", ins, skip)
        except Exception as e:
            err = f"DB insert failure: {e}\n{traceback.format_exc(limit=1)}"
            logger.exception("Database insert failed.")
            errors.append(err)
    finally:
        # A new engine is created on every run; release its connection pool
        # so repeated scheduled runs do not accumulate open connections.
        engine.dispose()

    if errors:
        subject = "[Daily Prices] Errors during retrieval"
        body = "Errors occurred:\n\n" + "\n\n".join(errors)
        send_error_email(subject, body)
    else:
        logger.info("Run completed successfully with no errors.")

print("Fetch & store functions ready.")



## 6. Run Once (validation)
Run the cell below to immediately fetch & insert data into `prices.db`.


In [None]:
# Validation run: fetch and store the latest bar for every configured ticker now.
# run_once() catches fetch and insert errors itself (it logs them and emails a
# summary), so the message below is printed even when some tickers failed —
# check the log output above it.
run_once(TICKERS)
print('One-off run complete.')


## 7. Daily Scheduled Run (demo only)
- This starts a blocking scheduler inside the notebook: the cell keeps the kernel busy until you interrupt it. Not recommended for production use.
- For production, prefer system `cron` jobs calling `python daily_prices.py --once`, or a service manager like `systemd`.


In [None]:

from apscheduler.schedulers.blocking import BlockingScheduler

def start_scheduler():
    """Run ``run_once(TICKERS)`` every day at ``RUN_EVERY_DAY_AT`` until interrupted.

    Parses ``RUN_EVERY_DAY_AT`` as ``"HH:MM"``, builds a ``BlockingScheduler``
    in the ``LOCAL_TZ`` timezone and registers one cron job. The job is added
    with ``max_instances=1``, ``coalesce=True`` and a 30-minute
    ``misfire_grace_time``. ``scheduler.start()`` blocks the calling thread;
    ``KeyboardInterrupt``/``SystemExit`` end the loop and are logged instead
    of propagating.

    Raises:
        ValueError: If ``RUN_EVERY_DAY_AT`` is not two integers separated by ":".
    """
    hour_text, minute_text = RUN_EVERY_DAY_AT.split(":", 1)
    run_hour = int(hour_text)
    run_minute = int(minute_text)
    local_zone = timezone(LOCAL_TZ)

    ticker_summary = ", ".join(TICKERS)
    logger.info("Scheduling daily job at %02d:%02d (%s). Tickers: %s",
                run_hour, run_minute, LOCAL_TZ, ticker_summary)

    job_options = {
        "args": [TICKERS],
        "hour": run_hour,
        "minute": run_minute,
        "id": "daily_prices_job",
        "max_instances": 1,
        "misfire_grace_time": 30 * 60,  # seconds
        "coalesce": True,
    }
    scheduler = BlockingScheduler(timezone=local_zone)
    scheduler.add_job(run_once, "cron", **job_options)

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Scheduler stopped.")

# Uncomment below to demo (note: will block the kernel)
# start_scheduler()
print("Scheduler function defined. Uncomment last line to demo.")
