In [1]:
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dotenv import load_dotenv
import warnings
warnings.filterwarnings('ignore')

# Load environment variables
load_dotenv()

print("🚀 Data Storage Assignment - Implementation")
print("=" * 50)

# =============================================================================
# TASK 1: SAVE IN TWO FORMATS
# =============================================================================

print("\n📁 TASK 1: Save DataFrame in Two Formats")
print("-" * 40)

# Resolve the output directories from the environment, falling back to the
# relative defaults "data/raw" and "data/processed" when the variables are unset
DATA_DIR_RAW = os.getenv("DATA_DIR_RAW", "data/raw")
DATA_DIR_PROCESSED = os.getenv("DATA_DIR_PROCESSED", "data/processed")

print(f"📂 Raw data directory: {DATA_DIR_RAW}")
print(f"📂 Processed data directory: {DATA_DIR_PROCESSED}")

# Create directories if they don't exist
os.makedirs(DATA_DIR_RAW, exist_ok=True)
os.makedirs(DATA_DIR_PROCESSED, exist_ok=True)

# Build a simulated dataset (no API call is made in this cell)
print("\n📊 Creating sample dataset...")

# Generate sample stock data (since we may not have API keys in all environments)
# NOTE: the random draws below consume the seeded global NumPy stream in order,
# so reordering these statements changes the generated values.
np.random.seed(42)  # For reproducible data
dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
n_days = len(dates)

# Simulate realistic stock data for MSFT
base_price = 250.0
daily_returns = np.random.normal(0.0005, 0.02, n_days)  # Small daily returns with volatility
prices = [base_price]

# The first day's price is base_price itself, so daily_returns[0] is not applied;
# every later price compounds the previous one by that day's return.
for return_rate in daily_returns[1:]:
    prices.append(prices[-1] * (1 + return_rate))

# Create comprehensive DataFrame
# open/high/low are derived from each day's close by a random multiplier;
# the scalar entries (symbol, data_source, fetch_timestamp) repeat on every row.
sample_data = {
    'date': dates,
    'symbol': 'MSFT',
    'open': [p * np.random.uniform(0.995, 1.005) for p in prices],
    'high': [p * np.random.uniform(1.01, 1.03) for p in prices],
    'low': [p * np.random.uniform(0.97, 0.99) for p in prices],
    'close': prices,
    'volume': np.random.randint(20000000, 100000000, n_days),
    'data_source': 'simulated',
    'fetch_timestamp': datetime.now()
}

df_stock = pd.DataFrame(sample_data)

# Add some calculated fields for processing
# The rolling columns are NaN until their window (20 or 50 rows) is filled.
df_stock['daily_return'] = df_stock['close'].pct_change()
df_stock['volatility_20d'] = df_stock['daily_return'].rolling(20).std()
df_stock['sma_20'] = df_stock['close'].rolling(20).mean()
df_stock['sma_50'] = df_stock['close'].rolling(50).mean()

print(f"✅ Created dataset with shape: {df_stock.shape}")
print(f"📅 Date range: {df_stock['date'].min()} to {df_stock['date'].max()}")
print(f"🏷️  Columns: {list(df_stock.columns)}")

# Save the frame to the raw directory as CSV (index omitted)
csv_path = os.path.join(DATA_DIR_RAW, "stock_data_raw.csv")
df_stock.to_csv(csv_path, index=False)
print(f"💾 Saved raw data as CSV: {csv_path}")

# Save the same frame to the processed directory as Parquet via pyarrow.
# Failures are reported but not re-raised, so the cell keeps running and the
# Parquet reload in Task 2 reports its own failure if the file is missing.
parquet_path = os.path.join(DATA_DIR_PROCESSED, "stock_data_processed.parquet")
try:
    df_stock.to_parquet(parquet_path, index=False, engine='pyarrow')
    print(f"💾 Saved processed data as Parquet: {parquet_path}")
except ImportError as e:
    print(f"❌ Parquet save failed - missing engine: {e}")
    print("💡 Install with: pip install pyarrow")
except Exception as e:
    print(f"❌ Parquet save failed: {e}")

# =============================================================================
# TASK 2: RELOAD AND VALIDATE
# =============================================================================

print(f"\n🔄 TASK 2: Reload and Validate Data")
print("-" * 40)

def validate_dataframe(df_original, df_reloaded, name="DataFrame", tolerance=1e-6):
    """
    Validate that a reloaded DataFrame matches the original it was saved from.

    Four checks run in order: shape, column names/order, dtypes of a fixed set
    of critical columns, and element-wise equality of the price columns.
    Shape and column failures return immediately because the later checks
    compare values by position and would be meaningless on misaligned frames.

    Args:
        df_original: Original DataFrame
        df_reloaded: Reloaded DataFrame to validate
        name: Name for reporting
        tolerance: Absolute tolerance allowed per value in the price columns

    Returns:
        bool: True if validation passes
    """
    print(f"\n🔍 Validating {name}...")

    validation_results = {
        'shape_match': False,
        'columns_match': False,
        'dtypes_preserved': False,
        'data_integrity': False
    }

    # Check shape
    if df_original.shape == df_reloaded.shape:
        validation_results['shape_match'] = True
        print(f"✅ Shape matches: {df_reloaded.shape}")
    else:
        print(f"❌ Shape mismatch: Original {df_original.shape} vs Reloaded {df_reloaded.shape}")
        return False

    # Check columns (names and order)
    if list(df_original.columns) == list(df_reloaded.columns):
        validation_results['columns_match'] = True
        print(f"✅ All {len(df_reloaded.columns)} columns present")
    else:
        missing_cols = set(df_original.columns) - set(df_reloaded.columns)
        extra_cols = set(df_reloaded.columns) - set(df_original.columns)
        print(f"❌ Column mismatch:")
        if missing_cols:
            print(f"   Missing: {missing_cols}")
        if extra_cols:
            print(f"   Extra: {extra_cols}")
        if not missing_cols and not extra_cols:
            # Same names, so the lists can only differ by ordering
            print(f"   Order differs: {list(df_original.columns)} vs {list(df_reloaded.columns)}")
        return False

    # Check critical dtypes
    critical_columns = {
        'date': 'datetime64[ns]',
        'close': 'float64',
        'volume': 'int64',
        'symbol': 'object'
    }

    dtype_issues = []
    for col, expected_dtype in critical_columns.items():
        if col in df_reloaded.columns:
            actual_dtype = str(df_reloaded[col].dtype)
            # Handle datetime comparison (different representations possible)
            if 'datetime' in expected_dtype and 'datetime' in actual_dtype:
                continue
            elif actual_dtype != expected_dtype:
                dtype_issues.append(f"{col}: expected {expected_dtype}, got {actual_dtype}")

    if not dtype_issues:
        validation_results['dtypes_preserved'] = True
        print("✅ Critical dtypes preserved")
    else:
        print("⚠️  Dtype issues found:")
        for issue in dtype_issues:
            print(f"   {issue}")

    # Check data integrity of the price columns value by value. Comparing
    # column sums would miss reordered rows and offsetting errors, and would
    # silently skip NaNs.
    numeric_cols = ['open', 'high', 'low', 'close']
    integrity_check = True

    for col in numeric_cols:
        if col in df_original.columns and col in df_reloaded.columns:
            try:
                orig_values = df_original[col].to_numpy(dtype='float64', na_value=np.nan)
                reload_values = df_reloaded[col].to_numpy(dtype='float64', na_value=np.nan)
            except (TypeError, ValueError) as exc:
                print(f"❌ Data integrity issue in {col}: values are not numeric ({exc})")
                integrity_check = False
                continue

            # equal_nan=True: a NaN in the same position on both sides is a match
            mismatches = ~np.isclose(
                orig_values, reload_values, rtol=1e-9, atol=tolerance, equal_nan=True
            )
            if mismatches.any():
                first_bad = int(mismatches.argmax())
                print(
                    f"❌ Data integrity issue in {col}: {int(mismatches.sum())} value(s) differ "
                    f"(first at row {first_bad}: {orig_values[first_bad]} vs {reload_values[first_bad]})"
                )
                integrity_check = False

    if integrity_check:
        validation_results['data_integrity'] = True
        print("✅ Data integrity verified")

    # Overall result
    all_passed = all(validation_results.values())
    status = "✅ PASSED" if all_passed else "❌ FAILED"
    print(f"\n📋 Validation Summary for {name}: {status}")

    return all_passed

# Reload the CSV file written in Task 1
print("📥 Reloading CSV file...")
try:
    df_csv_reloaded = pd.read_csv(csv_path)
    # Convert date column back to datetime (CSV loses this info)
    df_csv_reloaded['date'] = pd.to_datetime(df_csv_reloaded['date'])
    df_csv_reloaded['fetch_timestamp'] = pd.to_datetime(df_csv_reloaded['fetch_timestamp'])
    print(f"✅ CSV reloaded successfully: {df_csv_reloaded.shape}")
except Exception as e:
    print(f"❌ Failed to reload CSV: {e}")
    # An empty frame marks the reload as failed for the validation step below
    df_csv_reloaded = pd.DataFrame()

# Reload the Parquet file written in Task 1
print("\n📥 Reloading Parquet file...")
try:
    df_parquet_reloaded = pd.read_parquet(parquet_path, engine='pyarrow')
    print(f"✅ Parquet reloaded successfully: {df_parquet_reloaded.shape}")
except ImportError as e:
    print(f"❌ Parquet reload failed - missing engine: {e}")
    print("💡 Install with: pip install pyarrow")
    df_parquet_reloaded = pd.DataFrame()
except Exception as e:
    print(f"❌ Failed to reload Parquet: {e}")
    df_parquet_reloaded = pd.DataFrame()

# Validate both reloaded DataFrames.
# Both results default to False so the names always exist afterwards, even
# when a reload failed and its validation is skipped.
csv_valid = False
parquet_valid = False

if not df_csv_reloaded.empty:
    csv_valid = validate_dataframe(df_stock, df_csv_reloaded, "CSV")
else:
    print("⚠️  Skipping CSV validation: no data was reloaded")

if not df_parquet_reloaded.empty:
    parquet_valid = validate_dataframe(df_stock, df_parquet_reloaded, "Parquet")
else:
    print("⚠️  Skipping Parquet validation: no data was reloaded")

# =============================================================================
# TASK 3: REFACTOR TO UTILITIES  
# =============================================================================

print("\n🔧 TASK 3: Utility Functions")
print("-" * 40)

def write_df(df, filepath, **kwargs):
    """
    Write DataFrame to file, routing by file extension.

    Supported extensions are .csv, .parquet, .xlsx/.xls and .json. The parent
    directory is created if it does not exist. CSV, Parquet and Excel output
    omit the index unless the caller passes ``index`` explicitly; Parquet uses
    the ``pyarrow`` engine unless the caller passes ``engine``.

    Args:
        df: pandas DataFrame to write
        filepath: target file path
        **kwargs: additional arguments for pandas write methods

    Returns:
        str: filepath if successful

    Raises:
        ValueError: for unsupported file extensions
        ImportError: for missing Parquet engine
    """
    # Ensure directory exists
    directory = os.path.dirname(filepath)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Get file extension
    _, ext = os.path.splitext(filepath)
    ext = ext.lower()

    try:
        if ext == '.csv':
            # setdefault keeps a caller-supplied `index` instead of passing the
            # keyword twice (which raises TypeError)
            kwargs.setdefault('index', False)
            df.to_csv(filepath, **kwargs)
            print(f"💾 Written CSV: {filepath} ({len(df)} rows)")

        elif ext == '.parquet':
            kwargs.setdefault('index', False)
            # pop (not get) so `engine` is not also forwarded through **kwargs
            engine = kwargs.pop('engine', 'pyarrow')
            try:
                df.to_parquet(filepath, engine=engine, **kwargs)
                print(f"💾 Written Parquet: {filepath} ({len(df)} rows)")
            except ImportError as e:
                raise ImportError(
                    f"Missing Parquet engine. Install with: pip install pyarrow\n"
                    f"Original error: {e}"
                ) from e

        elif ext in ['.xlsx', '.xls']:
            kwargs.setdefault('index', False)
            df.to_excel(filepath, **kwargs)
            print(f"💾 Written Excel: {filepath} ({len(df)} rows)")

        elif ext == '.json':
            df.to_json(filepath, **kwargs)
            print(f"💾 Written JSON: {filepath} ({len(df)} rows)")

        else:
            raise ValueError(f"Unsupported file extension: {ext}")

        return filepath

    except Exception as e:
        # Report the failure, then let the caller decide how to handle it
        print(f"❌ Failed to write {filepath}: {str(e)}")
        raise

def read_df(filepath, **kwargs):
    """
    Read DataFrame from file, routing by file extension.

    Supported extensions are .csv, .parquet, .xlsx/.xls and .json. Parquet is
    read with the ``pyarrow`` engine unless the caller passes ``engine``.

    Args:
        filepath: source file path
        **kwargs: additional arguments for pandas read methods

    Returns:
        pandas.DataFrame: loaded DataFrame

    Raises:
        FileNotFoundError: if file doesn't exist
        ValueError: for unsupported file extensions
        ImportError: for missing Parquet engine
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    # Get file extension
    _, ext = os.path.splitext(filepath)
    ext = ext.lower()

    try:
        if ext == '.csv':
            df = pd.read_csv(filepath, **kwargs)
            print(f"📥 Read CSV: {filepath} ({len(df)} rows)")

        elif ext == '.parquet':
            # pop (not get) so `engine` is not also forwarded through **kwargs,
            # which would raise TypeError for a duplicate keyword
            engine = kwargs.pop('engine', 'pyarrow')
            try:
                df = pd.read_parquet(filepath, engine=engine, **kwargs)
                print(f"📥 Read Parquet: {filepath} ({len(df)} rows)")
            except ImportError as e:
                raise ImportError(
                    f"Missing Parquet engine. Install with: pip install pyarrow\n"
                    f"Original error: {e}"
                ) from e

        elif ext in ['.xlsx', '.xls']:
            df = pd.read_excel(filepath, **kwargs)
            print(f"📥 Read Excel: {filepath} ({len(df)} rows)")

        elif ext == '.json':
            df = pd.read_json(filepath, **kwargs)
            print(f"📥 Read JSON: {filepath} ({len(df)} rows)")

        else:
            raise ValueError(f"Unsupported file extension: {ext}")

        return df

    except Exception as e:
        # Report the failure, then let the caller decide how to handle it
        print(f"❌ Failed to read {filepath}: {str(e)}")
        raise

# Test utility functions with a small write/read round trip
print("\n🧪 Testing utility functions...")

test_csv_path = os.path.join(DATA_DIR_RAW, "test_utilities.csv")
test_parquet_path = os.path.join(DATA_DIR_PROCESSED, "test_utilities.parquet")

try:
    df_sample = df_stock.head(10)

    # Write the same sample in both formats
    write_df(df_sample, test_csv_path)
    write_df(df_sample, test_parquet_path)

    # Read both files back
    test_df_csv = read_df(test_csv_path)
    test_df_parquet = read_df(test_parquet_path)

    # Only report success after checking what actually came back
    for label, reloaded in (("CSV", test_df_csv), ("Parquet", test_df_parquet)):
        if reloaded.shape != df_sample.shape:
            raise AssertionError(
                f"{label} round trip changed shape: {df_sample.shape} -> {reloaded.shape}"
            )
        if list(reloaded.columns) != list(df_sample.columns):
            raise AssertionError(
                f"{label} round trip changed columns: {list(reloaded.columns)}"
            )

    print("✅ Utility functions work correctly!")

except Exception as e:
    print(f"❌ Utility function test failed: {e}")

🚀 Data Storage Assignment - Implementation

📁 TASK 1: Save DataFrame in Two Formats
----------------------------------------
📂 Raw data directory: data/raw
📂 Processed data directory: data/processed

📊 Creating sample dataset...
✅ Created dataset with shape: (365, 13)
📅 Date range: 2023-01-01 00:00:00 to 2023-12-31 00:00:00
🏷️  Columns: ['date', 'symbol', 'open', 'high', 'low', 'close', 'volume', 'data_source', 'fetch_timestamp', 'daily_return', 'volatility_20d', 'sma_20', 'sma_50']
💾 Saved raw data as CSV: data/raw\stock_data_raw.csv
💾 Saved processed data as Parquet: data/processed\stock_data_processed.parquet

🔄 TASK 2: Reload and Validate Data
----------------------------------------
📥 Reloading CSV file...
✅ CSV reloaded successfully: (365, 13)

📥 Reloading Parquet file...
✅ Parquet reloaded successfully: (365, 13)

🔍 Validating CSV...
✅ Shape matches: (365, 13)
✅ All 13 columns present
✅ Critical dtypes preserved
✅ Data integrity verified

📋 Validation Summary for CSV: ✅ PASSED



In [2]:
# Display the frame read back from test_utilities.csv in the previous cell
# (the first 10 rows of df_stock). The name only exists if that cell's
# round-trip test ran without raising.
test_df_csv

Unnamed: 0,date,symbol,open,high,low,close,volume,data_source,fetch_timestamp,daily_return,volatility_20d,sma_20,sma_50
0,2023-01-01,MSFT,249.30899,253.381935,246.248259,250.0,76728215,simulated,2025-08-20 16:55:58.540384,,,,
1,2023-01-02,MSFT,250.589112,254.414209,242.599626,249.433678,63440658,simulated,2025-08-20 16:55:58.540384,-0.002265,,,
2,2023-01-03,MSFT,251.55628,257.435396,249.039436,252.789502,51886947,simulated,2025-08-20 16:55:58.540384,0.013454,,,
3,2023-01-04,MSFT,261.840595,267.990646,252.92569,260.616016,95111483,simulated,2025-08-20 16:55:58.540384,0.030961,,,
4,2023-01-05,MSFT,258.340223,264.002112,251.854899,259.525842,85174694,simulated,2025-08-20 16:55:58.540384,-0.004183,,,
5,2023-01-06,MSFT,259.451184,264.025664,252.359782,258.440313,68532681,simulated,2025-08-20 16:55:58.540384,-0.004183,,,
6,2023-01-07,MSFT,266.806066,272.772404,261.33695,266.732178,27667857,simulated,2025-08-20 16:55:58.540384,0.032084,,,
7,2023-01-08,MSFT,272.29527,273.740091,267.005733,270.959535,91537554,simulated,2025-08-20 16:55:58.540384,0.015849,,,
8,2023-01-09,MSFT,267.40627,274.800222,264.164309,268.550843,81279200,simulated,2025-08-20 16:55:58.540384,-0.008889,,,
9,2023-01-10,MSFT,271.745486,275.282299,265.873378,271.599218,95998646,simulated,2025-08-20 16:55:58.540384,0.011351,,,
