# Tax Liability Prediction (Regression) â€” Complete Endâ€‘toâ€‘End Notebook ðŸ§¾ðŸ“ˆ

This notebook is a **full regression ML project** built on the synthetic tax dataset.

## ðŸŽ¯ Objective
Predict **`TaxLiability`** (continuous numeric target).

## âœ… Includes
- Data loading + sanity checks
- **Advanced EDA (multiâ€‘level)**
- Cleaning (handles NaN/inf safely)
- Feature engineering
- Preprocessing pipeline (numeric + categorical)
- Train/Val/Test split
- **6+ regression models**
- Evaluation: **RMSE, MAE, RÂ², MAPE**
- Crossâ€‘validation (compatible scoring)
- Diagnostics: Pred vs Actual + residual plots
- Permutation importance (**safe settings**, no length mismatch)
- Save final deploymentâ€‘ready pipeline `.joblib`


In [None]:
# ============================
# 0) Setup
# ============================
DATA_PATH = "tax_synthetic_ml_dataset.csv"  # keep CSV in same folder as this notebook

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

df = pd.read_csv(DATA_PATH)
print("Raw shape:", df.shape)
df.head()

## 1) Data Audit (Quality Checks)

In [None]:
df.info()

In [None]:
# Missing values
missing = df.isna().sum().sort_values(ascending=False)
missing = missing[missing > 0]
print("Columns with missing:", len(missing))
missing.head(30)

In [None]:
# Duplicates
print("Duplicate rows:", int(df.duplicated().sum()))

In [None]:
df.describe(include="all").T.head(25)

## 2) Define Target (Regression) + Safe Cleaning
We predict **`TaxLiability`** and drop:
- `AuditFlag` (classification label)
- `EffectiveTaxRate` (derived from TaxLiability â†’ leakage)

We also:
- coerce target to numeric
- replace inf with NaN
- drop rows where **target is NaN** (prevents training error)


In [None]:
TARGET = "TaxLiability"

# Drop classification/leakage columns if present
drop_cols = ["AuditFlag", "EffectiveTaxRate"]
df = df.drop(columns=[c for c in drop_cols if c in df.columns], errors="ignore")

# Replace infinities in entire dataframe
df = df.replace([np.inf, -np.inf], np.nan)

# Force target numeric (bad strings -> NaN)
df[TARGET] = pd.to_numeric(df[TARGET], errors="coerce")

print("NaNs in target before:", int(df[TARGET].isna().sum()))

# Drop rows with NaN target (recommended)
df = df.dropna(subset=[TARGET]).reset_index(drop=True)

print("Shape after target cleanup:", df.shape)
print("NaNs in target after:", int(df[TARGET].isna().sum()))

X = df.drop(columns=[TARGET])
y = df[TARGET].astype(float)

# Final safety: remove non-finite target rows (should be none after dropna)
mask = np.isfinite(y.to_numpy())
X = X.loc[mask].reset_index(drop=True)
y = y.loc[mask].reset_index(drop=True)

print("Final X,y:", X.shape, y.shape)

## 3) Advanced EDA (Level 1): Target Distribution

In [None]:
plt.figure()
y.hist(bins=60)
plt.title("Target Distribution: TaxLiability")
plt.xlabel("TaxLiability")
plt.ylabel("Count")
plt.show()

print("Skewness:", float(y.skew()))
print("Kurtosis:", float(y.kurtosis()))

## 4) Advanced EDA (Level 2): Numeric Distributions & Outliers

In [None]:
num_cols_all = X.select_dtypes(include=[np.number]).columns.tolist()
cat_cols_all = X.select_dtypes(exclude=[np.number]).columns.tolist()

major_num = ["Age","AnnualIncome","BusinessIncome","CapitalGains","DeductionsTotal","TaxableIncome","StandardDeduction","HealthInsurancePremium"]
major_num = [c for c in major_num if c in X.columns]

for col in major_num:
    plt.figure()
    X[col].hist(bins=50)
    plt.title(f"Distribution: {col}")
    plt.xlabel(col)
    plt.ylabel("Count")
    plt.show()

def iqr_outlier_count(s):
    s = s.dropna()
    if len(s) == 0:
        return 0
    q1, q3 = np.percentile(s, [25, 75])
    iqr = q3 - q1
    lo, hi = q1 - 1.5*iqr, q3 + 1.5*iqr
    return int(((s < lo) | (s > hi)).sum())

outlier_counts = {c: iqr_outlier_count(X[c]) for c in major_num}
pd.Series(outlier_counts).sort_values(ascending=False)

## 5) Advanced EDA (Level 3): Relationships with Target

In [None]:
# Scatter plots (sample for speed)
sample_idx = X.sample(n=min(6000, len(X)), random_state=42).index

for col in ["TaxableIncome","AnnualIncome","BusinessIncome","CapitalGains","DeductionsTotal"]:
    if col not in X.columns:
        continue
    plt.figure()
    plt.scatter(X.loc[sample_idx, col], y.loc[sample_idx], alpha=0.25)
    plt.title(f"{col} vs TaxLiability")
    plt.xlabel(col)
    plt.ylabel("TaxLiability")
    plt.show()

In [None]:
# Categorical: target mean by category
for col in cat_cols_all:
    stats = df.groupby(col)[TARGET].agg(["mean","median","count"]).sort_values("mean", ascending=False)
    display(stats.head(10))
    plt.figure()
    stats["mean"].head(12).plot(kind="bar")
    plt.title(f"Mean TaxLiability by {col} (Top 12)")
    plt.xlabel(col)
    plt.ylabel("Mean TaxLiability")
    plt.show()

## 6) Advanced EDA (Level 4): Correlation Heatmap (Numeric)

In [None]:
corr = df[num_cols_all + [TARGET]].corr(numeric_only=True)

plt.figure(figsize=(10, 8))
plt.imshow(corr.values, aspect="auto")
plt.colorbar()
plt.xticks(range(len(corr.columns)), corr.columns, rotation=90)
plt.yticks(range(len(corr.index)), corr.index)
plt.title("Correlation Heatmap (Numeric)")
plt.tight_layout()
plt.show()

## 7) Feature Engineering
Adds:
- TotalIncome
- BusinessIncomeRatio
- DeductionsRatio
- IncomePerDependent
- LogTotalIncome
All ratios use +1 to avoid divide-by-zero.


In [None]:
df_fe = df.copy()

df_fe["TotalIncome"] = df_fe["AnnualIncome"] + df_fe["BusinessIncome"] + df_fe["CapitalGains"]
df_fe["BusinessIncomeRatio"] = df_fe["BusinessIncome"] / (df_fe["AnnualIncome"] + 1.0)
df_fe["DeductionsRatio"] = df_fe["DeductionsTotal"] / (df_fe["TotalIncome"] + 1.0)
df_fe["IncomePerDependent"] = df_fe["TotalIncome"] / (df_fe["Dependents"] + 1.0)
df_fe["LogTotalIncome"] = np.log1p(df_fe["TotalIncome"])

# Replace any inf that might appear (shouldn't)
df_fe = df_fe.replace([np.inf, -np.inf], np.nan)

# Drop rows where engineered columns became NaN (rare)
engineered_cols = ["TotalIncome","BusinessIncomeRatio","DeductionsRatio","IncomePerDependent","LogTotalIncome"]
df_fe = df_fe.dropna(subset=engineered_cols + [TARGET]).reset_index(drop=True)

X = df_fe.drop(columns=[TARGET])
y = df_fe[TARGET].astype(float)

num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()

print("After FE X,y:", X.shape, y.shape)
X.head()

## 8) Train / Validation / Test Split (70/15/15)

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.30, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.50, random_state=42)

print("Train:", X_train.shape, "Val:", X_val.shape, "Test:", X_test.shape)

## 9) Preprocessing Pipeline (Numeric + Categorical)

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

num_pipe = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

cat_pipe = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("ohe", OneHotEncoder(handle_unknown="ignore"))
])

preprocess = ColumnTransformer(transformers=[
    ("num", num_pipe, num_cols),
    ("cat", cat_pipe, cat_cols),
])

## 10) Metrics (Compatible with Older/Newer scikit-learn)
We compute RMSE manually (avoids `squared=False` incompatibility).


In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def regression_metrics(y_true, y_pred):
    mse = mean_squared_error(y_true, y_pred)  # no squared=...
    rmse = float(np.sqrt(mse))
    mae = float(mean_absolute_error(y_true, y_pred))
    r2 = float(r2_score(y_true, y_pred))
    denom = np.clip(np.abs(np.asarray(y_true)), 1.0, None)
    mape = float(np.mean(np.abs((np.asarray(y_true) - np.asarray(y_pred)) / denom)) * 100)
    return {"RMSE": rmse, "MAE": mae, "R2": r2, "MAPE_%": mape}

## 11) Train 6+ Regression Models + Validation Comparison

In [None]:
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, HistGradientBoostingRegressor, GradientBoostingRegressor

models = {
    "LinearRegression": LinearRegression(),
    "Ridge": Ridge(alpha=1.0, random_state=42),
    "Lasso": Lasso(alpha=0.0008, random_state=42, max_iter=20000),
    "ElasticNet": ElasticNet(alpha=0.001, l1_ratio=0.35, random_state=42, max_iter=20000),
    "GradientBoosting": GradientBoostingRegressor(random_state=42),
    "RandomForest": RandomForestRegressor(n_estimators=350, random_state=42, n_jobs=-1),
    "ExtraTrees": ExtraTreesRegressor(n_estimators=500, random_state=42, n_jobs=-1),
    "HistGradientBoosting": HistGradientBoostingRegressor(random_state=42),
}

val_results = []
trained = {}

for name, model in models.items():
    pipe = Pipeline(steps=[("preprocess", preprocess), ("model", model)])
    pipe.fit(X_train, y_train)
    pred_val = pipe.predict(X_val)
    m = regression_metrics(y_val, pred_val)
    m["Model"] = name
    val_results.append(m)
    trained[name] = pipe
    print(name, m)

val_df = pd.DataFrame(val_results).set_index("Model").sort_values(["RMSE","MAE"])
val_df

## 12) Cross-Validation (CV) for Top Models (Compatible scoring)
We use `neg_mean_squared_error` then take sqrt to get RMSE.


In [None]:
from sklearn.model_selection import KFold, cross_val_score

top_models = val_df.head(4).index.tolist()
cv = KFold(n_splits=5, shuffle=True, random_state=42)

cv_rows = []
for name in top_models:
    pipe = trained[name]
    scores = cross_val_score(pipe, X_train, y_train, scoring="neg_mean_squared_error", cv=cv, n_jobs=-1)
    cv_rmse = np.sqrt(-scores)
    cv_rows.append({
        "Model": name,
        "CV_RMSE_mean": float(cv_rmse.mean()),
        "CV_RMSE_std": float(cv_rmse.std()),
    })

cv_df = pd.DataFrame(cv_rows).set_index("Model").sort_values("CV_RMSE_mean")
cv_df

## 13) Final Model: Refit on Train+Val and Evaluate on Test

In [None]:
best_name = val_df.index[0]
print("Best model by validation RMSE:", best_name)

best_pipe = Pipeline(steps=[("preprocess", preprocess), ("model", models[best_name])])

X_trval = pd.concat([X_train, X_val], axis=0)
y_trval = pd.concat([y_train, y_val], axis=0)

best_pipe.fit(X_trval, y_trval)

pred_test = best_pipe.predict(X_test)
test_metrics = regression_metrics(y_test, pred_test)
test_metrics

## 14) Diagnostics: Pred vs Actual + Residual Plots

In [None]:
plt.figure()
plt.scatter(y_test, pred_test, alpha=0.3)
plt.title(f"Predicted vs Actual (Test) â€” {best_name}")
plt.xlabel("Actual TaxLiability")
plt.ylabel("Predicted TaxLiability")
plt.show()

resid = y_test - pred_test

plt.figure()
pd.Series(resid).hist(bins=60)
plt.title("Residual Distribution (Test)")
plt.xlabel("Residual (Actual - Pred)")
plt.ylabel("Count")
plt.show()

plt.figure()
plt.scatter(pred_test, resid, alpha=0.3)
plt.axhline(0, color="red")
plt.title("Residuals vs Predicted (Test)")
plt.xlabel("Predicted TaxLiability")
plt.ylabel("Residual")
plt.show()

## 15) Permutation Importance (Safe + Correct Feature Names)
- Uses **original input columns** (no oneâ€‘hot length mismatch)
- Uses **n_jobs=1** to avoid joblib disk/pickling issues


In [None]:
from sklearn.inspection import permutation_importance

X_imp = X_test.sample(n=min(1500, len(X_test)), random_state=42)
y_imp = y_test.loc[X_imp.index]

r = permutation_importance(
    best_pipe,
    X_imp,
    y_imp,
    n_repeats=3,
    random_state=42,
    n_jobs=1,
    scoring="neg_mean_squared_error"
)

feature_names = X_imp.columns.tolist()

imp_df = pd.DataFrame({
    "feature": feature_names,
    "importance_mean": r.importances_mean,
    "importance_std": r.importances_std
}).sort_values("importance_mean", ascending=False)

display(imp_df.head(20))

topk = imp_df.head(15).iloc[::-1]
plt.figure(figsize=(8, 6))
plt.barh(topk["feature"], topk["importance_mean"])
plt.title("Permutation Importance (Original Features)")
plt.xlabel("Importance (MSE impact)")
plt.ylabel("Feature")
plt.tight_layout()
plt.show()

## 16) Save Final Pipeline (Deploymentâ€‘Ready)

In [None]:
import joblib

ARTIFACT_PATH = "tax_liability_regression_pipeline.joblib"
joblib.dump(best_pipe, ARTIFACT_PATH)
print("Saved:", ARTIFACT_PATH)

## 17) Quick Inference Example

In [None]:
example_row = X_test.iloc[[0]].copy()
pred_example = float(best_pipe.predict(example_row)[0])
print("Example prediction TaxLiability:", pred_example)
example_row

## âœ… Conclusion (Vivaâ€‘Ready)
- We framed the task as **regression** to predict **TaxLiability**
- Performed EDA, feature engineering, and trained multiple models
- Selected best model using validation RMSE + confirmed with CV
- Ran diagnostics + feature importance
- Saved a single pipeline artifact for Streamlit/FastAPI deployment
