# Data Processing Plan - Portfolio Project

Student: Erdi Shpati  
School: Holberton School  
Date: November 15, 2025  
Project: S&P 500 Preprocessing

This notebook documents data sources, formats, features, exploration, hypotheses, missing data/outliers strategy, time-aware splits, bias mitigation, features for training, data types, transformations, and storage plans, aligned to the assignment requirements.


In [1]:
import os
from pathlib import Path
import numpy as np
import pandas as pd

# Local CSV path
CSV_PATH = Path(r"C:\Users\Admin\Downloads\archive") / "all_stocks_5yr.csv"
CSV_PATH


WindowsPath('C:/Users/Admin/Downloads/archive/all_stocks_5yr.csv')

## 1. Data Sources

- Primary source: Local CSV located at C:\Users\Admin\Downloads\archive\all_stocks_5yr.csv.  
- Aggregation: Single source for baseline; may later augment with fundamentals or macro time series.  
- Access method: pandas.read_csv using a Windows-safe path via pathlib.


In [2]:
import pandas as pd
from pathlib import Path

CSV_PATH = Path(r"C:\Users\Admin\Downloads\archive") / "all_stocks_5yr.csv"

# Read only first 1000 rows to keep memory tiny
df = pd.read_csv(CSV_PATH, nrows=1000, parse_dates=["date"])
df = df.sort_values("date").set_index("date")
print(df.shape); df.head()


(1000, 6)


Unnamed: 0_level_0,open,high,low,close,volume,Name
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2013-02-08,15.07,15.12,14.63,14.75,8407500,AAL
2013-02-11,14.89,15.01,14.26,14.46,8882000,AAL
2013-02-12,14.45,14.51,14.1,14.27,8126000,AAL
2013-02-13,14.3,14.94,14.25,14.66,10259500,AAL
2013-02-14,14.94,14.96,13.16,13.99,31879900,AAL


## 2. Data Format

- Current format: CSV with columns typically including date, open, high, low, close, volume, and a ticker identifier (Name).  
- Target format: Pandas DataFrame indexed by datetime for time-series operations and Parquet for intermediate storage.  
- Structure: Long format with multiple tickers stacked by date, enabling per-ticker rolling features.


In [3]:
print("Columns:", list(df.columns))
print("\nNull fraction (top 20):")
print(df.isna().mean().sort_values(ascending=False).head(20))


Columns: ['open', 'high', 'low', 'close', 'volume', 'Name']

Null fraction (top 20):
open      0.0
high      0.0
low       0.0
close     0.0
volume    0.0
Name      0.0
dtype: float64


## 3. Features and Exploration

- Raw features: open, high, low, close, volume, Name, and date.  
- EDA plan: descriptive statistics, missingness summary, distributions, correlations among numeric features, and per-ticker coverage over time.  
- Derived features: 1-day/5-day returns, rolling means (volume MA5, close MA5/MA20), and simple momentum indicators.


In [4]:
required = {"open","high","low","close","volume","Name"}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing expected columns: {missing}")

df["close_next"] = df.groupby("Name")["close"].shift(-1)
df["up_1d"] = (df["close_next"] > df["close"]).astype("Int64")
df["ret_1d"] = df.groupby("Name")["close"].pct_change()
df["ret_5d"] = df.groupby("Name")["close"].pct_change(5)
df["vol_ma5"] = df.groupby("Name")["volume"].transform(lambda s: s.rolling(5, min_periods=1).mean())
df["close_ma5"] = df.groupby("Name")["close"].transform(lambda s: s.rolling(5, min_periods=1).mean())
df["close_ma20"] = df.groupby("Name")["close"].transform(lambda s: s.rolling(20, min_periods=1).mean())

df_model = df.dropna(subset=["close_next"]).copy()
print("df_model:", df_model.shape)


df_model: (999, 13)


## 4. Hypotheses and Testing

- Hypothesis 1: Short-term moving averages (e.g., close MA5 vs. MA20) capture momentum informative for next-day direction.  
  - Test: correlation/feature importance and evaluation by momentum regimes.  
- Hypothesis 2: Recent returns and volume changes have predictive value for next-day up/down.  
  - Test: classification metrics and calibration across feature quantiles.


In [5]:
import numpy as np
import pandas as pd

# Sanity: ensure engineered features and targets exist
needed_cols = {"close_ma5","close_ma20","ret_1d","ret_5d","up_1d","close_next"}
print("Has needed columns:", needed_cols.issubset(set(df_model.columns)))

# Hypothesis 1: Momentum via moving averages (close_ma5 vs close_ma20)
# Create a simple momentum signal and check association with next-day direction (classification target up_1d)
if {"close_ma5","close_ma20","up_1d"}.issubset(df_model.columns):
    df_mom = df_model[["close_ma5","close_ma20","up_1d"]].dropna().copy()
    df_mom["mom_pos"] = (df_mom["close_ma5"] > df_mom["close_ma20"]).astype(int)
    # Group-wise accuracy proxy: fraction of positive next-day moves when momentum positive vs negative
    grp = df_mom.groupby("mom_pos")["up_1d"].mean()
    print("P(up_1d=1) by momentum regime (0=MA5<=MA20, 1=MA5>MA20):")
    print(grp)

# Hypothesis 2: Recent returns and volume/price levels relate to next-day movement
# Bin recent return ret_1d into quantiles and compare up_1d rates
if {"ret_1d","up_1d"}.issubset(df_model.columns):
    tmp = df_model[["ret_1d","up_1d"]].dropna().copy()
    # Use quantiles; clip extremes to reduce influence of outliers
    tmp["ret_1d_clip"] = tmp["ret_1d"].clip(tmp["ret_1d"].quantile(0.01), tmp["ret_1d"].quantile(0.99))
    tmp["ret_1d_bin"] = pd.qcut(tmp["ret_1d_clip"], q=5, duplicates="drop")
    print("\nP(up_1d=1) by ret_1d quantile bin:")
    print(tmp.groupby("ret_1d_bin")["up_1d"].mean())

# Optional: correlation preview for regression target (close_next) with engineered numeric features (sampled)
numeric_checks = ["open","high","low","close","volume","ret_1d","ret_5d","vol_ma5","close_ma5","close_ma20","close_next"]
numeric_checks = [c for c in numeric_checks if c in df_model.columns]
if "close_next" in numeric_checks:
    sample_n = min(len(df_model), 20000)
    corr_preview = df_model[numeric_checks].iloc[:sample_n].corr()["close_next"].sort_values(ascending=False)
    print("\nCorrelation with close_next (preview on first 20k rows):")
    print(corr_preview)


Has needed columns: True
P(up_1d=1) by momentum regime (0=MA5<=MA20, 1=MA5>MA20):
mom_pos
0    0.548463
1    0.522569
Name: up_1d, dtype: Float64

P(up_1d=1) by ret_1d quantile bin:
ret_1d_bin
(-0.0577, -0.0149]        0.465
(-0.0149, -0.00312]    0.537688
(-0.00312, 0.00698]        0.57
(0.00698, 0.0188]      0.557789
(0.0188, 0.0593]           0.54
Name: up_1d, dtype: Float64

Correlation with close_next (preview on first 20k rows):
close_next    1.000000
close         0.996818
high          0.995688
low           0.995546
open          0.994181
close_ma5     0.992252
close_ma20    0.978314
vol_ma5       0.224725
volume        0.146502
ret_5d        0.010956
ret_1d        0.006913
Name: close_next, dtype: float64


  print(tmp.groupby("ret_1d_bin")["up_1d"].mean())


## 5. Data Density, Missing Data, Outliers

- Density: OHLCV is generally dense; verify and document any gaps.  
- Missing handling: numeric → median imputation; categorical → most frequent in the pipeline for reproducibility.  
- Outliers: detect with IQR/Z-score; choose whether to cap or keep due to market shock relevance, documenting the decision.


In [6]:
# Missingness
missing = df_model.isna().mean().sort_values(ascending=False)
print("Top missing columns:")
print(missing.head(15))

# Lightweight outlier rate (IQR) on compact numeric set
core_num = [
    "open","high","low","close","volume",
    "ret_1d","ret_5d","vol_ma5","close_ma5","close_ma20"
]
core_num = [c for c in core_num if c in df_model.columns]

def iqr_outlier_rate(s, factor=1.5):
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    lo, hi = q1 - factor*iqr, q3 + factor*iqr
    return ((s < lo) | (s > hi)).mean()

# Use train later for final rates; for now compute on full to preview
preview = pd.Series({c: iqr_outlier_rate(df_model[c].dropna()) for c in core_num})
print("IQR outlier rates (preview):")
print(preview.sort_values(ascending=False))


Top missing columns:
ret_5d        0.005005
ret_1d        0.001001
low           0.000000
high          0.000000
open          0.000000
volume        0.000000
close         0.000000
close_next    0.000000
Name          0.000000
up_1d         0.000000
vol_ma5       0.000000
close_ma5     0.000000
close_ma20    0.000000
dtype: float64
IQR outlier rates (preview):
volume        0.049049
vol_ma5       0.042042
ret_1d        0.028056
ret_5d        0.025151
high          0.000000
open          0.000000
low           0.000000
close         0.000000
close_ma5     0.000000
close_ma20    0.000000
dtype: float64


## 6. Data Splitting Strategy

- Time-aware split by chronological order: 70% train, 15% validation, 15% test.  
- Rationale: avoids look-ahead leakage and mirrors real predictive use.  
- No shuffling; maintain the global timeline across all tickers.


In [7]:
TARGET = "up_1d"  # or "close_next" for regression

feature_cols = [
    "open","high","low","close","volume",
    "ret_1d","ret_5d","vol_ma5","close_ma5","close_ma20","Name"
]
feature_cols = [c for c in feature_cols if c in df_model.columns]

X = df_model[feature_cols].copy()
y = df_model[TARGET].copy()

# Time-aware split on tiny set
all_idx = X.index.sort_values().unique()
n = len(all_idx)
cut1 = all_idx[int(n*0.70)] if n > 0 else None
cut2 = all_idx[int(n*0.85)] if n > 0 else None

train_idx = X.index[X.index <= cut1] if cut1 is not None else X.index[:0]
val_idx   = X.index[(X.index > cut1) & (X.index <= cut2)] if cut2 is not None else X.index[:0]
test_idx  = X.index[X.index > cut2] if cut2 is not None else X.index[:0]

# If the sample is too small and any split is empty, fallback to simple head/tail split
if len(train_idx) == 0 or len(val_idx) == 0:
    split1 = int(len(X)*0.7)
    split2 = int(len(X)*0.85)
    train_idx = X.index[:split1]
    val_idx   = X.index[split1:split2]
    test_idx  = X.index[split2:]

X_train, y_train = X.loc[train_idx], y.loc[train_idx]
X_val,   y_val   = X.loc[val_idx],   y.loc[val_idx]
X_test,  y_test  = X.loc[test_idx],  y.loc[test_idx]

print("Shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Preview cols:", X_train.columns[:20].tolist())


Shapes: (700, 11) (150, 11) (149, 11)
Preview cols: ['open', 'high', 'low', 'close', 'volume', 'ret_1d', 'ret_5d', 'vol_ma5', 'close_ma5', 'close_ma20', 'Name']


## 7. Ensuring an Unbiased Dataset

- Distribution alignment: ensure validation/test spans typical conditions; consider multiple validation windows if needed.  
- Representation: verify a reasonable mix of tickers/sectors across splits; note any survivorship bias.  
- Subgroup metrics: evaluate performance by ticker or sector and iterate on features to address gaps.


In [8]:
if "Name" in X_train.columns:
    print("Unique tickers — Train/Val/Test:",
          X_train["Name"].nunique(),
          X_val["Name"].nunique(),
          X_test["Name"].nunique())


Unique tickers — Train/Val/Test: 1 1 1


## 8. Features for Training

- Initial features: OHLCV, returns (ret_1d, ret_5d), rolling means (vol_ma5, close_ma5, close_ma20), and Name as a categorical feature.  
- Exclusions: any feature that leaks future information or redundant/unstable engineered fields.  
- Future enhancements: volatility proxies, sector info, and macro covariates if justified.  


## 9. Data Types

- Numerical: continuous OHLCV, returns, rolling stats.  
- Categorical: Name (ticker), optionally sector.  
- Datetime: used for chronological splitting and possible seasonal/cyclical features later.  


## 10. Transformations and Pipeline

- Numeric: median imputation and standardization.  
- Categorical: most-frequent imputation and one-hot encoding for Name with handle_unknown="ignore".  
- Implemented with scikit-learn ColumnTransformer + Pipeline to ensure reproducibility and avoid leakage.  


In [9]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

num_cols = [
    "open","high","low","close","volume",
    "ret_1d","ret_5d","vol_ma5","close_ma5","close_ma20"
]
num_cols = [c for c in num_cols if c in X_train.columns]
cat_cols = ["Name"] if "Name" in X_train.columns else []

preprocess = ColumnTransformer(
    transformers=[
        ("num", Pipeline([("imputer", SimpleImputer(strategy="median")),
                          ("scaler", StandardScaler())]), num_cols),
        ("cat", Pipeline([("imputer", SimpleImputer(strategy="most_frequent")),
                          ("onehot", OneHotEncoder(handle_unknown="ignore"))]), cat_cols),
    ],
    remainder="drop",
)

print("Numeric cols:", num_cols); print("Categorical cols:", cat_cols)


Numeric cols: ['open', 'high', 'low', 'close', 'volume', 'ret_1d', 'ret_5d', 'vol_ma5', 'close_ma5', 'close_ma20']
Categorical cols: ['Name']


## 11. Baseline Models and Metrics

- Classification baseline: Logistic Regression with class_weight='balanced' (TARGET = up_1d).  
- Regression baseline: Ridge Regression (TARGET = close_next).  
- Evaluate on validation; keep test untouched for final estimate.  


In [10]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.pipeline import Pipeline

clf = Pipeline([("prep", preprocess),
                ("mdl", LogisticRegression(max_iter=1000, class_weight="balanced"))])
clf.fit(X_train, y_train)
val_pred = clf.predict(X_val)
print(classification_report(y_val, val_pred))
try:
    val_proba = clf.predict_proba(X_val)[:, 1]
    print("Validation ROC-AUC:", roc_auc_score(y_val, val_proba))
except Exception as e:
    print("ROC-AUC not available:", e)


              precision    recall  f1-score   support

         0.0       0.49      0.67      0.57        70
         1.0       0.58      0.40      0.47        80

    accuracy                           0.53       150
   macro avg       0.54      0.54      0.52       150
weighted avg       0.54      0.53      0.52       150

Validation ROC-AUC: 0.5183928571428571


## 12. Save Processed Data

- Save a model-ready snapshot of the engineered dataset to Parquet for efficient storage and reloads.  
- Store artifacts under data/processed with clear naming for reproducibility.  


In [11]:
from pathlib import Path
PROC_DIR = Path("data/processed")
PROC_DIR.mkdir(parents=True, exist_ok=True)
out_path = PROC_DIR / "sp500_all_stocks_5yr_model.parquet"
df_model.to_parquet(out_path)
print("Saved:", out_path.resolve())


Saved: C:\Users\Admin\data\processed\sp500_all_stocks_5yr_model.parquet


## Submission Summary

- Loaded a local S&P 500 OHLCV dataset, parsed dates, and indexed chronologically to support time-series workflows.  
- Engineered next-day targets (close_next for regression, up_1d for classification) and core numeric features (returns and rolling means) per ticker without leakage.  
- Applied a chronological 70/15/15 train/validation/test split to avoid look-ahead bias and mirror real predictive use.  
- Built a reproducible scikit-learn Pipeline that imputes/scales numeric features and one-hot encodes the ticker (Name) only within the Pipeline.  
- Trained baseline models and evaluated on the validation set, with the test set reserved for final estimation; saved a model-ready Parquet snapshot for reproducibility.  
