# Homework Starter — Stage 05: Data Storage
Name: 
Date: 

Objectives:
- Env-driven paths to `data/raw/` and `data/processed/`
- Save CSV and Parquet; reload and validate
- Abstract IO with utility functions; document choices

In [7]:
RAW_DIR = pathlib.Path("../data/raw")
PROC_DIR = pathlib.Path("../data/processed")

RAW_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR.mkdir(parents=True, exist_ok=True)


In [8]:
import pandas as pd

df = pd.DataFrame({
    "Ticker": ["AAA", "BBB", "CCC"],
    "Price": [101.2, 202.5, 303.7]
})


In [9]:
import os, pathlib, pandas as pd
from datetime import datetime, timezone
from dotenv import load_dotenv
import datetime as dt
import pathlib
import pandas as pd


# load .env from repo root
load_dotenv("../../.env")

RAW_DIR = pathlib.Path(os.getenv("DATA_DIR_RAW", "homework/homework5/data/raw"))
PROC_DIR = pathlib.Path(os.getenv("DATA_DIR_PROCESSED", "homework/homework5/data/processed"))
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR.mkdir(parents=True, exist_ok=True)

def ts():  # timestamp for filenames
    return datetime.now(timezone.utc).strftime("%Y%m%d-%H%M")

# tiny sample dataset (or replace with your own)
df = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "ticker": ["AAPL", "MSFT", "NVDA", "AMZN"],
    "price": [189.4, 428.7, 111.9, 175.2]
})
df


Unnamed: 0,id,ticker,price
0,1,AAPL,189.4
1,2,MSFT,428.7
2,3,NVDA,111.9
3,4,AMZN,175.2


## 1) Create or Load a Sample DataFrame
You may reuse data from prior stages or create a small synthetic dataset.

In [2]:
import numpy as np
dates = pd.date_range('2024-01-01', periods=20, freq='D')
df = pd.DataFrame({'date': dates, 'ticker': ['AAPL']*20, 'price': 150 + np.random.randn(20).cumsum()})
df.head()

Unnamed: 0,date,ticker,price
0,2024-01-01,AAPL,150.563895
1,2024-01-02,AAPL,150.549637
2,2024-01-03,AAPL,151.041593
3,2024-01-04,AAPL,151.092771
4,2024-01-05,AAPL,149.624326


## 2) Save CSV to data/raw/ and Parquet to data/processed/ (TODO)
- Use timestamped filenames.
- Handle missing Parquet engine gracefully.

In [3]:
# CSV → data/raw
csv_path = RAW_DIR / f"sample_{ts()}.csv"
df.to_csv(csv_path, index=False)
print("CSV saved →", csv_path)

# Parquet → data/processed (install a parquet engine if needed)
try:
    pq_path = PROC_DIR / f"sample_{ts()}.parquet"
    df.to_parquet(pq_path, index=False)  # needs pyarrow or fastparquet
    print("Parquet saved →", pq_path)
except Exception as e:
    print("Parquet save failed:", e)
    print("Tip: pip install pyarrow  (or fastparquet) and re-run this cell.")


CSV saved → homework/homework5/data/raw/sample_20250821-0153.csv
Parquet saved → homework/homework5/data/processed/sample_20250821-0153.parquet


## 3) Reload and Validate (TODO)
- Compare shapes and key dtypes.

In [13]:
# find the newest files we just wrote
latest_csv = sorted(RAW_DIR.glob("*.csv"))[-1]
latest_pq  = sorted(PROC_DIR.glob("*.parquet"))[-1]
print("CSV:", latest_csv)
print("PARQUET:", latest_pq)

# reload
df_csv = pd.read_csv(latest_csv)
df_pq  = pd.read_parquet(latest_pq)

# simple validation
expected = {"Ticker": "object", "Price": "float64"}
result = {
    "rows_csv": len(df_csv),
    "rows_parquet": len(df_pq),
    "shape_equal": df_csv.shape == df_pq.shape,
    "dtypes_ok": {c: (str(df_csv[c].dtype)==expected[c] and str(df_pq[c].dtype)==expected[c])
                  for c in expected if c in df_csv.columns and c in df_pq.columns}
}
result


CSV: homework/homework5/data/raw/utildemo_20250820-221323.csv
PARQUET: homework/homework5/data/processed/utildemo_20250821-0159.parquet


{'rows_csv': 4, 'rows_parquet': 4, 'shape_equal': True, 'dtypes_ok': {}}

In [5]:
# Find the most recent files we just wrote
latest_csv = sorted(RAW_DIR.glob("sample_*.csv"))[-1]
latest_pq  = sorted(PROC_DIR.glob("sample_*.parquet"))[-1]

df_csv = pd.read_csv(latest_csv)
df_pq  = pd.read_parquet(latest_pq)

def validate_shapes_and_types(df1, df2, dtype_expect: dict):
    out = {}
    out["shape_equal"] = (df1.shape == df2.shape)
    out["shape_csv"]   = df1.shape
    out["shape_pq"]    = df2.shape

    # dtype checks only on columns present in both
    common = [c for c in dtype_expect if c in df1.columns and c in df2.columns]
    out["dtypes_ok"] = {
        c: (pd.api.types.is_dtype_equal(df1[c].dtype, dtype_expect[c]) and 
            pd.api.types.is_dtype_equal(df2[c].dtype, dtype_expect[c]))
        for c in common
    }
    return out

expected = {"id": "int64", "ticker": "object", "price": "float64"}
results = validate_shapes_and_types(df_csv, df_pq, expected)
results


{'shape_equal': True,
 'shape_csv': (20, 3),
 'shape_pq': (20, 3),
 'dtypes_ok': {'ticker': True, 'price': True}}

In [None]:
def validate_loaded(original, reloaded):
    checks = {
        'shape_equal': original.shape == reloaded.shape,
        'date_is_datetime': pd.api.types.is_datetime64_any_dtype(reloaded['date']) if 'date' in reloaded.columns else False,
        'price_is_numeric': pd.api.types.is_numeric_dtype(reloaded['price']) if 'price' in reloaded.columns else False,
    }
    return checks

df_csv = pd.read_csv(csv_path, parse_dates=['date'])
validate_loaded(df, df_csv)

In [None]:
if pq_path:
    try:
        df_pq = pd.read_parquet(pq_path)
        validate_loaded(df, df_pq)
    except Exception as e:
        print('Parquet read failed:', e)

## 4) Utilities (TODO)
- Implement `detect_format`, `write_df`, `read_df`.
- Use suffix to route; create parent dirs if needed; friendly errors for Parquet.

In [10]:
def ts():
    return dt.datetime.now().strftime('%Y%m%d-%H%M%S')
    
def ensure_parent(path: pathlib.Path):
    path.parent.mkdir(parents=True, exist_ok=True)

def write_df(df: pd.DataFrame, path: str | pathlib.Path):
    p = pathlib.Path(path)
    ensure_parent(p)
    suffix = p.suffix.lower()
    if suffix == ".csv":
        df.to_csv(p, index=False)
    elif suffix == ".parquet":
        try:
            df.to_parquet(p, index=False)
        except Exception as e:
            raise RuntimeError(
                "Parquet write failed — install a parquet engine: "
                "`pip install pyarrow` or `pip install fastparquet`"
            ) from e
    else:
        raise ValueError(f"Unsupported file type: {suffix}")
    return p

def read_df(path: str | pathlib.Path) -> pd.DataFrame:
    p = pathlib.Path(path)
    if not p.exists():
        raise FileNotFoundError(p)
    suffix = p.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(p)
    elif suffix == ".parquet":
        try:
            return pd.read_parquet(p)
        except Exception as e:
            raise RuntimeError(
                "Parquet read failed — install a parquet engine: "
                "`pip install pyarrow` or `pip install fastparquet`"
            ) from e
    else:
        raise ValueError(f"Unsupported file type: {suffix}")

# --- quick demo ---
demo_csv = RAW_DIR / f"utildemo_{ts()}.csv"
demo_pq  = PROC_DIR / f"utildemo_{ts()}.parquet"

write_df(df, demo_csv)
try:
    write_df(df, demo_pq)
except RuntimeError as e:
    print(e)  # engine hint

df_r_csv = read_df(demo_csv)
print("Reloaded CSV rows:", len(df_r_csv))


Reloaded CSV rows: 4


In [11]:
import pandas as pd

# Create a demo DataFrame
df = pd.DataFrame({
    "Ticker": ["AAA", "BBB", "CCC"],
    "Price": [101.2, 202.5, 303.7]
})

# Save into raw folder
demo_csv = RAW_DIR / f"util_demo_{ts()}.csv"
write_df(df, demo_csv)

# Save into processed folder
demo_pq = PROC_DIR / f"util_demo_{ts()}.parquet"
write_df(df, demo_pq)


PosixPath('homework/homework5/data/processed/util_demo_20250820-221329.parquet')

In [12]:
import typing as t, pathlib

def detect_format(path: t.Union[str, pathlib.Path]):
    s = str(path).lower()
    if s.endswith('.csv'): return 'csv'
    if s.endswith('.parquet') or s.endswith('.pq') or s.endswith('.parq'): return 'parquet'
    raise ValueError('Unsupported format: ' + s)

def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]):
    p = pathlib.Path(path); p.parent.mkdir(parents=True, exist_ok=True)
    fmt = detect_format(p)
    if fmt == 'csv':
        df.to_csv(p, index=False)
    else:
        try:
            df.to_parquet(p)
        except Exception as e:
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
    return p

def read_df(path: t.Union[str, pathlib.Path]):
    p = pathlib.Path(path)
    fmt = detect_format(p)
    if fmt == 'csv':
        return pd.read_csv(p, parse_dates=['date']) if 'date' in pd.read_csv(p, nrows=0).columns else pd.read_csv(p)
    else:
        try:
            return pd.read_parquet(p)
        except Exception as e:
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e

# Demo
p_csv = RAW / f"util_{ts()}.csv"
p_pq  = PROC / f"util_{ts()}.parquet"
write_df(df, p_csv); read_df(p_csv).head()
try:
    write_df(df, p_pq)
    read_df(p_pq).head()
except RuntimeError as e:
    print('Skipping Parquet util demo:', e)

NameError: name 'RAW' is not defined

## 5) Documentation (TODO)
- Update README with a **Data Storage** section (folders, formats, env usage).
- Summarize validation checks and any assumptions.

## Storage Notes (HW5)

- **Paths from `.env`** → `DATA_DIR_RAW` and `DATA_DIR_PROCESSED` are loaded; files written to `homework/homework5/data/raw/` and `.../processed/`.
- **Formats saved:** CSV (raw) and Parquet (processed).
- **Utilities:** `write_df(df, path)` / `read_df(path)` route by suffix, create folders, and raise a clear hint if a Parquet engine is missing.
- **Validation:** reloaded newest CSV/Parquet, compared shapes and checked dtypes on key columns.
- **Assumptions:** parquet engine available (`pyarrow`); paths come from `.env`; filenames include UTC timestamp from `ts()`.
