# Homework Starter — Stage 05: Data Storage
Name: 
Date: 

Objectives:
- Env-driven paths to `data/raw/` and `data/processed/`
- Save CSV and Parquet; reload and validate
- Abstract IO with utility functions; document choices

In [1]:
import os, pathlib, datetime as dt
import pandas as pd
from dotenv import load_dotenv

load_dotenv()
RAW = pathlib.Path(os.getenv('DATA_DIR_RAW', 'data/raw'))
PROC = pathlib.Path(os.getenv('DATA_DIR_PROCESSED', 'data/processed'))
RAW.mkdir(parents=True, exist_ok=True)
PROC.mkdir(parents=True, exist_ok=True)
print('RAW ->', RAW.resolve())
print('PROC ->', PROC.resolve())

RAW -> C:\Users\qochi\bootcamp_millicent_qochiwa\homework\homework5\data\raw
PROC -> C:\Users\qochi\bootcamp_millicent_qochiwa\homework\homework5\data\processed


## 1) Create or Load a Sample DataFrame
You may reuse data from prior stages or create a small synthetic dataset.

In [2]:
# import numpy as np
# dates = pd.date_range('2024-01-01', periods=20, freq='D')
# df = pd.DataFrame({'date': dates, 'ticker': ['AAPL']*20, 'price': 150 + np.random.randn(20).cumsum()})
# df

#sample DataFrame
import numpy as np
dates = pd.date_range('2024-01-01', periods=20, freq='D')
np.random.seed(0)  # deterministic for notebook runs
df = pd.DataFrame({
     'date': dates,
     'ticker': ['AAPL'] * len(dates),
     'price': 150 + np.random.randn(len(dates)).cumsum()
 })
df.head()


Unnamed: 0,date,ticker,price
0,2024-01-01,AAPL,151.764052
1,2024-01-02,AAPL,152.16421
2,2024-01-03,AAPL,153.142948
3,2024-01-04,AAPL,155.383841
4,2024-01-05,AAPL,157.251399


## 2) Save CSV to data/raw/ and Parquet to data/processed/ (TODO)
- Use timestamped filenames.
- Handle missing Parquet engine gracefully.

In [11]:
# def ts(): return dt.datetime.now().strftime('%Y%m%d-%H%M%S')

# # TODO: Save CSV
# csv_path = RAW / f"sample_{ts()}.csv"
# df.to_csv(csv_path, index=False)
# csv_path

# # TODO: Save Parquet
# pq_path = PROC / f"sample_{ts()}.parquet"
# try:
#     df.to_parquet(pq_path)
# except Exception as e:
#     print('Parquet engine not available. Install pyarrow or fastparquet to complete this step.')
#     pq_path = None
# pq_path

Parquet engine not available. Install pyarrow or fastparquet to complete this step.


In [3]:
import typing as t
import pathlib
import pandas as pd

def ts(fmt: str = '%Y%m%d-%H%M%S') -> str:
    """Timestamp helper for filenames."""
    return dt.datetime.now().strftime(fmt)

def detect_format(path: t.Union[str, pathlib.Path]) -> str:
    s = str(path).lower()
    if s.endswith('.csv'):
        return 'csv'
    if s.endswith('.parquet') or s.endswith('.pq') or s.endswith('.parq'):
        return 'parquet'
    raise ValueError('Unsupported format: ' + s)

def _get_parquet_engine() -> t.Optional[str]:
    """
    Return 'pyarrow' or 'fastparquet' if available, otherwise None.
    We avoid importing pandas internals and directly check these backends.
    """
    try:
        import pyarrow  # type: ignore
        return 'pyarrow'
    except Exception:
        try:
            import fastparquet  # type: ignore
            return 'fastparquet'
        except Exception:
            return None

def _infer_date_columns_csv(path: t.Union[str, pathlib.Path]) -> t.List[str]:
    """Look at the CSV header to infer columns that seem like dates."""
    p = pathlib.Path(path)
    cols = pd.read_csv(p, nrows=0).columns.tolist()
    # common heuristics: name contains 'date' or 'time'
    date_cols = [c for c in cols if ('date' in c.lower()) or ('time' in c.lower())]
    return date_cols

def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]) -> pathlib.Path:
    """
    Write DataFrame either as CSV or Parquet based on file suffix.
    Creates parent dir if missing. Raises RuntimeError with a friendly message
    if Parquet engine not available.
    """
    p = pathlib.Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    fmt = detect_format(p)
    if fmt == 'csv':
        df.to_csv(p, index=False)
        print(f'WROTE CSV -> {p}')
    else:
        engine = _get_parquet_engine()
        if engine is None:
            raise RuntimeError(
                "Parquet engine not available. Install one: `pip install pyarrow` or `pip install fastparquet`."
            )
        df.to_parquet(p, index=False, engine=engine)
        print(f'WROTE PARQUET -> {p} (engine={engine})')
    return p

def read_df(path: t.Union[str, pathlib.Path]) -> pd.DataFrame:
    """
    Read CSV or Parquet based on extension.
    For CSV: parses inferred date columns (names containing 'date'/'time').
    For Parquet: uses available parquet engine or raises friendly RuntimeError.
    """
    p = pathlib.Path(path)
    fmt = detect_format(p)
    if fmt == 'csv':
        date_cols = _infer_date_columns_csv(p)
        if date_cols:
            return pd.read_csv(p, parse_dates=date_cols)
        else:
            return pd.read_csv(p)
    else:
        engine = _get_parquet_engine()
        if engine is None:
            raise RuntimeError(
                "Parquet engine not available for reading. Install `pyarrow` or `fastparquet`."
            )
        return pd.read_parquet(p, engine=engine)


In [4]:
csv_path = RAW / f"sample_{ts()}.csv"
pq_path  = PROC / f"sample_{ts()}.parquet"

# Save CSV (always works)
write_df(df, csv_path)

# Save Parquet with fastparquet
try:
    df.to_parquet(pq_path, engine="fastparquet", index=False)
    print(f"WROTE PARQUET -> {pq_path} (engine=fastparquet)")
except Exception as e:
    print("Parquet save skipped:", e)
    pq_path = None


WROTE CSV -> data\raw\sample_20250820-225407.csv
WROTE PARQUET -> data\processed\sample_20250820-225407.parquet (engine=fastparquet)


## 3) Reload and Validate (TODO)
- Compare shapes and key dtypes.

In [5]:
def validate_loaded(original, reloaded):
    checks = {
        'shape_equal': original.shape == reloaded.shape,
        'date_is_datetime': pd.api.types.is_datetime64_any_dtype(reloaded['date']) if 'date' in reloaded.columns else False,
        'price_is_numeric': pd.api.types.is_numeric_dtype(reloaded['price']) if 'price' in reloaded.columns else False,
    }
    return checks

df_csv = pd.read_csv(csv_path, parse_dates=['date'])
validate_loaded(df, df_csv)

{'shape_equal': True, 'date_is_datetime': True, 'price_is_numeric': True}

In [6]:
if pq_path:
    try:
        df_pq = pd.read_parquet(pq_path)
        validate_loaded(df, df_pq)
    except Exception as e:
        print('Parquet read failed:', e)

In [7]:
# Cell 5: Validation helpers and reloading
import pandas as pd
import numpy as np

def validate_loaded(original: pd.DataFrame, loaded: pd.DataFrame, verbose: bool = True) -> dict:
    """
    Compare shape, column presence, dtypes, and column-wise equality for intersection columns.
    Returns a dict of checks.
    """
    checks = {}
    checks['shape_original'] = original.shape
    checks['shape_loaded'] = loaded.shape
    checks['shape_equal'] = original.shape == loaded.shape

    orig_cols = list(original.columns)
    loaded_cols = list(loaded.columns)
    checks['orig_cols'] = orig_cols
    checks['loaded_cols'] = loaded_cols
    checks['missing_in_loaded'] = [c for c in orig_cols if c not in loaded_cols]
    checks['extra_in_loaded'] = [c for c in loaded_cols if c not in orig_cols]

    # dtype equality for intersection
    common = [c for c in orig_cols if c in loaded_cols]
    dtype_equal = {}
    for c in common:
        dtype_equal[c] = {
            'original': str(original[c].dtype),
            'loaded': str(loaded[c].dtype),
            'dtype_equal': pd.api.types.is_dtype_equal(original[c].dtype, loaded[c].dtype)
        }
    checks['dtype_equal'] = dtype_equal

    # quick sample equality check (exact equality)
    equality = {}
    for c in common:
        try:
            # use .equals for exact identical series (including NaNs)
            equality[c] = bool(original[c].equals(loaded[c]))
        except Exception:
            equality[c] = False
    checks['column_values_identical'] = equality

    if verbose:
        print("SHAPE: original", checks['shape_original'], "loaded", checks['shape_loaded'], "==", checks['shape_equal'])
        if checks['missing_in_loaded']:
            print("WARNING: columns missing in loaded:", checks['missing_in_loaded'])
        if checks['extra_in_loaded']:
            print("NOTE: extra columns in loaded:", checks['extra_in_loaded'])
        print("\nDType comparison (original -> loaded):")
        for c, d in dtype_equal.items():
            print(f"  {c}: {d['original']} -> {d['loaded']} ; equal? {d['dtype_equal']}")
        print("\nExact-value equality for columns (True means identical):")
        for c, v in equality.items():
            print(f"  {c}: {v}")
    return checks

# Read CSV back
df_csv = read_df(csv_path)
csv_checks = validate_loaded(df, df_csv)

# Read parquet back (if created)
if pq_path:
    try:
        df_pq = read_df(pq_path)
        pq_checks = validate_loaded(df, df_pq)
    except RuntimeError as e:
        print("Parquet re-load skipped:", e)


SHAPE: original (20, 3) loaded (20, 3) == True

DType comparison (original -> loaded):
  date: datetime64[ns] -> datetime64[ns] ; equal? True
  ticker: object -> object ; equal? True
  price: float64 -> float64 ; equal? True

Exact-value equality for columns (True means identical):
  date: True
  ticker: True
  price: False
SHAPE: original (20, 3) loaded (20, 3) == True

DType comparison (original -> loaded):
  date: datetime64[ns] -> datetime64[ns] ; equal? True
  ticker: object -> object ; equal? True
  price: float64 -> float64 ; equal? True

Exact-value equality for columns (True means identical):
  date: True
  ticker: True
  price: True


## 4) Utilities (TODO)
- Implement `detect_format`, `write_df`, `read_df`.
- Use suffix to route; create parent dirs if needed; friendly errors for Parquet.

In [8]:
import typing as t, pathlib, pandas as pd

def detect_format(path: t.Union[str, pathlib.Path]):
    s = str(path).lower()
    if s.endswith('.csv'):
        return 'csv'
    if s.endswith(('.parquet', '.pq', '.parq')):
        return 'parquet'
    raise ValueError('Unsupported format: ' + s)

def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]):
    p = pathlib.Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)  # ensure folder exists
    fmt = detect_format(p)
    if fmt == 'csv':
        df.to_csv(p, index=False)
    else:  # parquet
        try:
            df.to_parquet(p, engine="fastparquet", index=False)
        except Exception as e:
            raise RuntimeError('Parquet engine not available. Install fastparquet.') from e
    return p

def read_df(path: t.Union[str, pathlib.Path]):
    p = pathlib.Path(path)
    fmt = detect_format(p)
    if fmt == 'csv':
        cols = pd.read_csv(p, nrows=0).columns
        return pd.read_csv(p, parse_dates=['date']) if 'date' in cols else pd.read_csv(p)
    else:
        try:
            return pd.read_parquet(p, engine="fastparquet")
        except Exception as e:
            raise RuntimeError('Parquet engine not available. Install fastparquet.') from e

# --- Demo ---
p_csv = RAW / f"util_{ts()}.csv"
p_pq  = PROC / f"util_{ts()}.parquet"

# test CSV
write_df(df, p_csv)
print("CSV HEAD:\n", read_df(p_csv).head(), "\n")

# test Parquet
try:
    write_df(df, p_pq)
    print("PARQUET HEAD:\n", read_df(p_pq).head())
except RuntimeError as e:
    print('Skipping Parquet util demo:', e)


CSV HEAD:
         date ticker       price
0 2024-01-01   AAPL  151.764052
1 2024-01-02   AAPL  152.164210
2 2024-01-03   AAPL  153.142948
3 2024-01-04   AAPL  155.383841
4 2024-01-05   AAPL  157.251399 

PARQUET HEAD:
         date ticker       price
0 2024-01-01   AAPL  151.764052
1 2024-01-02   AAPL  152.164210
2 2024-01-03   AAPL  153.142948
3 2024-01-04   AAPL  155.383841
4 2024-01-05   AAPL  157.251399


## 5) Documentation (TODO)
- Update README with a **Data Storage** section (folders, formats, env usage).
- Summarize validation checks and any assumptions.