In [1]:
%pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-21.0.0-cp313-cp313-macosx_12_0_arm64.whl.metadata (3.3 kB)
Downloading pyarrow-21.0.0-cp313-cp313-macosx_12_0_arm64.whl (31.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.2/31.2 MB[0m [31m61.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: pyarrow
Successfully installed pyarrow-21.0.0
Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
# Resolve the project root: when the kernel was started inside notebooks/,
# the project root is one level up; otherwise it is the working directory.
CWD = Path.cwd()
PROJECT_ROOT = CWD if CWD.name != "notebooks" else CWD.parent

# Load optional settings from <project>/.env; override=False keeps any
# variables already set in the process environment.
env_path = PROJECT_ROOT / ".env"
load_dotenv(env_path, override=False)

# Data directories are configurable via env vars, relative to the project root.
RAW_REL, PROC_REL = (
    os.getenv("DATA_DIR_RAW", "data/raw"),
    os.getenv("DATA_DIR_PROCESSED", "data/processed"),
)

DATA_DIR_RAW, DATA_DIR_PROCESSED = PROJECT_ROOT / RAW_REL, PROJECT_ROOT / PROC_REL
DATA_DIR_RAW.mkdir(exist_ok=True, parents=True)
DATA_DIR_PROCESSED.mkdir(exist_ok=True, parents=True)

print("PROJECT_ROOT:", PROJECT_ROOT)
print("DATA_DIR_RAW:", DATA_DIR_RAW)
print("DATA_DIR_PROCESSED:", DATA_DIR_PROCESSED)

def _ensure_parent(p: Path) -> Path:
    p.parent.mkdir(parents=True, exist_ok=True)
    return p

# Use a non-empty `df_api` frame if one already exists in the kernel; otherwise
# fall back to a small synthetic frame so the storage demo always has data.
# NOTE(review): depending on leftover kernel state means Restart & Run All may
# take a different branch than an interactive session.
if isinstance(globals().get("df_api"), pd.DataFrame) and not df_api.empty:
    df = df_api.copy()
    # Check `date`/`adj_close` when both exist; otherwise the first two columns.
    critical_cols = (
        ["date", "adj_close"]
        if {"date", "adj_close"} <= set(df.columns)
        else list(df.columns[:2])
    )
else:
    df = pd.DataFrame(
        {
            "id": range(1, 6),
            "value": [10.0, 12.5, 9.9, 11.2, 13.7],
            "stamp": pd.date_range("2025-01-01", periods=5, freq="D"),
        }
    )
    critical_cols = ["id", "value"]  # expected dtypes: int/float

print("Using DataFrame with shape:", df.shape)
print("Critical columns for dtype check:", critical_cols)
df.head()

PROJECT_ROOT: /Users/wangyuhan/bootcamp_Serena_Wang/homework/homework05
DATA_DIR_RAW: /Users/wangyuhan/bootcamp_Serena_Wang/homework/homework05/data/raw
DATA_DIR_PROCESSED: /Users/wangyuhan/bootcamp_Serena_Wang/homework/homework05/data/processed
Using DataFrame with shape: (5, 3)
Critical columns for dtype check: ['id', 'value']


Unnamed: 0,id,value,stamp
0,1,10.0,2025-01-01
1,2,12.5,2025-01-02
2,3,9.9,2025-01-03
3,4,11.2,2025-01-04
4,5,13.7,2025-01-05


In [7]:
def write_df(df: pd.DataFrame, path: Path) -> Path:
    """Write ``df`` to ``path`` as CSV or Parquet, chosen by the file suffix.

    Parent directories are created as needed. The index is not written in
    either format.

    Args:
        df: Frame to persist.
        path: Destination; a ``str`` is accepted and converted to ``Path``.
            Its suffix (case-insensitive) must be ``.csv`` or ``.parquet``.

    Returns:
        The destination as a ``Path``.

    Raises:
        ValueError: The suffix is neither ``.csv`` nor ``.parquet``. No
            directories are created in this case.
        RuntimeError: Writing Parquet failed. If pandas could not import a
            Parquet engine, the message suggests installing one; otherwise it
            reports the underlying error. The original exception is chained as
            ``__cause__``.
    """
    path = Path(path)
    suf = path.suffix.lower()
    if suf not in (".csv", ".parquet"):
        # Reject before touching the filesystem so no stray directories are created.
        raise ValueError(f"Unsupported file suffix: {suf} (use .csv or .parquet)")
    _ensure_parent(path)

    if suf == ".csv":
        df.to_csv(path, index=False)
        return path

    try:
        # Default engine="auto": pandas tries pyarrow, then fastparquet, and
        # raises ImportError when neither is installed.
        df.to_parquet(path, index=False)
    except ImportError as e:
        raise RuntimeError(
            "Failed to write Parquet. Install a Parquet engine, e.g.: "
            "`pip install pyarrow` or `pip install fastparquet`.\n"
            f"Original error: {e}"
        ) from e
    except Exception as e:
        # An engine is present but rejected the data (e.g. an unsupported
        # column type); installing an engine would not help here.
        raise RuntimeError(
            f"Failed to write Parquet to {path}.\n"
            f"Original error: {e}"
        ) from e
    return path
def read_df(path: Path) -> pd.DataFrame:
    """Load a DataFrame from a ``.csv`` or ``.parquet`` file.

    Args:
        path: Source file; a ``str`` is accepted and converted to ``Path``.

    Returns:
        The loaded frame.

    Raises:
        FileNotFoundError: ``path`` does not exist. This is checked before the
            suffix.
        ValueError: The suffix (case-insensitive) is neither ``.csv`` nor
            ``.parquet``.
        RuntimeError: Reading Parquet failed. If pandas could not import a
            Parquet engine, the message suggests installing one; otherwise it
            reports the underlying error (e.g. a corrupt file). The original
            exception is chained as ``__cause__``.
    """
    path = Path(path)
    suf = path.suffix.lower()
    if not path.exists():
        raise FileNotFoundError(f"Missing file: {path}")

    if suf == ".csv":
        return pd.read_csv(path)

    if suf == ".parquet":
        try:
            return pd.read_parquet(path)
        except ImportError as e:
            raise RuntimeError(
                "Failed to read Parquet. Install a Parquet engine, e.g.: "
                "`pip install pyarrow` or `pip install fastparquet`.\n"
                f"Original error: {e}"
            ) from e
        except Exception as e:
            # An engine is available but the file could not be parsed.
            raise RuntimeError(
                f"Failed to read Parquet from {path}.\n"
                f"Original error: {e}"
            ) from e

    raise ValueError(f"Unsupported file suffix: {suf} (use .csv or .parquet)")

# Save the same frame in two formats: CSV under the raw dir, Parquet under the processed dir.
raw_csv_path = DATA_DIR_RAW / "example_dataset.csv"
proc_parquet_path = DATA_DIR_PROCESSED / "example_dataset.parquet"

# Written in order: CSV first, then Parquet.
p1, p2 = (write_df(df, target) for target in (raw_csv_path, proc_parquet_path))

print("Saved CSV to", p1)
print("Saved Parquet to", p2)




Saved CSV to /Users/wangyuhan/bootcamp_Serena_Wang/homework/homework05/data/raw/example_dataset.csv
Saved Parquet to /Users/wangyuhan/bootcamp_Serena_Wang/homework/homework05/data/processed/example_dataset.parquet


In [8]:
# Read

import pandas as pd
from pathlib import Path

# Reuse the path computed when saving, not a machine-specific absolute path,
# so this cell works on any checkout.
p = proc_parquet_path

# If you don't have a Parquet engine yet, install one once:
# %pip install pyarrow   # or: %pip install fastparquet

# Read into a new name. Rebinding `df` here would make the round-trip
# validation below compare the Parquet reload against itself instead of
# against the frame that was actually written.
df_from_parquet = pd.read_parquet(p)            # or: pd.read_parquet(p, engine="pyarrow")
print(df_from_parquet.shape)
df_from_parquet.head()


(5, 3)


Unnamed: 0,id,value,stamp
0,1,10.0,2025-01-01
1,2,12.5,2025-01-02
2,3,9.9,2025-01-03
3,4,11.2,2025-01-04
4,5,13.7,2025-01-05


In [11]:
# Reload and validation: read both saved artifacts back from disk.
df_csv, df_parq = read_df(raw_csv_path), read_df(proc_parquet_path)

print("CSV shape:", df_csv.shape, "| Parquet shape:", df_parq.shape)

def validate_storage_roundtrip(df_original: pd.DataFrame,
                               df_csv: pd.DataFrame,
                               df_parquet: pd.DataFrame,
                               critical_cols: list[str]) -> pd.DataFrame:
    """Compare a frame against its CSV and Parquet reloads.

    The original frame is treated as ground truth.

    Args:
        df_original: Frame that was written to disk.
        df_csv: Frame reloaded from CSV.
        df_parquet: Frame reloaded from Parquet.
        critical_cols: Columns whose dtypes should be compared.

    Returns:
        A two-column frame (``check``, ``value``) with these rows:
        - two shape-equality flags, one for each reload;
        - for each column in ``critical_cols``, the dtype string from the
          original, the CSV and the Parquet frames ("MISSING" when a reload
          lacks the column), followed by two dtype-equality flags;
        - for a critical column absent from ``df_original``, a single
          ``dtype_<col>_present_in_original`` row set to False instead.
    """
    def _dtype_name(frame: pd.DataFrame, name: str) -> str:
        return str(frame[name].dtype) if name in frame.columns else "MISSING"

    rows = [
        ("shape_match_csv", df_original.shape == df_csv.shape),
        ("shape_match_parquet", df_original.shape == df_parquet.shape),
    ]

    for name in critical_cols:
        if name not in df_original.columns:
            rows.append((f"dtype_{name}_present_in_original", False))
            continue
        expected = str(df_original[name].dtype)
        from_csv = _dtype_name(df_csv, name)
        from_parquet = _dtype_name(df_parquet, name)
        rows += [
            (f"dtype_{name}_orig", expected),
            (f"dtype_{name}_csv", from_csv),
            (f"dtype_{name}_parquet", from_parquet),
            (f"dtype_equal_csv_{name}", from_csv == expected),
            (f"dtype_equal_parquet_{name}", from_parquet == expected),
        ]

    return pd.DataFrame(rows, columns=["check", "value"])

# Build the round-trip report with the in-memory frame as ground truth and display it.
validation_report = validate_storage_roundtrip(
    df_original=df,
    df_csv=df_csv,
    df_parquet=df_parq,
    critical_cols=critical_cols,
)
validation_report


CSV shape: (5, 3) | Parquet shape: (5, 3)


Unnamed: 0,check,value
0,shape_match_csv,True
1,shape_match_parquet,True
2,dtype_id_orig,int64
3,dtype_id_csv,int64
4,dtype_id_parquet,int64
5,dtype_equal_csv_id,True
6,dtype_equal_parquet_id,True
7,dtype_value_orig,float64
8,dtype_value_csv,float64
9,dtype_value_parquet,float64


In [12]:
# Smoke test: round-trip one row through write_df/read_df to confirm Parquet
# works in this environment, then always remove the scratch file.
engine_check_path = DATA_DIR_PROCESSED / "engine_check.parquet"
try:
    write_df(df.head(1), engine_check_path)
    _ = read_df(engine_check_path)
    print("Parquet read/write OK:", engine_check_path)
except RuntimeError as e:
    # write_df/read_df turn Parquet failures into RuntimeError. The failure is
    # not necessarily a missing engine, so the message does not claim it is.
    print("Parquet read/write failed:\n", e)
finally:
    # Runs on success and failure alike, so a partially written file is not left behind.
    engine_check_path.unlink(missing_ok=True)

Parquet read/write OK: /Users/wangyuhan/bootcamp_Serena_Wang/homework/homework05/data/processed/engine_check.parquet
