In [10]:
import yfinance as yf
import pandas as pd
from pyspark.sql.functions import col


# Map of yfinance ticker symbols to the short symbol stored with each row.
symbols = {
    "BTC-USD": "BTC",
    "ETH-USD": "ETH",
    "SOL-USD": "SOL",
}

# One DataFrame per symbol; combined into a single frame in a later cell.
all_data = []

# Hourly interval with up to 7d range for free access
for yf_symbol, short_symbol in symbols.items():
    ticker = yf.Ticker(yf_symbol)
    hist = ticker.history(period="7d", interval="1h")  # Hourly data for 7 days

    # Skip symbols that came back empty instead of appending a frame with no
    # rows: an empty frame would make the symbol disappear from the load
    # without any visible signal.
    if hist.empty:
        print(f"No hourly data returned for {yf_symbol}; skipping")
        continue

    # Move the timestamp index into a regular column and tag rows with the symbol.
    hist = hist.reset_index()
    hist["Symbol"] = short_symbol
    all_data.append(hist)

# Fail here with a clear message rather than further down the notebook.
if not all_data:
    raise RuntimeError("yfinance returned no hourly data for any requested symbol")

In [14]:
# Inspect the column labels of `hist`, which still holds the frame from the
# last iteration of the fetch loop.
hist.columns

Index(['Datetime', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends',
       'Stock Splits', 'Symbol'],
      dtype='object')

In [11]:
# Bronze layer: stack the per-symbol frames under a fresh 0..n-1 index and keep
# only the symbol, timestamp and OHLCV columns, in that order.
bronze_columns = ["Symbol", "Datetime", "Open", "High", "Low", "Close", "Volume"]
bronze_df = pd.concat(all_data, ignore_index=True)[bronze_columns]
bronze_df.head()


Unnamed: 0,Symbol,Datetime,Open,High,Low,Close,Volume
0,BTC,2025-09-19 00:00:00+00:00,117145.195312,117438.828125,117094.859375,117412.007812,0
1,BTC,2025-09-19 01:00:00+00:00,117443.320312,117475.351562,117180.289062,117256.796875,0
2,BTC,2025-09-19 02:00:00+00:00,117274.789062,117322.726562,117116.414062,117172.523438,0
3,BTC,2025-09-19 03:00:00+00:00,117173.84375,117321.570312,116949.757812,117009.984375,0
4,BTC,2025-09-19 04:00:00+00:00,117017.921875,117126.15625,116780.101562,117058.445312,0


In [6]:
def enforce_schema(df):
    """Return a copy of ``df`` with the known columns coerced to fixed dtypes.

    The input frame is left untouched; all coercion happens on a copy.
    Columns named in the schema but absent from ``df`` are skipped, and
    columns of ``df`` that the schema does not mention pass through unchanged.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding some or all of Symbol, Datetime, Open, High, Low,
        Close and Volume.

    Returns
    -------
    pandas.DataFrame
        A new frame with the same columns in the same order.

    Notes
    -----
    * Datetime columns go through ``pd.to_datetime(errors="coerce")``, so
      unparseable values become ``NaT``. A time zone already attached to the
      values is kept, so the result can be tz-aware even though the schema
      label is ``datetime64[ns]``.
    * Other columns use ``astype(..., errors="ignore")``: a column that cannot
      be converted is returned as-is rather than raising.
    """
    schema = {
        "Symbol": "string",
        "Datetime": "datetime64[ns]",
        "Open": "float64",
        "High": "float64",
        "Low": "float64",
        "Close": "float64",
        "Volume": "Int64",
    }

    # Work on a copy so the caller's frame keeps its original dtypes.
    result = df.copy()

    # apply schema selectively (ignore cols not in dict)
    # The loop variable is `column`, not `col`, so it cannot be confused with
    # the `col` helper imported at the top of the notebook.
    for column, dtype in schema.items():
        if column not in result.columns:
            continue
        if dtype.startswith("datetime"):
            result[column] = pd.to_datetime(result[column], errors="coerce")
        else:
            result[column] = result[column].astype(dtype, errors="ignore")
    return result
# Silver layer: run the bronze frame through the schema coercion, then preview
# the first rows of the result.
silver_df = enforce_schema(bronze_df)
silver_df.head()

Unnamed: 0,Symbol,Datetime,Open,High,Low,Close,Volume
0,BTC,2025-09-19 00:00:00+00:00,117145.195312,117438.828125,117094.859375,117412.007812,0
1,BTC,2025-09-19 01:00:00+00:00,117443.320312,117475.351562,117180.289062,117256.796875,0
2,BTC,2025-09-19 02:00:00+00:00,117274.789062,117322.726562,117116.414062,117172.523438,0
3,BTC,2025-09-19 03:00:00+00:00,117173.84375,117321.570312,116949.757812,117009.984375,0
4,BTC,2025-09-19 04:00:00+00:00,117017.921875,117126.15625,116780.101562,117058.445312,0


In [10]:
import os
from dotenv import load_dotenv
import psycopg2  # for connecting to the PostgreSQL database and executing queries
from sqlalchemy import create_engine  # to efficiently manage and reuse database connections
# Load settings from a .env file (if one is found) into the process environment
# so the os.getenv lookups that follow can read them.
load_dotenv()

True

In [11]:
# Database connection settings, read from the environment with local-development
# fallbacks. DB_PASSWORD has no fallback and is None when the variable is unset.
DB_USERNAME = os.environ.get('DB_USERNAME', 'postgres')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = os.environ.get('DB_PORT', '5432')
DB_NAME = os.environ.get('DB_NAME', 'ben')

In [12]:
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

# Build the connection URL from its parts instead of formatting a string.
# URL.create escapes special characters in the credentials (for example "@",
# "/" or ":" in the password), and a missing password is left out of the URL
# rather than being rendered as the text "None".
db_url = URL.create(
    drivername="postgresql",
    username=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    # DB_PORT is read from the environment as a string; URL.create expects an int.
    port=int(DB_PORT) if DB_PORT else None,
    database=DB_NAME,
)

# Create engine
engine = create_engine(db_url)

In [17]:
# DDL for the hourly price table. The table is dropped first, so running this
# cell discards any rows that were loaded previously.
create_hourly_table_sql = text("""
         DROP TABLE IF EXISTS yfinance_hourly;

        CREATE TABLE yfinance_hourly (
            Symbol VARCHAR(10),
            Datetime TIMESTAMP,
            Open FLOAT,
            High FLOAT,
            Low FLOAT,
            Close FLOAT,
            Volume BIGINT
        );
    """)

# Run both statements inside a single transaction block.
with engine.begin() as conn:
    conn.execute(create_hourly_table_sql)


In [18]:
from sqlalchemy import inspect

# List the tables visible through the engine to confirm the new one exists.
inspector = inspect(engine)
table_names = inspector.get_table_names()
print(table_names)


['prices', 'yfinance_historical', 'yfinance_hourly']


In [19]:
columns_to_keep = ["Symbol", "Datetime", "Open", "High", "Low", "Close", "Volume"]

# Keep only the desired columns and lowercase their names (useful for Postgres,
# which folds unquoted identifiers to lowercase). `rename` returns a new frame,
# so silver_df keeps its original column labels.
df_subset = silver_df[columns_to_keep].rename(columns=str.lower)

# The target column is declared TIMESTAMP (no time zone). Writing tz-aware
# values into it makes PostgreSQL convert them using the session's TimeZone
# setting, so the stored wall-clock time would depend on server configuration.
# Normalise to naive UTC first so the stored values are always UTC.
if isinstance(df_subset["datetime"].dtype, pd.DatetimeTZDtype):
    df_subset["datetime"] = (
        df_subset["datetime"].dt.tz_convert("UTC").dt.tz_localize(None)
    )

# Push into SQL. "append" adds rows to the existing table, so re-running this
# cell without recreating the table inserts the same rows again.
df_subset.to_sql(
    "yfinance_hourly",
    con=engine,
    if_exists="append",
    index=False,
)


489