In [125]:
import os, pathlib, datetime as dt
import pandas as pd
from dotenv import load_dotenv
from typing import Union

In [126]:
# The project root is taken to be the parent of the current working directory.
# NOTE(review): this presumably relies on the notebook being launched from a
# sub-folder of the project (e.g. notebooks/) - confirm.
PROJECT_ROOT = pathlib.Path().resolve().parent
load_dotenv(PROJECT_ROOT / ".env")

# os.getenv returns None when a variable is unset, and `Path / None` raises an
# opaque TypeError. Fall back to the layout shown in this notebook's output
# (data/raw, data/processed) when the variable is missing or empty.
RAW_DIR = PROJECT_ROOT / (os.getenv("DATA_DIR_RAW") or "data/raw")
PROC_DIR = PROJECT_ROOT / (os.getenv("DATA_DIR_PROCESSED") or "data/processed")
RAW_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR.mkdir(parents=True, exist_ok=True)

print("RAW_DIR:", RAW_DIR)
print("PROC_DIR:", PROC_DIR)

RAW_DIR: D:\文心远\研究生\5040-Bootcamp\project\data\raw
PROC_DIR: D:\文心远\研究生\5040-Bootcamp\project\data\processed


In [127]:
# Collect every raw MSFT export in RAW_DIR, sorted by file name.
# NOTE(review): picking the last name as "latest" assumes the file names end in
# a sortable timestamp (as in the file listed in this cell's output) - confirm.
csv_files = sorted(RAW_DIR.glob("api_source-yfinance_symbol-MSFT_*.csv"))
print("Stage4 CSV files found:", [f.name for f in csv_files])

# Fail with an actionable message instead of a bare IndexError on csv_files[-1].
if not csv_files:
    raise FileNotFoundError(
        f"No 'api_source-yfinance_symbol-MSFT_*.csv' file found in {RAW_DIR}; "
        "run the data-acquisition step first."
    )

latest_csv = csv_files[-1]
df_raw = pd.read_csv(latest_csv, parse_dates=['date'])
print("Loaded Stage4 CSV:", latest_csv)
print(df_raw.head())

Stage4 CSV files found: ['api_source-yfinance_symbol-MSFT_20250821-215314.csv']
Loaded Stage4 CSV: D:\文心远\研究生\5040-Bootcamp\project\data\raw\api_source-yfinance_symbol-MSFT_20250821-215314.csv
        date        open        high         low       close   adj_close  \
0 2025-02-24  408.510010  409.369995  399.320007  404.000000  403.259674   
1 2025-02-25  401.100006  401.920013  396.700012  397.899994  397.170837   
2 2025-02-26  398.010010  403.600006  394.250000  399.730011  398.997498   
3 2025-02-27  401.269989  405.739990  392.170013  392.529999  391.810699   
4 2025-02-28  392.660004  397.630005  386.570007  396.989990  396.262512   

     volume  
0  26443700  
1  29387400  
2  19619000  
3  21127400  
4  32845700  


In [128]:
def ensure_dir(path: pathlib.Path):
    """Make sure the directory that will hold ``path`` exists.

    Only ``path.parent`` (and any missing ancestors) is created; ``path``
    itself is not touched. Already-existing directories are left alone.
    Returns ``None``.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

def detect_format(path: Union[str, pathlib.Path]):
    """Map a file path to a storage format name based on how it ends.

    The comparison is case-insensitive and looks at the end of the whole
    path string.

    Returns:
        ``'csv'`` for ``.csv``; ``'parquet'`` for ``.parquet``, ``.pq`` or
        ``.parq``.

    Raises:
        ValueError: if the path ends in none of the supported suffixes.
    """
    lowered = str(path).lower()
    # Checked in order; the first format whose suffixes match wins.
    suffix_table = (
        ('csv', ('.csv',)),
        ('parquet', ('.parquet', '.pq', '.parq')),
    )
    for fmt_name, suffixes in suffix_table:
        if lowered.endswith(suffixes):
            return fmt_name
    raise ValueError('Unsupported format for: ' + str(path))

def write_df(df: pd.DataFrame, path: Union[str, pathlib.Path]):
    """Write ``df`` to ``path`` as CSV or Parquet, chosen from the file suffix.

    The parent directory is created first if needed. CSV files are written
    without the index column.

    Args:
        df: Frame to persist.
        path: Destination file; its suffix selects the format
            (see ``detect_format``).

    Returns:
        The destination as a ``pathlib.Path``.

    Raises:
        ValueError: if the suffix is not a supported format.
        RuntimeError: if the Parquet write fails. The message distinguishes a
            missing Parquet engine from any other failure; the original
            exception is chained as ``__cause__``.
    """
    path = pathlib.Path(path)
    ensure_dir(path)
    fmt = detect_format(path)
    if fmt == 'csv':
        df.to_csv(path, index=False)
    elif fmt == 'parquet':
        try:
            df.to_parquet(path)
        except ImportError as e:
            # pandas raises ImportError when neither pyarrow nor fastparquet is usable.
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
        except Exception as e:
            # Keep RuntimeError for callers, but do not blame the engine for
            # unrelated failures (permissions, unsupported dtypes, ...).
            raise RuntimeError(f'Failed to write Parquet file {path}: {e}') from e
    return path

def read_df(path: Union[str, pathlib.Path]):
    """Load a DataFrame from a CSV or Parquet file, chosen from the file suffix.

    For CSV, the header row is read first; if it contains a ``date`` column
    that column is parsed as datetimes, otherwise the file is read as-is.

    Args:
        path: Source file; its suffix selects the format (see ``detect_format``).

    Returns:
        The loaded ``pd.DataFrame``.

    Raises:
        ValueError: if the suffix is not a supported format.
        RuntimeError: if the Parquet read fails. The message distinguishes a
            missing Parquet engine from any other failure (e.g. a missing
            file); the original exception is chained as ``__cause__``.
    """
    path = pathlib.Path(path)
    fmt = detect_format(path)
    if fmt == 'csv':
        # Header-only read (nrows=0) just to discover the column names.
        header_cols = pd.read_csv(path, nrows=0).columns
        if 'date' in header_cols:
            return pd.read_csv(path, parse_dates=['date'])
        return pd.read_csv(path)
    elif fmt == 'parquet':
        try:
            return pd.read_parquet(path)
        except ImportError as e:
            # pandas raises ImportError when neither pyarrow nor fastparquet is usable.
            raise RuntimeError('Parquet engine not available. Install pyarrow or fastparquet.') from e
        except Exception as e:
            # Keep RuntimeError for callers, but report the real problem.
            raise RuntimeError(f'Failed to read Parquet file {path}: {e}') from e


Save to CSV (raw)

In [129]:
def ts():
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    now = dt.datetime.now()
    return f"{now:%Y%m%d-%H%M%S}"
    
# Build a timestamped file name under RAW_DIR, then persist the raw frame as CSV.
csv_path = RAW_DIR.joinpath(f"MSFT_raw_{ts()}.csv")
write_df(df=df_raw, path=csv_path)
print("Saved CSV →", csv_path)


Saved CSV → D:\文心远\研究生\5040-Bootcamp\project\data\raw\MSFT_raw_20250821-224835.csv


Save to Parquet (processed)

In [130]:
parq_path = PROC_DIR / f"prices_{ts()}.parquet"
try:
    # Go through write_df like the CSV save does, so both formats share the
    # same directory creation and suffix-based dispatch.
    write_df(df_raw, parq_path)
    print("Saved Parquet →", parq_path)
except Exception as e:
    # Best-effort demo: report and continue rather than abort the notebook.
    print("Parquet save failed (engine missing?). Skipping Parquet demo.")
    print("Error:", e)
    # write_df chains the underlying exception; show it so the real reason is visible.
    if e.__cause__ is not None:
        print("Cause:", e.__cause__)

Saved Parquet → D:\文心远\研究生\5040-Bootcamp\project\data\processed\prices_20250821-224835.parquet


Reload & Validate

In [131]:
def validate_loaded(original: pd.DataFrame, reloaded: pd.DataFrame, cols=('date','adj_close')):
    """Run basic sanity checks on a frame that was written to disk and read back.

    Args:
        original: The frame as it was before saving.
        reloaded: The frame read back from disk.
        cols: Column names that must be present in ``reloaded``.

    Returns:
        A dict of check name -> bool. ``shape_equal`` and ``cols_present`` are
        always included; ``adj_close_is_numeric`` and ``date_is_datetime`` are
        included only when the corresponding column exists in ``reloaded``.
    """
    reloaded_cols = reloaded.columns
    missing = [name for name in cols if name not in reloaded_cols]

    results = {}
    results['shape_equal'] = original.shape == reloaded.shape
    results['cols_present'] = not missing

    # (column, result key, dtype predicate) - evaluated only if the column exists.
    dtype_checks = (
        ('adj_close', 'adj_close_is_numeric', pd.api.types.is_numeric_dtype),
        ('date', 'date_is_datetime', pd.api.types.is_datetime64_any_dtype),
    )
    for column, key, predicate in dtype_checks:
        if column in reloaded_cols:
            results[key] = predicate(reloaded[column])
    return results

# Compare each reloaded file against df_raw, the frame that was actually saved.
# (The previous reference, df_api, is never defined in this notebook and only
# worked through leftover kernel state.)
df_csv = pd.read_csv(csv_path, parse_dates=['date'])
print('CSV validation:', validate_loaded(df_raw, df_csv))

df_parq = pd.read_parquet(parq_path)
print('Parquet validation:', validate_loaded(df_raw, df_parq))

CSV validation: {'shape_equal': True, 'cols_present': True, 'adj_close_is_numeric': True, 'date_is_datetime': True}
Parquet validation: {'shape_equal': True, 'cols_present': True, 'adj_close_is_numeric': True, 'date_is_datetime': True}
