In [16]:
!pip install --upgrade pandas pyarrow



In [20]:
from dotenv import load_dotenv
from pathlib import Path
import os

# Pull variables from a .env file (if one is found) into the process environment.
load_dotenv()

# Data locations are configurable via env vars; the defaults are relative paths.
RAW = Path(os.environ.get("DATA_DIR_RAW", "data/raw"))
PROC = Path(os.environ.get("DATA_DIR_PROCESSED", "data/processed"))

# Create both directories up front so later writes never hit a missing folder.
for _data_dir in (RAW, PROC):
    _data_dir.mkdir(parents=True, exist_ok=True)

In [25]:
import os

# Set before pandas is imported, presumably so pyarrow sees it at import time.
# NOTE(review): confirm the installed pyarrow actually reads this variable.
os.environ.update(PYARROW_IGNORE_DUPLICATE_EXTENSIONS="1")

import pandas as pd
from datetime import datetime

# Five consecutive days of toy prices used to exercise the CSV/Parquet round trip.
df = pd.DataFrame(
    {
        "date": pd.date_range(start="2024-01-01", periods=5, freq="D"),
        "price": [100, 101.5, 102.2, 101.9, 103.1],
    }
)

# Minute-resolution run stamp shared by both file names (e.g. 20250821-0358);
# re-running within the same minute overwrites the same files.
ts = f"{datetime.now():%Y%m%d-%H%M}"

csv_file = RAW / f"sample_{ts}.csv"            # raw copy
parquet_file = PROC / f"sample_{ts}.parquet"   # processed copy

df.to_csv(csv_file, index=False)
df.to_parquet(parquet_file, engine="fastparquet", index=False)

print("Saved:", csv_file, parquet_file)


Saved: data/raw/sample_20250821-0358.csv data/processed/sample_20250821-0358.parquet


In [26]:
def validate_df(df, required_cols=None, dtypes=None):
    """Check a DataFrame for required columns and expected column dtypes.

    Args:
        df: DataFrame to inspect.
        required_cols: Optional iterable of column names that must be present.
            Skipped when falsy.
        dtypes: Optional mapping ``{column: expected_dtype}``. Columns absent
            from ``df`` are skipped here (list them in ``required_cols`` to
            have them reported). Skipped when falsy.

    Returns:
        dict: ``{"status": "ok"}`` when no problem was found. Otherwise, a dict
        with ``"missing_cols"`` (list of absent names) and/or one
        ``"<col>_dtype"`` entry per mismatch, holding
        ``(actual_dtype, expected_dtype)``.
    """
    problems = {}

    absent = [col for col in (required_cols or []) if col not in df.columns]
    if absent:
        problems["missing_cols"] = absent

    for col, expected in (dtypes or {}).items():
        if col not in df.columns:
            continue
        actual = df[col].dtype
        if actual != expected:
            problems[f"{col}_dtype"] = (actual, expected)

    return problems if problems else {"status": "ok"}


In [28]:
# Read both files back; CSV needs parse_dates because it stores dates as text.
df_csv = pd.read_csv(csv_file, parse_dates=["date"])
df_parquet = pd.read_parquet(parquet_file, engine="fastparquet")

validation = {
    "CSV": validate_df(df_csv, ["date", "price"], {"price": "float64"}),
    "Parquet": validate_df(df_parquet, ["date", "price"], {"price": "float64"}),
}

print("Shapes:", df.shape, df_csv.shape, df_parquet.shape)
for label, result in validation.items():
    print(f"Validation {label}:", result)

# Fail loudly on a fresh Run All instead of relying on a reader to spot a bad printout.
assert df_csv.shape == df_parquet.shape == df.shape, "round trip changed the frame shape"
assert all(r == {"status": "ok"} for r in validation.values()), validation


Shapes: (5, 2) (5, 2) (5, 2)
Validation CSV: {'status': 'ok'}
Validation Parquet: {'status': 'ok'}


In [31]:
def write_df(df, path: Path, **kwargs):
    """Write ``df`` to ``path``, choosing the format from the file extension.

    Parent directories are created as needed. The row index is not written
    unless the caller passes ``index=True``.

    Args:
        df: DataFrame to write.
        path: Destination file; ``.csv`` and ``.parquet`` (case-insensitive)
            are supported. A ``str`` is accepted and converted to ``Path``.
        **kwargs: Forwarded to ``DataFrame.to_csv`` / ``DataFrame.to_parquet``.
            For Parquet, ``engine`` defaults to ``"fastparquet"`` and may be
            overridden (e.g. ``engine="pyarrow"``).

    Raises:
        ValueError: the extension is not supported (checked before any
            directory is created).
        RuntimeError: the selected Parquet engine could not be imported.
    """
    path = Path(path)
    suffix = path.suffix.lower()
    if suffix not in (".csv", ".parquet"):
        raise ValueError(f"Unsupported format: {suffix}")

    path.parent.mkdir(parents=True, exist_ok=True)
    kwargs.setdefault("index", False)

    if suffix == ".csv":
        df.to_csv(path, **kwargs)
        return

    engine = kwargs.setdefault("engine", "fastparquet")
    try:
        df.to_parquet(path, **kwargs)
    except ImportError as exc:
        # pandas raises ImportError when the requested engine is missing/too old;
        # name the engine actually requested so the fix is obvious.
        raise RuntimeError(
            f"Parquet engine {engine!r} is unavailable. Install it, or pass "
            "engine='pyarrow' or engine='fastparquet' to use another."
        ) from exc

def read_df(path: Path, **kwargs):
    """Read a DataFrame from ``path``, choosing the reader from the extension.

    Args:
        path: Source file; ``.csv`` and ``.parquet`` (case-insensitive) are
            supported. A ``str`` is accepted and converted to ``Path``.
        **kwargs: Forwarded to ``pd.read_csv`` / ``pd.read_parquet``. For
            Parquet, ``engine`` defaults to ``"fastparquet"`` but a caller may
            pass its own (previously this raised a duplicate-keyword TypeError).

    Returns:
        pandas.DataFrame: the loaded data.

    Raises:
        ValueError: the extension is not supported.
        RuntimeError: the selected Parquet engine could not be imported.
    """
    path = Path(path)
    suffix = path.suffix.lower()

    if suffix == ".csv":
        return pd.read_csv(path, **kwargs)

    if suffix == ".parquet":
        engine = kwargs.setdefault("engine", "fastparquet")
        try:
            return pd.read_parquet(path, **kwargs)
        except ImportError as exc:
            # pandas raises ImportError when the requested engine is missing/too old.
            raise RuntimeError(
                f"Parquet engine {engine!r} is unavailable. Install it, or pass "
                "engine='pyarrow' or engine='fastparquet' to use another."
            ) from exc

    raise ValueError(f"Unsupported format: {suffix}")


In [32]:
# Round-trip the sample frame through the helpers and verify nothing is lost.
test_csv = RAW / f"sample_utils_{ts}.csv"
test_parquet = PROC / f"sample_utils_{ts}.parquet"

write_df(df, test_csv)
write_df(df, test_parquet)

# CSV stores dates as plain text, so ask the reader to parse them back.
roundtrip = {
    "csv": read_df(test_csv, parse_dates=["date"]),
    "parquet": read_df(test_parquet),
}

for fmt, frame in roundtrip.items():
    assert list(frame.columns) == list(df.columns), f"{fmt}: columns differ"
    pd.testing.assert_series_equal(frame["price"], df["price"], obj=f"{fmt} price")
    # Compare timestamps by value so a different datetime resolution isn't a failure.
    assert pd.to_datetime(frame["date"]).tolist() == df["date"].tolist(), f"{fmt}: dates differ"
    print(frame.head())


         date  price
0  2024-01-01  100.0
1  2024-01-02  101.5
2  2024-01-03  102.2
3  2024-01-04  101.9
4  2024-01-05  103.1
        date  price
0 2024-01-01  100.0
1 2024-01-02  101.5
2 2024-01-03  102.2
3 2024-01-04  101.9
4 2024-01-05  103.1
