In [3]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import yfinance as yf
import logging
from datetime import datetime
from io import StringIO

def _now_ts() -> str:
    return datetime.now().strftime("%Y%m%d_%H%M%S")

def _write_csv(df: pd.DataFrame, prefix: str) -> str:
    """Persist *df* as a CSV inside ``DATA_DIR`` and return the path written.

    The file is named ``<prefix>_<YYYYMMDD_HHMMSS>.csv``. ``DATA_DIR`` is
    created first if needed, and the DataFrame index is not written.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    out_path = os.path.join(DATA_DIR, "{}_{}.csv".format(prefix, _now_ts()))
    df.to_csv(out_path, index=False)
    logging.info("CSV written: %s", out_path)
    return out_path

def _fetch_sp500_symbols() -> list:
    """Scrape the S&P 500 constituent tickers from Wikipedia, in Yahoo Finance format.

    Raises:
        RuntimeError: the Wikipedia page could not be downloaded.
        ValueError: the ``constituents`` table is not present on the page.
        KeyError: the table has no recognisable symbol/ticker column.
    """
    logging.info("Fetching S&P 500 tickers from Wikipedia using BeautifulSoup...")

    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    # Browser-like User-Agent; presumably sent to avoid being rejected as a bot.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }

    try:
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
    except requests.RequestException as e:
        # Chain the original error so the root cause stays in the traceback.
        raise RuntimeError(f"Failed to fetch Wikipedia: {e}") from e

    # Isolate the constituents table by id so read_html cannot pick up a
    # different table from the page.
    soup = BeautifulSoup(response.text, "html.parser")
    table = soup.find("table", {"id": "constituents"})
    if table is None:
        raise ValueError("Could not find S&P 500 constituents table on Wikipedia.")

    df = pd.read_html(StringIO(str(table)))[0]
    logging.info("Wikipedia table columns: %s", df.columns.tolist())

    # Accept the first of several plausible spellings of the symbol column.
    symbol_col = next(
        (c for c in ("Symbol", "Ticker", "symbol", "ticker") if c in df.columns),
        None,
    )
    if symbol_col is None:
        raise KeyError(f"Symbol column not found. Available: {df.columns.tolist()}")

    # Yahoo Finance writes share classes with '-' where Wikipedia uses '.'.
    symbols = df[symbol_col].astype(str).str.replace(".", "-", regex=False).tolist()
    logging.info("Found %d symbols", len(symbols))
    return symbols


def _latest_bar(sym: str):
    """Return the last row of ``period="1d"`` history for *sym* as a dict, or None if empty.

    Prices are unadjusted (``auto_adjust=False``) and rounded to 6 decimals.
    ``Adj Close`` falls back to ``Close`` when yfinance does not supply it.
    """
    hist = yf.Ticker(sym).history(period="1d", auto_adjust=False)
    if hist.empty:
        return None
    r = hist.iloc[-1]
    return {
        # Drop any timezone so the stored date is the exchange-local calendar day.
        "Datetime": r.name.tz_localize(None).strftime("%Y-%m-%d"),
        "Symbol": sym,
        "Open": round(r["Open"], 6),
        "High": round(r["High"], 6),
        "Low": round(r["Low"], 6),
        "Close": round(r["Close"], 6),
        "Adj Close": round(r.get("Adj Close", r["Close"]), 6),
        "Volume": int(r["Volume"]),
    }


def fetch_yfinance_data():
    """Fetch the latest daily bar for every S&P 500 ticker and save it as a raw CSV.

    Tickers are scraped from Wikipedia's constituents table and then queried
    one by one through yfinance. Tickers that raise an error or return no
    history are logged and skipped, so a single bad symbol does not abort
    the run.

    Returns:
        Path of the CSV written by ``_write_csv`` with prefix ``sp500_raw``.

    Raises:
        RuntimeError: Wikipedia could not be fetched, or no ticker returned data.
        ValueError / KeyError: the Wikipedia table layout is not as expected.
    """
    symbols = _fetch_sp500_symbols()

    rows = []
    for sym in symbols:
        try:
            row = _latest_bar(sym)
        except Exception as exc:  # best-effort per ticker; keep going
            logging.warning("Failed for %s: %s", sym, exc)
            continue
        if row is None:
            logging.warning("No data returned for %s", sym)
        else:
            rows.append(row)

    # Fail here with a clear message instead of writing a column-less CSV
    # that only breaks later, in the transform step.
    if not rows:
        raise RuntimeError("No market data fetched for any S&P 500 symbol.")

    df_data = pd.DataFrame(rows)
    logging.info("Fetched %d rows of market data", len(df_data))

    return _write_csv(df_data, "sp500_raw")

def transform_data(raw_path):
    """Clean the raw market-data CSV, add derived columns and write the final CSV.

    Steps:
      1. Rename ``Datetime`` to ``Date``.
      2. Coerce the price and volume columns to numeric; unparseable values become NaN.
      3. Sort by symbol and date.
      4. Compute the per-symbol close-to-close change (absolute and percent)
         and the intraday range (absolute and as a percent of ``Close``).
      5. Emit the columns in a fixed order with upper-case names.

    Args:
        raw_path: Path to the CSV written by ``fetch_yfinance_data``.

    Returns:
        Path of the CSV written by ``_write_csv`` with prefix ``sp500_transformed``.

    Raises:
        FileNotFoundError: ``raw_path`` is empty or does not exist.
    """
    if not raw_path or not os.path.exists(raw_path):
        raise FileNotFoundError(f"Raw CSV missing: {raw_path}")

    df = pd.read_csv(raw_path)
    df = df.rename(columns={"Datetime": "Date"})

    # Coerce BEFORE deriving anything. diff()/pct_change() and the range maths
    # need numeric dtypes; a stray non-numeric value then becomes NaN
    # instead of raising partway through the transform.
    for c in ["Open", "High", "Low", "Close", "Adj Close", "Volume"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # Dates are ISO "YYYY-MM-DD" strings, so lexical order is chronological.
    df = df.sort_values(["Symbol", "Date"]).reset_index(drop=True)

    # The first row of each symbol has no predecessor, so its change is
    # reported as 0. If every symbol has a single row, both columns are all 0.
    close_by_symbol = df.groupby("Symbol")["Close"]
    df["close_change"] = close_by_symbol.diff().fillna(0.0)
    df["close_pct_change"] = close_by_symbol.pct_change().fillna(0.0) * 100

    df["DAILY_RANGE"] = df["High"] - df["Low"]
    df["DAILY_RANGE_PCT"] = (df["DAILY_RANGE"] / df["Close"]) * 100

    # Output column order (keys, in order) and the final column names (values).
    rename_map = {
        "Date": "DATE",
        "Symbol": "SYMBOL",
        "Open": "OPEN",
        "High": "HIGH",
        "Low": "LOW",
        "Close": "CLOSE",
        "Adj Close": "ADJ_CLOSE",
        "Volume": "VOLUME",
        "close_change": "CLOSE_CHANGE",
        "close_pct_change": "CLOSE_PCT_CHANGE",
        "DAILY_RANGE": "DAILY_RANGE",
        "DAILY_RANGE_PCT": "DAILY_RANGE_PCT",
    }
    final_df = df[list(rename_map)].rename(columns=rename_map)

    logging.info("Transformed shape: %s columns: %s", final_df.shape, final_df.columns.tolist())

    return _write_csv(final_df, "sp500_transformed")

def upload_transformed_to_s3(transformed_path):
    """Upload the transformed CSV to S3 and report where it was stored.

    The object goes to
    ``<S3_PREFIX>/transformed/sp500_transformed_<YYYYMMDD_HHMMSS>.csv`` in
    ``S3_BUCKET``, using the module-level AWS credentials and region. The
    timestamp is taken at upload time, so it can differ from the one in the
    local file name.

    Args:
        transformed_path: Local path of the CSV produced by ``transform_data``.

    Returns:
        ``(s3_url, s3_key)``: the ``s3://`` URL and the object key written.

    Raises:
        FileNotFoundError: ``transformed_path`` is empty or does not exist.
        Errors raised by boto3 during the upload (e.g. rejected credentials)
        propagate unchanged.
    """
    if not transformed_path or not os.path.exists(transformed_path):
        raise FileNotFoundError(f"Transformed CSV missing: {transformed_path}")

    s3 = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION,
    )

    # Accept S3_PREFIX with or without a trailing slash; empty means bucket root.
    prefix = f"{S3_PREFIX.rstrip('/')}/" if S3_PREFIX else ""
    s3_key = f"{prefix}transformed/sp500_transformed_{_now_ts()}.csv"
    s3.upload_file(
        Filename=transformed_path,
        Bucket=S3_BUCKET,
        Key=s3_key,
    )

    s3_url = f"s3://{S3_BUCKET}/{s3_key}"
    # Plain ASCII arrow: the previous message held a mis-encoded "→" (mojibake).
    logging.info("Uploaded transformed CSV -> %s", s3_url)
    return s3_url, s3_key

def load_to_snowflake(s3_key):
    """Bulk-load one CSV object from S3 into the Snowflake table with COPY INTO.

    Assumes ``SNOWFLAKE_TABLE`` already exists with columns in the same order
    as the CSV, and that ``SNOWFLAKE_STORAGE_INTEGRATION`` is configured in
    Snowflake for ``S3_BUCKET``. The CSV header row is skipped.

    Args:
        s3_key: Object key inside ``S3_BUCKET``, as returned by
            ``upload_transformed_to_s3``.

    Raises:
        Any error from connecting or from the COPY. Failures after the
        connection is opened are logged before being re-raised. The
        connection is always closed.
    """
    query = f"""
    COPY INTO {SNOWFLAKE_DATABASE}.{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE}
    FROM 's3://{S3_BUCKET}/{s3_key}'
    STORAGE_INTEGRATION = {SNOWFLAKE_STORAGE_INTEGRATION}
    FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');
    """

    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
    )
    # Everything after connect() sits inside try/finally so the connection is
    # closed even if creating the cursor itself fails.
    try:
        cur = conn.cursor()
        try:
            cur.execute(query)
            conn.commit()
            logging.info("Data loaded to Snowflake table: %s", SNOWFLAKE_TABLE)
        finally:
            cur.close()
    except Exception as e:
        logging.error("Failed to load to Snowflake: %s", e)
        raise
    finally:
        conn.close()

if __name__ == "__main__":
    # Run the ETL end to end: Wikipedia/yfinance -> local CSVs -> S3 -> Snowflake.
    try:
        raw_path = fetch_yfinance_data()
        transformed_path = transform_data(raw_path)
        s3_url, s3_key = upload_transformed_to_s3(transformed_path)
        load_to_snowflake(s3_key)
    except Exception as e:
        # Top-level boundary: log the message together with the full traceback
        # so the failing step is identifiable. The error is deliberately not
        # re-raised, matching the original best-effort behaviour.
        logging.exception("Pipeline failed: %s", e)

INFO:root:Fetching S&P 500 tickers from Wikipedia using BeautifulSoup...
INFO:root:Wikipedia table columns: ['Symbol', 'Security', 'GICS Sector', 'GICS Sub-Industry', 'Headquarters Location', 'Date added', 'CIK', 'Founded']
INFO:root:Found 503 symbols
INFO:root:Fetched 503 rows of market data
INFO:root:CSV written: ./data\sp500_raw_20251114_002312.csv
INFO:root:Transformed shape: (503, 12) columns: ['DATE', 'SYMBOL', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'ADJ_CLOSE', 'VOLUME', 'CLOSE_CHANGE', 'CLOSE_PCT_CHANGE', 'DAILY_RANGE', 'DAILY_RANGE_PCT']
INFO:root:CSV written: ./data\sp500_transformed_20251114_002312.csv
ERROR:root:Pipeline failed: Failed to upload ./data\sp500_transformed_20251114_002312.csv to YOUR_S3_BUCKET/your/prefix/transformed/sp500_transformed_20251114_002315.csv: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.
