In [12]:
# TSLA Stock Analysis - Stages 03-06
# Python Fundamentals, Data Acquisition, Storage, and Preprocessing

import os
import pandas as pd
import numpy as np
import requests
from dotenv import load_dotenv
import yfinance as yf
from sklearn.preprocessing import StandardScaler
import pathlib
import datetime as dt
from typing import Union, Dict, Any, List

# ==================== STAGE 03: PYTHON FUNDAMENTALS ====================
print("=== STAGE 03: PYTHON FUNDAMENTALS ===")

# Utility Functions (reusable across stages)
def ts() -> str:
    """Generate timestamp string for consistent filenames."""
    return dt.datetime.now().strftime('%Y%m%d-%H%M%S')

def validate_data(df: pd.DataFrame, required_columns: list) -> Dict[str, Any]:
    """Validate DataFrame structure and data quality."""
    validation = {
        'missing_columns': [col for col in required_columns if col not in df.columns],
        'shape': df.shape,
        'na_total': int(df.isna().sum().sum()),
        'na_by_column': df.isna().sum().to_dict(),
        'dtypes': df.dtypes.to_dict()
    }
    return validation

def detect_format(path: Union[str, pathlib.Path]) -> str:
    """Detect file format from extension."""
    path_str = str(path).lower()
    if path_str.endswith('.csv'):
        return 'csv'
    if any(path_str.endswith(ext) for ext in ['.parquet', '.pq', '.parq']):
        return 'parquet'
    raise ValueError(f'Unsupported format: {path_str}')

def read_df(path: Union[str, pathlib.Path]) -> pd.DataFrame:
    """Read DataFrame from file with automatic format detection."""
    path_obj = pathlib.Path(path)
    fmt = detect_format(path_obj)
    
    if fmt == 'csv':
        try:
            sample = pd.read_csv(path_obj, nrows=5)
            date_cols = [col for col in sample.columns if 'date' in col.lower()]
            if date_cols:
                return pd.read_csv(path_obj, parse_dates=date_cols)
            return pd.read_csv(path_obj)
        except Exception as e:
            raise RuntimeError(f'Failed to read CSV: {e}')
    
    elif fmt == 'parquet':
        try:
            # Specify engine explicitly here too
            return pd.read_parquet(path_obj, engine='pyarrow')
        except Exception as e:
            raise RuntimeError(f'Failed to read Parquet: {e}. Install: pip install pyarrow')

def write_df(df: pd.DataFrame, path: Union[str, pathlib.Path]) -> pathlib.Path:
    path_obj = pathlib.Path(path)
    path_obj.parent.mkdir(parents=True, exist_ok=True)
    fmt = detect_format(path_obj)
    
    if fmt == 'csv':
        df.to_csv(path_obj, index=False)
    elif fmt == 'parquet':
        try:
            # EXPLICITLY specify engine and compression
            df.to_parquet(
                path_obj, 
                engine='pyarrow',  # ← Explicit engine
                compression='snappy'  # ← Optional: add compression
            )
        except Exception as e:
            # More helpful error message
            raise RuntimeError(f'Failed to write Parquet: {e}. Install: pip install pyarrow')
    
    return path_obj

def calculate_technical_indicators(df: pd.DataFrame, price_col: str = 'close') -> pd.DataFrame:
    """Calculate common technical indicators for stock data."""
    df = df.copy()
    
    # Simple Moving Averages
    df['sma_10'] = df[price_col].rolling(window=10).mean()
    df['sma_20'] = df[price_col].rolling(window=20).mean()
    df['sma_50'] = df[price_col].rolling(window=50).mean()
    
    # Price Returns
    df['daily_return'] = df[price_col].pct_change()
    df['weekly_return'] = df[price_col].pct_change(5)
    
    # Volatility
    df['volatility_10'] = df['daily_return'].rolling(window=10).std()
    df['volatility_20'] = df['daily_return'].rolling(window=20).std()
    
    # Price vs SMA ratios
    df['price_sma_10_ratio'] = df[price_col] / df['sma_10']
    df['price_sma_20_ratio'] = df[price_col] / df['sma_20']
    
    return df

def ensure_data_directories():
    """Ensure all data directories exist."""
    directories = ['data/raw', 'data/processed']
    for dir_path in directories:
        pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True)

def get_latest_file(directory: Union[str, pathlib.Path], pattern: str = "*") -> pathlib.Path:
    """Get the most recent file in a directory."""
    dir_path = pathlib.Path(directory)
    files = list(dir_path.glob(pattern))
    if not files:
        raise FileNotFoundError(f"No files found in {directory} matching {pattern}")
    return max(files, key=lambda x: x.stat().st_mtime)

print("Utility functions defined and ready for use\n")

# ==================== STAGE 04: DATA ACQUISITION ====================
print("=== STAGE 04: DATA ACQUISITION ===")

# Load environment variables
load_dotenv()
SYMBOL = os.getenv('YFINANCE_SYMBOL', 'TSLA')
ALPHAVANTAGE_API_KEY = os.getenv('ALPHAVANTAGE_API_KEY')

def fetch_from_yfinance(symbol, period='6mo', interval='1d'):
    """Fetch data from Yahoo Finance"""
    try:
        ticker = yf.Ticker(symbol)
        df = ticker.history(period=period, interval=interval)
        df = df.reset_index()
        df.columns = [col.lower() for col in df.columns]
        return df
    except Exception as e:
        print(f"Yahoo Finance error: {e}")
        return None

def fetch_from_alphavantage(symbol, api_key):
    """Fetch data from Alpha Vantage API"""
    if not api_key:
        return None
    
    url = 'https://www.alphavantage.co/query'
    params = {
        'function': 'TIME_SERIES_DAILY_ADJUSTED',
        'symbol': symbol,
        'outputsize': 'compact',
        'apikey': api_key
    }
    
    try:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        
        time_series = data.get('Time Series (Daily)', {})
        df = pd.DataFrame.from_dict(time_series, orient='index')
        df = df.reset_index().rename(columns={'index': 'date'})
        
        column_mapping = {
            '1. open': 'open', '2. high': 'high', '3. low': 'low',
            '4. close': 'close', '5. adjusted close': 'adj_close',
            '6. volume': 'volume'
        }
        df = df.rename(columns=column_mapping)
        
        numeric_cols = ['open', 'high', 'low', 'close', 'adj_close', 'volume']
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce')
            
        df['date'] = pd.to_datetime(df['date'])
        return df
        
    except Exception as e:
        print(f"Alpha Vantage API error: {e}")
        return None

# Acquire data
df = fetch_from_yfinance(SYMBOL)

if df is None or df.empty:
    df = fetch_from_alphavantage(SYMBOL, ALPHAVANTAGE_API_KEY)

if df is not None and not df.empty:
    validation = validate_data(df, ['date', 'open', 'high', 'low', 'close', 'volume'])
    print(f"✓ Data acquired: {df.shape[0]} rows, {df.shape[1]} columns")
    print(f"   Missing values: {validation['na_total']}")
else:
    print("Creating sample data for demonstration...")
    dates = pd.date_range('2023-01-01', periods=100, freq='D')
    df = pd.DataFrame({
        'date': dates,
        'open': 150 + np.random.randn(100).cumsum(),
        'high': 155 + np.random.randn(100).cumsum(),
        'low': 145 + np.random.randn(100).cumsum(),
        'close': 150 + np.random.randn(100).cumsum(),
        'volume': np.random.randint(1000000, 5000000, 100)
    })
    print("Sample data created")

print("\nFirst few rows:")
display(df.head())

# ==================== STAGE 05: DATA STORAGE ====================
print("\n=== STAGE 05: DATA STORAGE ===")

# Ensure directories exist
ensure_data_directories()
RAW_DIR = os.getenv('DATA_DIR_RAW', 'C:/Users/박서아/bootcamp_andrew_song/project/data/raw')
PROCESSED_DIR = os.getenv('DATA_DIR_PROCESSED', 'C:/Users/박서아/bootcamp_andrew_song/project/data/processed')

# Save raw data
raw_filename = f"tsla_raw_{ts()}.csv"
raw_path = f"{RAW_DIR}/{raw_filename}"
write_df(df, raw_path)

print(f"✓ Raw data saved: {raw_path}")

processed_filename = f"tsla_{ts()}.parquet"
processed_path = f"{PROCESSED_DIR}/{processed_filename}"
write_df(df_clean, processed_path)
print(f"✓ Raw data saved: {processed_path}")

# ==================== STAGE 06: DATA PREPROCESSING ====================
print("\n=== STAGE 06: DATA PREPROCESSING ===")

class DataCleaner:
    def __init__(self):
        self.required_columns = ['date', 'open', 'high', 'low', 'close', 'volume']
    
    def drop_missing_columns(self, df: pd.DataFrame, threshold: float = 0.5) -> pd.DataFrame:
        missing_ratio = df.isnull().mean()
        columns_to_drop = missing_ratio[missing_ratio > threshold].index.tolist()
        return df.drop(columns=columns_to_drop)
    
    def fill_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        df_clean = df.copy()
        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
        
        for col in numeric_cols:
            if df_clean[col].isnull().any():
                df_clean[col] = df_clean[col].fillna(df_clean[col].median())
        
        if 'date' in df_clean.columns:
            df_clean = df_clean.sort_values('date').ffill()
        
        return df_clean
    
    def add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        return calculate_technical_indicators(df)
    
    def create_target_variable(self, df: pd.DataFrame) -> pd.DataFrame:
        df_target = df.copy()
        df_target['next_day_close'] = df_target['close'].shift(-1)
        df_target['target'] = (df_target['next_day_close'] > df_target['close']).astype(int)
        return df_target.dropna(subset=['target'])
    
    def normalize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        numeric_cols = [col for col in numeric_cols if col not in ['target', 'next_day_close']]
        
        scaler = StandardScaler()
        df_normalized = df.copy()
        df_normalized[numeric_cols] = scaler.fit_transform(df_normalized[numeric_cols])
        
        return df_normalized, scaler
    
    def clean_data(self, df: pd.DataFrame) -> Dict[str, Any]:
        results = {}
        initial_validation = validate_data(df, self.required_columns)
        results['initial_validation'] = initial_validation
        
        df_clean = self.drop_missing_columns(df)
        df_clean = self.fill_missing_values(df_clean)
        df_clean = self.add_technical_indicators(df_clean)
        df_clean = self.create_target_variable(df_clean)
        df_clean, scaler = self.normalize_data(df_clean)
        
        results['final_validation'] = validate_data(df_clean, [])
        results['scaler'] = scaler
        
        return df_clean, results

# Preprocess the data
cleaner = DataCleaner()
df_clean, results = cleaner.clean_data(df)

# Save processed data as CSV
processed_filename = f"tsla_processed.csv"
processed_path = f"{PROCESSED_DIR}/{processed_filename}"
write_df(df_clean, processed_path)  

print(f"✓ Processed data saved: {processed_path}")

# Display results
print(f"\nPreprocessing results:")
print(f"Rows: {results['initial_validation']['shape'][0]} → {results['final_validation']['shape'][0]}")
print(f"Columns: {results['initial_validation']['shape'][1]} → {results['final_validation']['shape'][1]}")
print(f"Missing values: {results['initial_validation']['na_total']} → {results['final_validation']['na_total']}")

print("\nProcessed data sample:")
display(df_clean.head())


=== STAGE 03: PYTHON FUNDAMENTALS ===
Utility functions defined and ready for use

=== STAGE 04: DATA ACQUISITION ===
✓ Data acquired: 126 rows, 8 columns
   Missing values: 0

First few rows:


Unnamed: 0,date,open,high,low,close,volume,dividends,stock splits
0,2025-02-24 00:00:00-05:00,338.140015,342.399994,324.700012,330.529999,76052300,0.0,0.0
1,2025-02-25 00:00:00-05:00,327.019989,328.890015,297.25,302.799988,134228800,0.0,0.0
2,2025-02-26 00:00:00-05:00,303.709991,309.0,288.040009,290.799988,100118300,0.0,0.0
3,2025-02-27 00:00:00-05:00,291.160004,297.230011,280.880005,281.950012,101748200,0.0,0.0
4,2025-02-28 00:00:00-05:00,279.5,293.880005,273.600006,292.980011,115697000,0.0,0.0



=== STAGE 05: DATA STORAGE ===
✓ Raw data saved: C:/Users/박서아/bootcamp_andrew_song/project/data/raw/tsla_raw_20250823-222146.csv
✓ Raw data saved: C:/Users/박서아/bootcamp_andrew_song/project/data/processed/tsla_20250823-222146.parquet

=== STAGE 06: DATA PREPROCESSING ===
✓ Processed data saved: C:/Users/박서아/bootcamp_andrew_song/project/data/processed/tsla_processed.csv

Preprocessing results:
Rows: 126 → 126
Columns: 8 → 19
Missing values: 0 → 142

Processed data sample:


Unnamed: 0,date,open,high,low,close,volume,dividends,stock splits,sma_10,sma_20,sma_50,daily_return,weekly_return,volatility_10,volatility_20,price_sma_10_ratio,price_sma_20_ratio,next_day_close,target
0,2025-02-24 00:00:00-05:00,1.057447,1.002706,0.884151,0.861557,-0.986105,0.0,0.0,,,,,,,,,,302.799988,0
1,2025-02-25 00:00:00-05:00,0.756665,0.625205,0.155447,0.097643,0.575563,0.0,0.0,,,,-1.80492,,,,,,290.799988,0
2,2025-02-26 00:00:00-05:00,0.126159,0.06943,-0.089047,-0.232936,-0.340086,0.0,0.0,,,,-0.867485,,,,,,281.950012,0
3,2025-02-27 00:00:00-05:00,-0.213303,-0.259451,-0.27912,-0.476738,-0.296334,0.0,0.0,,,,-0.672717,,,,,,292.980011,1
4,2025-02-28 00:00:00-05:00,-0.528691,-0.353059,-0.472379,-0.172881,0.078102,0.0,0.0,,,,0.800258,,,,,,284.649994,0
