In [2]:
import psycopg2
print(psycopg2.__version__)


2.9.9 (dt dec pq3 ext lo64)


In [2]:
import os
print(os.getenv("DB_HOST"))
print(os.getenv("DB_PORT"))
print(os.getenv("DB_USER"))
print(os.getenv("DB_NAME"))


localhost
5432
postgres
stocks


In [30]:
from sqlalchemy import create_engine
engine = get_engine()
engine.connect().close()
print("Connection successful")


Connection successful


In [14]:
from sqlalchemy import create_engine
import os

engine = create_engine(
    f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@"
    f"{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
)

engine.connect().close()
print("Postgres connection works")


Postgres connection works


In [28]:
from sqlalchemy import create_engine
engine = get_engine()
engine.connect().close()
print("Connection successful")

Connection successful


In [12]:
import sqlalchemy
import psycopg2

print(sqlalchemy.__version__)
print(psycopg2.__version__)


1.4.39
2.9.11 (dt dec pq3 ext lo64)


In [30]:
import os

print(os.getenv("DB_HOST"))
print(os.getenv("DB_PORT"))
print(os.getenv("DB_USER"))
print(os.getenv("DB_NAME"))

localhost
5432
postgres
stocks


In [24]:
#This is AAPL ETL data pipeline from 2020 to 2025

import yfinance as yf
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import timedelta
import os

TICKER = "AAPL"
START_DATE = "2020-01-01"
END_DATE = "2025-12-31"


def get_engine():
    required = [
        "DB_HOST",
        "DB_PORT",
        "DB_USER",
        "DB_PASSWORD",
        "DB_NAME",
    ]

    missing = [v for v in required if not os.getenv(v)]
    if missing:
        raise RuntimeError(f"Missing env vars: {missing}")

    return create_engine(
        f"postgresql+psycopg2://"
        f"{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@"
        f"{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/"
        f"{os.getenv('DB_NAME')}"
    )


def get_last_loaded_date(engine):
    query = text("""
        SELECT last_loaded_date
        FROM etl_metadata
        WHERE ticker = :ticker
    """)
    with engine.connect() as conn:
        result = conn.execute(query, {"ticker": TICKER}).fetchone()
        return result[0] if result else None


def update_last_loaded_date(engine, last_date):
    query = text("""
        INSERT INTO etl_metadata (ticker, last_loaded_date)
        VALUES (:ticker, :last_date)
        ON CONFLICT (ticker)
        DO UPDATE SET last_loaded_date = EXCLUDED.last_loaded_date
    """)

    with engine.begin() as conn:
        conn.execute(
            query,
            {"ticker": TICKER, "last_date": last_date}
        )


def extract_data(start_date):
    df = yf.download(
        TICKER,
        start=start_date,
        end=END_DATE,
        interval="1d"
    )
    return df.reset_index()


def transform_data(df):
    df = df.copy()

    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df["ticker"] = TICKER

    if "Adj Close" in df.columns:
        df["adj_close"] = df["Adj Close"]
    else:
        df["adj_close"] = df["Close"]

    df["daily_return"] = df["adj_close"].pct_change()
    df["ma_20"] = df["adj_close"].rolling(20).mean()

    df.rename(
        columns={
            "Date": "trade_date",
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Volume": "volume"
        },
        inplace=True
    )

    return df[
        [
            "trade_date",
            "ticker",
            "open",
            "high",
            "low",
            "close",
            "adj_close",
            "volume",
            "daily_return",
            "ma_20"
        ]
    ]


def validate_data(df):
    if df.empty:
        raise ValueError("No data extracted")

    if df["trade_date"].isnull().any():
        raise ValueError("Null trade_date detected")

    if df["close"].isnull().any():
        raise ValueError("Null close prices detected")

    if df["adj_close"].isnull().any():
        raise ValueError("Null adjusted close detected")

    if (df["volume"] < 0).any():
        raise ValueError("Negative volume detected")

def load_data(engine, df):
    df.to_sql(
        "daily_prices",
        engine,
        if_exists="append",
        index=False,
        method="multi"
    )


def run_pipeline():
    engine = get_engine()

    last_date = get_last_loaded_date(engine)

    start_date = (
        last_date + timedelta(days=1)
        if last_date
        else START_DATE
    )

    raw = extract_data(start_date)

    if raw.empty:
        print(f"No data found for {TICKER} starting {start_date}. Skipping load.")
        return

    clean = transform_data(raw)
    validate_data(clean)
    load_data(engine, clean)

    update_last_loaded_date(
        engine,
        clean["trade_date"].max()
    )

In [26]:
#This check if the data made it to the postgres database

from sqlalchemy import text
import pandas as pd

engine = get_engine()

# Fetch first 10 rows
df = pd.read_sql(text("SELECT * FROM daily_prices LIMIT 10"), engine)
print(df)

# Count total rows
total_rows = pd.read_sql(text("SELECT COUNT(*) FROM daily_prices"), engine)
print("Total rows:", total_rows.iloc[0, 0])

# Latest trade date
latest_date = pd.read_sql(text("SELECT MAX(trade_date) FROM daily_prices"), engine)
print("Latest trade_date:", latest_date.iloc[0, 0])

   trade_date ticker       open       high        low      close  adj_close  \
0  2020-01-02   AAPL  71.476615  72.528597  71.223274  72.468277  72.468277   
1  2020-01-03   AAPL  71.696183  72.523769  71.539352  71.763741  71.763741   
2  2020-01-06   AAPL  70.885472  72.374162  70.634539  72.335556  72.335556   
3  2020-01-07   AAPL  72.345197  72.600952  71.775781  71.995346  71.995346   
4  2020-01-08   AAPL  71.698589  73.455103  71.698589  73.153503  73.153503   
5  2020-01-09   AAPL  74.130683  74.900365  73.879757  74.707344  74.707344   
6  2020-01-10   AAPL  74.941386  75.440836  74.374378  74.876236  74.876236   
7  2020-01-13   AAPL  75.192306  76.502451  75.074074  76.475906  76.475906   
8  2020-01-14   AAPL  76.413185  76.623097  75.320190  75.443237  75.443237   
9  2020-01-15   AAPL  75.242997  76.123665  74.688049  75.119942  75.119942   

      volume  daily_return ma_20             load_timestamp  
0  135480400           NaN  None 2026-01-27 00:50:08.982442  
1  146

In [34]:
#This verify the daily_price table, and see a summary of each ticker

import pandas as pd
from sqlalchemy import text
from datetime import datetime

engine = get_engine()  # reuse your existing get_engine function

# Fetch all tickers with basic stats
query = text("""
    SELECT
        ticker,
        COUNT(*) AS total_rows,
        MIN(trade_date) AS first_date,
        MAX(trade_date) AS last_date,
        MIN(adj_close) AS min_adj_close,
        MAX(adj_close) AS max_adj_close
    FROM daily_prices
    GROUP BY ticker
    ORDER BY ticker
""")

df_summary = pd.read_sql(query, engine)

print("=== Daily Prices Summary by Ticker ===")
print(df_summary)


=== Daily Prices Summary by Ticker ===
  ticker  total_rows  first_date   last_date  min_adj_close  max_adj_close
0   AAPL        1507  2020-01-02  2025-12-30      54.264336     286.190002
