In [1]:
import os
import sys
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

# Project root. Defined once so the import path and the data directories
# below are always derived from the same location.
base_dir = '/Users/junshao/bootcamp_Jun_Shao'

# Make project-local modules importable. Guarded so that re-running this cell
# does not keep appending the same entry to sys.path.
if base_dir not in sys.path:
    sys.path.append(base_dir)

# Load .env
load_dotenv()

# Both directory settings are required; stop early if either is missing or empty
data_dir_raw = os.getenv('DATA_DIR_RAW')
data_dir_processed = os.getenv('DATA_DIR_PROCESSED')
if not all([data_dir_raw, data_dir_processed]):
    raise ValueError('DATA_DIR_RAW or DATA_DIR_PROCESSED not set in .env')

# Resolve the configured directories against the project root.
# Note: os.path.join returns the second argument unchanged when it is already
# an absolute path, so absolute values in the environment are used as-is.
raw_dir = os.path.join(base_dir, data_dir_raw)
processed_dir = os.path.join(base_dir, data_dir_processed)

# Create directories (exist_ok=True makes this a no-op when they already exist)
os.makedirs(raw_dir, exist_ok=True)
os.makedirs(processed_dir, exist_ok=True)

# Report the working directory and confirm both data directories are present
print('Current directory:', os.getcwd())
print(f'Raw directory exists: {os.path.exists(raw_dir)}')
print(f'Processed directory exists: {os.path.exists(processed_dir)}')

Current directory: /Users/junshao/bootcamp_Jun_Shao/homework/hw5/notebooks
Raw directory exists: True
Processed directory exists: True


In [7]:
# Specify input file and fail early with a clear message if it is absent
input_file = '/Users/junshao/bootcamp_Jun_Shao/homework/hw5/data/raw/api_alphavantage_AAPL_20250820-1804.csv'
if not os.path.exists(input_file):
    raise FileNotFoundError(f'Input file {input_file} not found')

df = pd.read_csv(input_file)

# Ensure correct dtypes: parse the date column and coerce the price/volume
# columns to numeric (pd.to_numeric raises on values it cannot convert)
df['date'] = pd.to_datetime(df['date'])
numeric_cols = ['open', 'high', 'low', 'close', 'volume']
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric)

# Validate input
print('Input Data Columns:', df.columns.tolist())
print('Input Data Info:')
# DataFrame.info() writes its report to stdout itself and returns None, so it
# is called directly; wrapping it in print() emitted an extra "None" line.
df.info()
print('Input Shape:', df.shape)

Input Data Columns: ['date', 'open', 'high', 'low', 'close', 'volume']
Input Data Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 158 entries, 0 to 157
Data columns (total 6 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    158 non-null    datetime64[ns]
 1   open    158 non-null    float64       
 2   high    158 non-null    float64       
 3   low     158 non-null    float64       
 4   close   158 non-null    float64       
 5   volume  158 non-null    int64         
dtypes: datetime64[ns](1), float64(4), int64(1)
memory usage: 7.5 KB
None
Input Shape: (158, 6)


In [15]:
import os
import pandas as pd

# Directory paths for the raw and processed data
raw_dir = '/Users/junshao/bootcamp_Jun_Shao/homework/hw5/data/raw'
processed_dir = '/Users/junshao/bootcamp_Jun_Shao/homework/hw5/data/processed'

# Full output file names; the timestamp matches the one in the input file name
timestamp = '1804'
csv_filename = os.path.join(
    raw_dir,
    f'api_alphavantage_AAPL_20250820-{timestamp}.csv',
)
parquet_filename = os.path.join(
    processed_dir,
    f'api_alphavantage_AAPL_20250820-{timestamp}.parquet',
)

# Show what is about to be written
print('DataFrame shape before saving:', df.shape)
print('DataFrame columns:', df.columns.tolist())

# Write the CSV copy; a failure is reported on stdout instead of being raised
try:
    df.to_csv(csv_filename, index=False)
    print(f'Saved CSV to {csv_filename}')
    print(f'CSV file exists: {os.path.exists(csv_filename)}')
except Exception as e:
    print(f'Error saving CSV to {csv_filename}: {e}')

# Write the Parquet copy; a missing pyarrow gets an install hint, any other
# failure is reported with its message
try:
    df.to_parquet(parquet_filename, engine='pyarrow', index=False)
    print(f'Saved Parquet to {parquet_filename}')
    print(f'Parquet file exists: {os.path.exists(parquet_filename)}')
except Exception as e:
    if isinstance(e, ImportError):
        print(f'Error: pyarrow not installed. Install with `pip install pyarrow`.')
    else:
        print(f'Error saving Parquet to {parquet_filename}: {e}')

DataFrame shape before saving: (158, 6)
DataFrame columns: ['date', 'open', 'high', 'low', 'close', 'volume']
Saved CSV to /Users/junshao/bootcamp_Jun_Shao/homework/hw5/data/raw/api_alphavantage_AAPL_20250820-1804.csv
CSV file exists: True
Saved Parquet to /Users/junshao/bootcamp_Jun_Shao/homework/hw5/data/processed/api_alphavantage_AAPL_20250820-1804.parquet
Parquet file exists: True


In [16]:
# Read both saved files back from disk.
# The CSV round trip loses the datetime dtype, so the date column is parsed again.
df_csv = pd.read_csv(csv_filename).assign(
    date=lambda frame: pd.to_datetime(frame['date'])
)

# The Parquet read needs pyarrow; without it the frame is set to None so the
# validation below skips the Parquet checks.
try:
    df_parquet = pd.read_parquet(parquet_filename, engine='pyarrow')
except ImportError:
    df_parquet = None
    print('Error: pyarrow not installed. Install with `pip install pyarrow`.')

# Validation function
def validate_dataframes(df_original, df_csv, df_parquet):
    """Check reloaded frames for the expected shape and column dtypes.

    Args:
        df_original: Frame whose shape the reloaded frames must match.
        df_csv: Frame reloaded from CSV; always checked.
        df_parquet: Frame reloaded from Parquet, or None to skip its checks.

    Returns:
        A list of problem descriptions; empty when every check passes.
    """
    # Required columns and the dtype each one must have (insertion order is
    # the order in which columns are checked).
    expected_dtypes = {
        'date': 'datetime64[ns]',
        'open': 'float64',
        'high': 'float64',
        'low': 'float64',
        'close': 'float64',
        'volume': 'int64',
    }

    # Frames to inspect, paired with the label used in messages.
    frames = [('CSV', df_csv)]
    if df_parquet is not None:
        frames.append(('Parquet', df_parquet))

    # Shape checks are reported first, one per frame.
    problems = [
        f'{label} shape {frame.shape} does not match original {df_original.shape}'
        for label, frame in frames
        if frame.shape != df_original.shape
    ]

    # Column checks follow, column by column, CSV before Parquet.
    for col, wanted in expected_dtypes.items():
        for label, frame in frames:
            if col not in frame.columns:
                problems.append(f'{label} missing column: {col}')
                continue
            actual = frame[col].dtype
            if actual != wanted:
                problems.append(
                    f'{label} column {col} dtype {actual} does not match expected {wanted}'
                )

    return problems

# Compare both reloaded frames with the original and report the outcome
errors = validate_dataframes(df, df_csv, df_parquet)
print('Validation Results:')
if not errors:
    print('All validations passed!')
else:
    # One bullet per problem found
    for error in errors:
        print(f'- {error}')

Validation Results:
All validations passed!


In [17]:
import os
import pandas as pd

# Define the write_df and read_df helper functions (kept unchanged)
def write_df(df, filename, engine='pyarrow'):
    """Write a DataFrame to CSV or Parquet, chosen by the file suffix.

    Args:
        df: DataFrame to persist; it is written without its index.
        filename: Destination path (str or os.PathLike) ending in ``.csv`` or
            ``.parquet``. Missing parent directories are created.
        engine: Parquet engine passed to ``DataFrame.to_parquet``; ignored
            for CSV output.

    Raises:
        ValueError: If the suffix is neither ``.csv`` nor ``.parquet``.
    """
    # Accept pathlib.Path as well as str; the suffix checks below need a str.
    filename = os.fspath(filename)

    # Reject unsupported suffixes before touching the filesystem, so a bad
    # call does not leave an empty directory behind.
    if not filename.endswith(('.csv', '.parquet')):
        raise ValueError(f'Unsupported file suffix in {filename}. Use .csv or .parquet')

    # A bare file name has an empty directory part, and os.makedirs('') raises
    # FileNotFoundError; only create the parent when there is one.
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    if filename.endswith('.csv'):
        df.to_csv(filename, index=False)
        print(f'Saved {filename}')
    else:
        try:
            df.to_parquet(filename, engine=engine, index=False)
            print(f'Saved {filename}')
        except ImportError:
            # Best-effort: a missing Parquet engine is reported, not raised.
            print(f'Error: Cannot save {filename}. Install pyarrow with `pip install pyarrow`.')

def read_df(filename, engine='pyarrow'):
    """Read a DataFrame from CSV or Parquet, chosen by the file suffix.

    Args:
        filename: Source path (str or os.PathLike) ending in ``.csv`` or
            ``.parquet``.
        engine: Parquet engine passed to ``pandas.read_parquet``; ignored
            for CSV input.

    Returns:
        The loaded DataFrame. For CSV input a ``date`` column, when present,
        is converted with ``pandas.to_datetime``. Returns None when the
        Parquet read raises ImportError (a message is printed instead).

    Raises:
        FileNotFoundError: If ``filename`` does not exist.
        ValueError: If the suffix is neither ``.csv`` nor ``.parquet``.
    """
    # Accept pathlib.Path as well as str; the suffix checks below need a str.
    filename = os.fspath(filename)

    if not os.path.exists(filename):
        raise FileNotFoundError(f'File {filename} does not exist')

    if filename.endswith('.csv'):
        frame = pd.read_csv(filename)
        if 'date' in frame.columns:
            # CSV stores dates as text; restore a datetime dtype
            frame['date'] = pd.to_datetime(frame['date'])
        return frame

    if filename.endswith('.parquet'):
        try:
            return pd.read_parquet(filename, engine=engine)
        except ImportError:
            # Best-effort: a missing Parquet engine is reported, not raised.
            print(f'Error: Cannot read {filename}. Install pyarrow with `pip install pyarrow`.')
            return None

    raise ValueError(f'Unsupported file suffix in {filename}. Use .csv or .parquet')

# Relative paths, resolved from the notebooks/ directory
raw_dir = os.path.join('..', 'data', 'raw')
processed_dir = os.path.join('..', 'data', 'processed')

# Full file names
timestamp = '1804'
csv_filename = os.path.join(
    raw_dir,
    f'api_alphavantage_AAPL_20250820-{timestamp}.csv',
)
parquet_filename = os.path.join(
    processed_dir,
    f'api_alphavantage_AAPL_20250820-{timestamp}.parquet',
)

# Exercise the helper functions: write both formats first ...
write_df(df, csv_filename)
write_df(df, parquet_filename, engine='pyarrow')

# ... then read both back
df_csv_util = read_df(csv_filename)
df_parquet_util = read_df(parquet_filename, engine='pyarrow')

# Validate the reloaded data against the original frame
errors_util = validate_dataframes(df, df_csv_util, df_parquet_util)
print('Utility Validation Results:')
if not errors_util:
    print('All utility validations passed!')
else:
    # One bullet per problem found
    for error in errors_util:
        print(f'- {error}')

Saved ../data/raw/api_alphavantage_AAPL_20250820-1804.csv
Saved ../data/processed/api_alphavantage_AAPL_20250820-1804.parquet
Utility Validation Results:
All utility validations passed!
