# Project — Data Storage
Owner: Kushal Mamillapalli   
Date: 08/20

## Goals
- Configure env-driven paths for `data/raw/` and `data/processed/`.
- Save CSV and Parquet; reload and validate with strict dtypes.
- Provide reusable IO utilities and document tradeoffs.

In [1]:
import os, pathlib, datetime as dt
import pandas as pd
from dotenv import load_dotenv

load_dotenv()
RAW = pathlib.Path(os.getenv('DATA_DIR_RAW', 'data/raw'))
PROC = pathlib.Path(os.getenv('DATA_DIR_PROCESSED', 'data/processed'))
RAW.mkdir(parents=True, exist_ok=True)
PROC.mkdir(parents=True, exist_ok=True)
print('RAW ->', RAW.resolve())
print('PROC ->', PROC.resolve())

RAW -> /Users/Kushal/FRE/bootcamp_kushal_mamillapalli/project/data/raw
PROC -> /Users/Kushal/FRE/bootcamp_kushal_mamillapalli/project/data/processed
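
To illustrate how an environment variable overrides the default location, here is a minimal sketch. The helper `resolve_dir` and the variable name `DEMO_DATA_DIR` are hypothetical (they are not part of this project), and a temporary directory stands in for the real `data/` tree.

```python
import os
import pathlib
import tempfile

def resolve_dir(var: str, default: str) -> pathlib.Path:
    """Return the directory named by env var `var` (else `default`), creating it if needed."""
    path = pathlib.Path(os.getenv(var, default))
    path.mkdir(parents=True, exist_ok=True)
    return path

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical variable name, chosen so the demo cannot clobber real settings.
    os.environ['DEMO_DATA_DIR'] = os.path.join(tmp, 'raw')
    demo_dir = resolve_dir('DEMO_DATA_DIR', 'data/raw')
    demo_created = demo_dir.is_dir()   # the override path now exists
    demo_name = demo_dir.name          # last path component of the override
    del os.environ['DEMO_DATA_DIR']
```

In the notebook itself the same effect would come from a `DATA_DIR_RAW=...` line in `.env`, which `load_dotenv()` reads before `os.getenv` is called.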


## 1) Create or Load a Sample DataFrame
Use a small synthetic dataset or reuse prior raw files to test IO paths and validations.

In [4]:
import pandas as pd

# Reuse a previously saved raw file rather than generating synthetic data.
df = pd.read_csv('../data/raw/fraud_kaggle_creditcard_source-kaggle_dataset-creditcardfraud_20250820-192958.csv')
df.head()


Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


## 2) Persist Files
- Save CSV to `data/raw/` and Parquet to `data/processed/` using timestamped filenames.
- Handle missing Parquet engine gracefully with a clear error and continue pipeline.

In [7]:
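def ts():
    """Return a filename-safe timestamp, e.g. 20250820-213245."""
    return dt.datetime.now().strftime('%Y%m%d-%H%M%S')

# Save Parquet to data/processed/ under a timestamped name.
pq_path = PROC / f"fraud_kaggle_creditcard_source-kaggle_dataset-creditcardfraud_{ts()}.parquet"
try:
    df.to_parquet(pq_path)
except ImportError:
    # pandas raises ImportError when neither pyarrow nor fastparquet is installed.
    print('Parquet engine not available. Install pyarrow or fastparquet to complete this step.')
    pq_path = None
except Exception as e:
    # Any other failure is reported as-is so it is not mistaken for a missing engine.
    print('Parquet write failed:', e)
    pq_path = None
pq_path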

PosixPath('../data/processed/fraud_kaggle_creditcard_source-kaggle_dataset-creditcardfraud_20250820-213245.parquet')
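
The cell above writes only the Parquet file. The CSV half of the plan could look like the sketch below. It is an illustration under assumptions, not the notebook's actual code: the three-row frame is synthetic, the `sample_` filename prefix is hypothetical, and a temporary directory stands in for `data/raw/`.

```python
import datetime as dt
import pathlib
import tempfile

import pandas as pd

def ts() -> str:
    """Return a filename-safe timestamp such as 20250820-213245."""
    return dt.datetime.now().strftime('%Y%m%d-%H%M%S')

# Synthetic stand-in for the real frame (values are illustrative only).
sample = pd.DataFrame({'Time': [0.0, 1.0, 2.0],
                       'Amount': [10.0, 20.5, 30.25],
                       'Class': [0, 0, 1]})

with tempfile.TemporaryDirectory() as tmp:
    csv_out = pathlib.Path(tmp) / f"sample_{ts()}.csv"
    # index=False keeps the row index out of the file, so a later read_csv
    # does not produce an extra 'Unnamed: 0' column.
    sample.to_csv(csv_out, index=False)
    csv_reloaded = pd.read_csv(csv_out)
```

The `Unnamed: 0` column visible in `df.head()` earlier is what `read_csv` produces when a file was written with the index included, which is why `index=False` is used here.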

## 3) Reload and Validate
- Compare shapes and key dtypes.
- Ensure `Amount` is numeric. This dataset has no `date` or price columns; `Time` appears to be a numeric offset (0.0, 1.0, 2.0, ...) rather than a calendar date.

In [None]:
def validate_loaded(original, reloaded):
    """Compare shape and key dtypes between the original and reloaded frames."""
    checks = {
        'shape_equal': original.shape == reloaded.shape,
        'amount_is_numeric': 'Amount' in reloaded.columns and pd.api.types.is_numeric_dtype(reloaded['Amount']),
    }
    return checks

csv_path = "../data/raw/fraud_kaggle_creditcard_source-kaggle_dataset-creditcardfraud_20250820-192958.csv"
# `Time` holds numeric offsets (see df.head() above), so it is read as-is instead of via parse_dates.
df_csv = pd.read_csv(csv_path)
validate_loaded(df, df_csv)
# Both checks pass.



{'shape_equal': True, 'amount_is_numeric': True}

In [15]:
if pq_path:
    try:
        df_pq = pd.read_parquet(pq_path)
        # A bare expression inside a block is not displayed, so print the result.
        print(validate_loaded(df, df_pq))
    except Exception as e:
        print('Parquet read failed:', e)
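
The checks above cover shape and one column. A stricter, column-by-column dtype comparison might be sketched as follows. The helper `dtype_mismatches` is hypothetical (it is not one of this notebook's utilities), and the two-row frame is synthetic.

```python
import io

import pandas as pd

def dtype_mismatches(original: pd.DataFrame, reloaded: pd.DataFrame) -> dict:
    """Map column -> (original dtype, reloaded dtype) for shared columns whose dtypes differ."""
    shared = [c for c in original.columns if c in reloaded.columns]
    return {c: (str(original[c].dtype), str(reloaded[c].dtype))
            for c in shared if original[c].dtype != reloaded[c].dtype}

# Synthetic frame with a deliberately narrow integer dtype.
original = pd.DataFrame({'Amount': pd.Series([10.0, 20.5], dtype='float64'),
                         'Class': pd.Series([0, 1], dtype='int8')})

buf = io.StringIO()
original.to_csv(buf, index=False)

# Default read: CSV carries no dtype information, so integers come back as int64.
buf.seek(0)
loose = dtype_mismatches(original, pd.read_csv(buf))

# Strict read: declare the expected dtypes up front.
buf.seek(0)
strict = dtype_mismatches(original, pd.read_csv(buf, dtype={'Amount': 'float64', 'Class': 'int8'}))
```

Here `loose` reports the `Class` column as changed while `strict` is empty, which is the case for passing an explicit `dtype` mapping whenever CSV is the storage format.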

## 4) Utilities
- Implement `detect_format`, `write_df`, `read_df`.
- Route by suffix; create parent dirs; provide friendly errors for missing Parquet engine.
- Prefer explicit dtype parsing on CSV read (e.g., `parse_dates=['date']`).

In [16]:
import typing as t, pathlib

def detect_format(path: t.Union[str, pathlib.Path]) -> str:
    """Return 'csv' or 'parquet' based on the file suffix."""
    s = str(path).lower()
    if s.endswith('.csv'):
        return 'csv'
    if s.endswith(('.parquet', '.pq', '.parq')):
        return 'parquet'
    raise ValueError('Unsupported format: ' + s)

def write_df(df: pd.DataFrame, path: t.Union[str, pathlib.Path]) -> pathlib.Path:
    """Write `df` to `path`, creating parent directories; the format follows the suffix."""
    p = pathlib.Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    if detect_format(p) == 'csv':
        df.to_csv(p, index=False)
    else:
        try:
            df.to_parquet(p)
        except ImportError as e:
            # Only a missing engine is relabeled; other errors keep their own message.
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
    return p

def read_df(path: t.Union[str, pathlib.Path]) -> pd.DataFrame:
    """Read a CSV or Parquet file, parsing a `date` column when one exists."""
    p = pathlib.Path(path)
    if detect_format(p) == 'csv':
        # Peek at the header once so `date` is parsed only if the column is present.
        header = pd.read_csv(p, nrows=0).columns
        return pd.read_csv(p, parse_dates=['date'] if 'date' in header else False)
    try:
        return pd.read_parquet(p)
    except ImportError as e:
        raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e

# Demo
p_csv = RAW / f"util_{ts()}.csv"
p_pq  = PROC / f"util_{ts()}.parquet"
write_df(df, p_csv); read_df(p_csv).head()
try:
    write_df(df, p_pq)
    read_df(p_pq).head()
except RuntimeError as e:
    print('Skipping Parquet util demo:', e)

## Documentation & Insights
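- Storage strategy: CSV for portability; Parquet for efficient analytics, because it is columnar and stores each column's dtype in the file. Both are written with timestamped filenames so each output can be traced to the run that produced it.
- Validation: On reload, compare shapes and critical dtypes (here, that `Amount` is numeric) against the original frame.
- Practical insight: Parquet suits repeated analytical reads, since dtypes survive the round trip and files are typically smaller. CSV is easier to inspect and share, since any text editor or spreadsheet can open it.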
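
The dtype tradeoff can be seen with a small round trip through CSV. This is a sketch on synthetic data; the `date` and `price` columns are illustrative and do not come from the fraud dataset.

```python
import io

import pandas as pd

# Synthetic frame with a real datetime column.
frame = pd.DataFrame({'date': pd.date_range('2024-01-01', periods=3, freq='D'),
                      'price': [1.0, 2.0, 3.0]})

buf = io.StringIO()
frame.to_csv(buf, index=False)

# Plain reload: the dates come back as text because CSV stores no dtype metadata.
buf.seek(0)
plain = pd.read_csv(buf)

# Explicit reload: parse_dates restores the datetime dtype.
buf.seek(0)
parsed = pd.read_csv(buf, parse_dates=['date'])

plain_is_datetime = pd.api.types.is_datetime64_any_dtype(plain['date'])
parsed_is_datetime = pd.api.types.is_datetime64_any_dtype(parsed['date'])
```

Parquet avoids this step because the column types are written into the file, so a reader does not need to be told which columns are dates.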
