In [None]:
# =========================
# FEATURE ENGINEERING + POST-PROCESSING WITH SECTOR MAPPING
# =========================

!pip install pandas_ta tqdm

import pandas as pd
import numpy as np
import pandas_ta as ta
import os
import io
from io import BytesIO
from datetime import datetime
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from scipy.stats import zscore
from azure.storage.blob import BlobServiceClient

# ===== CONFIG =====
import os
from dotenv import load_dotenv

load_dotenv()  # loads .env file if present (for local dev)

# Fail fast with a readable message instead of an opaque error raised from
# inside BlobServiceClient.from_connection_string(None).
AZURE_CONN_STR = os.getenv("AZURE_CONN_STR")
if not AZURE_CONN_STR:
    raise RuntimeError(
        "AZURE_CONN_STR is not set; define it in the environment or in a .env file."
    )
CONTAINER_NAME = "stock-data"

BLOB_FOLDER_INPUT = "stockdata_us_adjclose/"  # per-ticker input Parquet blobs
BLOB_FOLDER_OUTPUT = "model_ready_data/"      # engineered feature Parquet blobs
SECTOR_FILE = "sectors_20250809.csv"          # CSV with Ticker and Sector columns

OUTLIER_ZSCORE_THRESHOLD = 4.0  # |z| strictly above this is flagged as an outlier
VERSION_TIMESTAMP = datetime.now().strftime("%Y%m%d")  # suffix of output blob names
LOCAL_FEAT_DIR = "features_us_adjclose"
os.makedirs(LOCAL_FEAT_DIR, exist_ok=True)

# ===== Connect to Azure Blob =====
blob_service_client = BlobServiceClient.from_connection_string(AZURE_CONN_STR)
container_client = blob_service_client.get_container_client(CONTAINER_NAME)

# ===== Load Sector Mapping =====
print(f"📥 Loading sector mapping from {SECTOR_FILE} ...")
sector_blob = container_client.download_blob(SECTOR_FILE).readall()
sector_df = pd.read_csv(io.BytesIO(sector_blob))
_missing_sector_cols = {"Ticker", "Sector"} - set(sector_df.columns)
if _missing_sector_cols:
    raise ValueError(
        f"{SECTOR_FILE} is missing required column(s): {sorted(_missing_sector_cols)}"
    )
print(f"✅ Sector mapping loaded: {sector_df.shape[0]} tickers")

# Rows with a blank ticker or sector are excluded so that lookups such as
# sector_map.get(ticker, "Unknown") fall back to "Unknown" instead of yielding NaN.
_valid_sectors = sector_df.dropna(subset=["Ticker", "Sector"])
sector_map = dict(zip(_valid_sectors["Ticker"], _valid_sectors["Sector"]))

# ===== Feature Engineering Function =====
# Raw corporate-action column -> 0/1 event-flag column derived from it.
# The raw columns are dropped from the model-ready output.
_CORPORATE_ACTION_FLAGS = {
    "Dividends": "Dividend_event",
    "Stock Splits": "StockSplit_event",
}


def _add_technical_indicators(df):
    """Add indicator, return and volatility columns to *df* in place.

    Reads ``Adj Close``, ``High``, ``Low`` and ``Close``; *df* should already be
    sorted by date.
    """
    close = df["Adj Close"]
    df["SMA_20"] = ta.sma(close, length=20)
    df["EMA_20"] = ta.ema(close, length=20)
    df["RSI_14"] = ta.rsi(close, length=14)

    macd = ta.macd(close, fast=12, slow=26)
    df["MACD"] = macd["MACD_12_26_9"]
    df["MACD_signal"] = macd["MACDs_12_26_9"]

    bbands = ta.bbands(close, length=20)
    df["BB_upper"] = bbands["BBU_20_2.0"]
    df["BB_lower"] = bbands["BBL_20_2.0"]

    # NOTE(review): ATR/ADX use unadjusted High/Low/Close while the indicators
    # above use Adj Close -- confirm this mix is intended.
    df["ATR_14"] = ta.atr(df["High"], df["Low"], df["Close"], length=14)
    df["ADX_14"] = ta.adx(df["High"], df["Low"], df["Close"], length=14)["ADX_14"]

    df["daily_return"] = close.pct_change()
    df["volatility_20d"] = df["daily_return"].rolling(window=20).std()


def _add_event_flags(df):
    """Add 0/1 event columns for each corporate action (all 0 if the raw column is absent)."""
    for raw_col, flag_col in _CORPORATE_ACTION_FLAGS.items():
        if raw_col in df.columns:
            df[flag_col] = (df[raw_col] > 0).astype(int)
        else:
            df[flag_col] = 0


def _add_outlier_flags(df, columns, threshold):
    """Add an int8 ``<col>_is_outlier`` column (1 where |z-score| > *threshold*) per column.

    Columns with at most one distinct non-NaN value get all-zero flags; NaN
    entries (and columns whose z-scores are undefined) are never flagged.
    """
    # NOTE(review): z-scores use each column's full-history mean/std, so a flag on
    # an early row depends on later data; consider an expanding/rolling window
    # before feeding these flags to a model.
    for col in columns:
        flags = np.zeros(len(df), dtype=np.int8)
        if df[col].nunique() > 1:
            z = zscore(df[col].to_numpy(dtype=float), nan_policy="omit")
            valid = ~np.isnan(z)
            flags[valid] = (np.abs(z[valid]) > threshold).astype(np.int8)
        df[f"{col}_is_outlier"] = flags


def process_file(blob_name):
    """Build and upload the model-ready feature file for one ticker's Parquet blob.

    Downloads *blob_name*, adds technical indicators, drops incomplete rows,
    attaches the sector, adds corporate-action event flags and z-score outlier
    flags, drops the raw corporate-action columns, and uploads the result to
    ``{BLOB_FOLDER_OUTPUT}{ticker}_{VERSION_TIMESTAMP}.parquet``.

    Returns:
        ``(ticker, True, None)`` on success, or ``(blob_name, False, message)`` on
        any failure. Errors are reported rather than raised so one bad file does
        not abort the whole pool run.
    """
    try:
        raw = container_client.download_blob(blob_name).readall()
        df = pd.read_parquet(BytesIO(raw))
        df = df.sort_values("Date").reset_index(drop=True)

        _add_technical_indicators(df)

        # Indicator warm-up windows leave leading NaNs; keep only complete rows.
        df = df.dropna().reset_index(drop=True)
        if df.empty:
            raise ValueError("no rows left after dropping NaNs (series too short?)")

        ticker = df["Ticker"].iloc[0]
        df["Sector"] = sector_map.get(ticker, "Unknown")

        _add_event_flags(df)

        # Only flag continuous columns that survive into the output: the raw
        # corporate-action columns are dropped below, and the 0/1 event columns
        # would just be flagged whenever the event is rare.
        excluded = set(_CORPORATE_ACTION_FLAGS) | set(_CORPORATE_ACTION_FLAGS.values())
        flag_cols = [
            col for col in df.select_dtypes(include=[np.number]).columns
            if col not in excluded and not col.endswith("_is_outlier")
        ]
        _add_outlier_flags(df, flag_cols, OUTLIER_ZSCORE_THRESHOLD)

        df_model = df.drop(columns=[c for c in _CORPORATE_ACTION_FLAGS if c in df.columns])

        output_blob_name = f"{BLOB_FOLDER_OUTPUT}{ticker}_{VERSION_TIMESTAMP}.parquet"
        out_buf = BytesIO()
        df_model.to_parquet(out_buf, index=False)
        container_client.upload_blob(output_blob_name, out_buf.getvalue(), overwrite=True)

        return ticker, True, None
    except Exception as e:
        # Include the exception type: messages like KeyError's are just the key.
        return blob_name, False, f"{type(e).__name__}: {e}"

# ===== Run Processing =====
def main():
    """Run process_file over every input Parquet blob in a process pool and print a summary.

    Prints the number of files found, a progress bar, the success count and, if
    any failed, the failure count followed by up to five failure tuples.
    """
    listing = container_client.list_blobs(name_starts_with=BLOB_FOLDER_INPUT)
    blob_names = [item.name for item in listing if item.name.endswith(".parquet")]
    print(f"Found {len(blob_names)} parquet files to process.")

    worker_count = min(cpu_count(), 12)
    with Pool(worker_count) as pool:
        progress = tqdm(pool.imap(process_file, blob_names), total=len(blob_names))
        outcomes = list(progress)

    # Each outcome is (name, ok, error); ok is the second element.
    failures = [outcome for outcome in outcomes if not outcome[1]]
    print(f"\n✅ Success: {len(outcomes) - len(failures)}")
    if not failures:
        return
    print(f"❌ Failed: {len(failures)}")
    for failure in failures[:5]:
        print(failure)

if __name__ == "__main__":
    main()




📥 Loading sector mapping from sectors_20250809.csv ...
✅ Sector mapping loaded: 402 tickers
Found 403 parquet files to process.


100%|██████████| 403/403 [00:39<00:00, 10.24it/s]



✅ Success: 403

🔍 Verifying sample files...

File: model_ready_data/AAPL_20250811.parquet
Shape: (11222, 43)
Columns: ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Ticker', 'SMA_20', 'EMA_20', 'RSI_14', 'MACD', 'MACD_signal', 'BB_upper', 'BB_lower', 'ATR_14', 'ADX_14', 'daily_return', 'volatility_20d', 'Sector', 'Dividend_event', 'StockSplit_event', 'Open_is_outlier', 'High_is_outlier', 'Low_is_outlier', 'Close_is_outlier', 'Adj Close_is_outlier', 'Volume_is_outlier', 'Dividends_is_outlier', 'Stock Splits_is_outlier', 'SMA_20_is_outlier', 'EMA_20_is_outlier', 'RSI_14_is_outlier', 'MACD_is_outlier', 'MACD_signal_is_outlier', 'BB_upper_is_outlier', 'BB_lower_is_outlier', 'ATR_14_is_outlier', 'ADX_14_is_outlier', 'daily_return_is_outlier', 'volatility_20d_is_outlier', 'Dividend_event_is_outlier', 'StockSplit_event_is_outlier']
                       Date      Open      High       Low     Close  \
0 1981-01-30 05:00:00+00:00  0.127232  0.127232  0.126116  0.126116   
1 

In [None]:
# ===== Verification =====
# Spot-check the first two output files: print shape and full column list,
# then the first rows of a few key columns.
print("\n🔍 Verifying sample files...")
cols_to_show = ["Date", "Ticker", "Sector", "Adj Close", "SMA_20", "RSI_14"]
pd.set_option("display.max_columns", None)

out_blobs = [b.name for b in container_client.list_blobs(name_starts_with=BLOB_FOLDER_OUTPUT) if b.name.endswith(".parquet")]
sample = out_blobs[:2]
for bname in sample:
    data = container_client.download_blob(bname).readall()
    dfv = pd.read_parquet(io.BytesIO(data))
    print(f"\nFile: {bname}")
    print("Shape:", dfv.shape)
    print("Columns:", dfv.columns.tolist())
    # Some output files may lack a few of these columns (e.g. Sector), so
    # show only the ones actually present instead of raising a KeyError.
    present_cols = [c for c in cols_to_show if c in dfv.columns]
    print(dfv[present_cols].head(10))


🔍 Verifying sample files...

File: model_ready_data/AAPL_20250811.parquet
Shape: (11222, 43)
Columns: ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Ticker', 'SMA_20', 'EMA_20', 'RSI_14', 'MACD', 'MACD_signal', 'BB_upper', 'BB_lower', 'ATR_14', 'ADX_14', 'daily_return', 'volatility_20d', 'Sector', 'Dividend_event', 'StockSplit_event', 'Open_is_outlier', 'High_is_outlier', 'Low_is_outlier', 'Close_is_outlier', 'Adj Close_is_outlier', 'Volume_is_outlier', 'Dividends_is_outlier', 'Stock Splits_is_outlier', 'SMA_20_is_outlier', 'EMA_20_is_outlier', 'RSI_14_is_outlier', 'MACD_is_outlier', 'MACD_signal_is_outlier', 'BB_upper_is_outlier', 'BB_lower_is_outlier', 'ATR_14_is_outlier', 'ADX_14_is_outlier', 'daily_return_is_outlier', 'volatility_20d_is_outlier', 'Dividend_event_is_outlier', 'StockSplit_event_is_outlier']
                       Date      Open      High       Low     Close  \
0 1981-01-30 05:00:00+00:00  0.127232  0.127232  0.126116  0.126116   
1 1981-02-02 05:00

In [None]:
# ----------------- Quick verification: sample a couple of files from BLOB_FOLDER_OUTPUT -----------------
# Uses the output folder defined in the config cell; OUTPUT_FOLDER was never defined in this notebook.
SAMPLE_VERIFY_COUNT = 2  # number of output files to spot-check

print("\nRunning verification on sample files from output folder...")
out_blobs = [b.name for b in container_client.list_blobs(name_starts_with=BLOB_FOLDER_OUTPUT) if b.name.lower().endswith(".parquet")]
if not out_blobs:
    print("No files found in output folder. Check for earlier errors.")
else:
    # Slicing already clamps to the list length.
    sample = out_blobs[:SAMPLE_VERIFY_COUNT]
    for bname in sample:
        try:
            print(f"\nVerifying {bname} ...")
            data = container_client.download_blob(bname).readall()
            dfv = pd.read_parquet(io.BytesIO(data))
            print(dfv.columns)
            print(dfv.head())
            print(dfv.tail())
        except Exception as e:
            # Best-effort check: report and move on to the next sample file.
            print("Verification failed for", bname, ":", e)

print("\nVerification done.")


Running verification on sample files from output folder...

Verifying model_ready_data/AAPL_20250811.parquet ...
Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Ticker',
       'SMA_20', 'EMA_20', 'RSI_14', 'MACD', 'MACD_signal', 'BB_upper',
       'BB_lower', 'ATR_14', 'ADX_14', 'daily_return', 'volatility_20d',
       'Dividend_event', 'StockSplit_event', 'Open_is_outlier',
       'High_is_outlier', 'Low_is_outlier', 'Close_is_outlier',
       'Adj Close_is_outlier', 'Volume_is_outlier', 'Dividends_is_outlier',
       'Stock Splits_is_outlier', 'SMA_20_is_outlier', 'EMA_20_is_outlier',
       'RSI_14_is_outlier', 'MACD_is_outlier', 'MACD_signal_is_outlier',
       'BB_upper_is_outlier', 'BB_lower_is_outlier', 'ATR_14_is_outlier',
       'ADX_14_is_outlier', 'daily_return_is_outlier',
       'volatility_20d_is_outlier', 'Dividend_event_is_outlier',
       'StockSplit_event_is_outlier'],
      dtype='object')
                       Date      Open      High      

In [None]:
import pandas as pd
from azure.storage.blob import BlobServiceClient
import io

# Azure config
import os
from dotenv import load_dotenv

load_dotenv()  # pull variables from a local .env into the environment when one exists

# Standalone settings: same env var as the feature-engineering cell, cell-local names.
AZURE_CONNECTION_STRING = os.getenv("AZURE_CONN_STR")
CONTAINER_NAME = "stock-data"
FOLDER = "model_ready_data"

# Blob container that holds the engineered feature files.
blob_service_client = BlobServiceClient.from_connection_string(AZURE_CONNECTION_STRING)
container_client = blob_service_client.get_container_client(CONTAINER_NAME)

# Read one output file into memory (edit the ticker/date part to inspect another).
sample_blob = f"{FOLDER}/AAPL_20250811.parquet"
downloader = container_client.download_blob(sample_blob)
df = pd.read_parquet(io.BytesIO(downloader.readall()))

# Every per-column outlier flag produced by the feature pipeline.
outlier_cols = [name for name in df.columns if name.endswith("_is_outlier")]

# Keep rows whose flag total is positive, i.e. at least one column was flagged.
flag_totals = df[outlier_cols].sum(axis=1)
non_zero_outliers = df.loc[flag_totals.gt(0)]

report_cols = outlier_cols + ["Date", "Ticker"]
print(f"Total rows with non-zero outlier flags: {len(non_zero_outliers)}")
print(non_zero_outliers[report_cols].head(20))  # first 20 flagged rows only


Total rows with non-zero outlier flags: 575
      Open_is_outlier  High_is_outlier  Low_is_outlier  Close_is_outlier  \
165                 0                0               0                 0   
654                 0                0               0                 0   
658                 0                0               0                 0   
670                 0                0               0                 0   
671                 0                0               0                 0   
672                 0                0               0                 0   
673                 0                0               0                 0   
688                 0                0               0                 0   
689                 0                0               0                 0   
702                 0                0               0                 0   
888                 0                0               0                 0   
1586                0                0      