In [1]:
from utils import fetch_historical_prices

df = fetch_historical_prices("AAPL", "2024-01-01", "2024-12-31")
df.head()



Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits
0,2024-01-02 00:00:00-05:00,185.399081,186.677021,182.169586,183.903214,82488700,0.0,0.0
1,2024-01-03 00:00:00-05:00,182.496496,184.14097,181.713879,182.526215,58414500,0.0,0.0
2,2024-01-04 00:00:00-05:00,180.445875,181.377083,179.187767,180.20813,71983600,0.0,0.0
3,2024-01-05 00:00:00-05:00,180.28739,181.050175,178.484409,179.484955,62379700,0.0,0.0
4,2024-01-08 00:00:00-05:00,180.386422,183.863594,179.801946,183.823959,59144500,0.0,0.0


In [4]:
import pandas as pd
import numpy as np

def check_market_moved_before_date(df: pd.DataFrame, target_date: str) -> dict:
    """
    Determines whether a stock has already made a significant move before a given sentiment-spike date.
    
    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame with columns: Open, High, Low, Close, Volume (indexed by date)
    target_date : str
        The sentiment-spike date to check (format: 'YYYY-MM-DD')
    
    Returns:
    --------
    dict : Dictionary containing all intermediate signals and the final market_moved_flag,
           or None if calculation cannot be performed
    """
    try:
        # Handle multi-level columns if present
        if isinstance(df.columns, pd.MultiIndex):
            # Flatten multi-level columns - take the first level (Price names)
            df = df.copy()
            df.columns = [col[0] if isinstance(col, tuple) else col for col in df.columns]
        
        # Ensure df has a datetime index
        if not isinstance(df.index, pd.DatetimeIndex):
            df.index = pd.to_datetime(df.index)
        
        # Convert target_date to datetime
        target_dt = pd.to_datetime(target_date)
        
        # Check if target_date exists in the dataframe
        if target_dt not in df.index:
            print(f"Warning: {target_date} not in dataframe")
            return None
        
        # Get data up to and including target_date
        df_until_target = df[df.index <= target_dt].copy()
        
        # Check minimum required history (need at least 20 days for rolling stats)
        if len(df_until_target) < 20:
            print(f"Warning: Insufficient history (need at least 20 days, got {len(df_until_target)})")
            return None
        
        # 1. Calculate daily returns
        df_until_target['returns'] = df_until_target['Close'].pct_change()
        
        # 2. Compute 3-day percentage price change (t-3 to t)
        if len(df_until_target) < 3:
            return None
        close_t = df_until_target.loc[target_dt, 'Close']
        close_t_minus_3 = df_until_target['Close'].iloc[-4] if len(df_until_target) >= 4 else np.nan
        pct_change_3d = ((close_t - close_t_minus_3) / close_t_minus_3) * 100 if not pd.isna(close_t_minus_3) else np.nan
        
        # 3. Compute abnormal return z-score (20-day rolling)
        rolling_mean = df_until_target['returns'].rolling(window=20, min_periods=20).mean()
        rolling_std = df_until_target['returns'].rolling(window=20, min_periods=20).std()
        df_until_target['ret_z'] = (df_until_target['returns'] - rolling_mean) / rolling_std
        ret_z = df_until_target.loc[target_dt, 'ret_z']
        
        # 4. Compute volatility expansion (3-day vs 20-day std)
        std_3d = df_until_target['returns'].iloc[-3:].std() if len(df_until_target) >= 3 else np.nan
        std_20d = df_until_target['returns'].iloc[-20:].std() if len(df_until_target) >= 20 else np.nan
        vol_expansion = std_3d / std_20d if not pd.isna(std_20d) and std_20d > 0 else np.nan
        
        # 5. Compute abnormal volume z-score (20-day rolling)
        vol_rolling_mean = df_until_target['Volume'].rolling(window=20, min_periods=20).mean()
        vol_rolling_std = df_until_target['Volume'].rolling(window=20, min_periods=20).std()
        df_until_target['vol_z'] = (df_until_target['Volume'] - vol_rolling_mean) / vol_rolling_std
        vol_z = df_until_target.loc[target_dt, 'vol_z']
        
        # 6. Compute ATR-14 using True Range
        df_until_target['prev_close'] = df_until_target['Close'].shift(1)
        
        # Calculate True Range components
        hl = df_until_target['High'] - df_until_target['Low']
        hc = (df_until_target['High'] - df_until_target['prev_close']).abs()
        lc = (df_until_target['Low'] - df_until_target['prev_close']).abs()
        
        # True Range is the maximum of the three
        df_until_target['tr'] = pd.concat([hl, hc, lc], axis=1).max(axis=1)
        df_until_target['atr_14'] = df_until_target['tr'].rolling(window=14, min_periods=14).mean()
        atr_14 = df_until_target.loc[target_dt, 'atr_14']
        
        # 7. Compute ATR-based move (3-day absolute price move / ATR-14)
        abs_3d_move = abs(close_t - close_t_minus_3) if not pd.isna(close_t_minus_3) else np.nan
        atr_move = abs_3d_move / atr_14 if not pd.isna(atr_14) and atr_14 > 0 else np.nan
        
        # 8. Combine signals into market_moved_flag
        market_moved_flag = False
        
        if not pd.isna(ret_z) and abs(ret_z) >= 2:
            market_moved_flag = True
        if not pd.isna(vol_z) and abs(vol_z) >= 2:
            market_moved_flag = True
        if not pd.isna(vol_expansion) and vol_expansion >= 1.5:
            market_moved_flag = True
        if not pd.isna(atr_move) and atr_move >= 1.5:
            market_moved_flag = True
        if not pd.isna(pct_change_3d) and abs(pct_change_3d) >= 4.0:
            market_moved_flag = True
        
        # 9. Return all intermediate values and final flag
        return {
            'target_date': target_date,
            'pct_change_3d': round(float(pct_change_3d), 2) if not pd.isna(pct_change_3d) else None,
            'ret_z': round(float(ret_z), 2) if not pd.isna(ret_z) else None,
            'vol_z': round(float(vol_z), 2) if not pd.isna(vol_z) else None,
            'vol_expansion': round(float(vol_expansion), 2) if not pd.isna(vol_expansion) else None,
            'atr_14': round(float(atr_14), 2) if not pd.isna(atr_14) else None,
            'atr_move': round(float(atr_move), 2) if not pd.isna(atr_move) else None,
            'market_moved_flag': market_moved_flag
        }
        
    except Exception as e:
        print(f"Error in check_market_moved_before_date: {str(e)}")
        return None


# Example usage:
result = check_market_moved_before_date(df, "2024-06-26")
print(result)

None
