#Estimating Vehicle Repair Costs with Predictive Modeling

Code authored by: Irfan Ozmen

In [None]:
import numpy as np
import sklearn
from sklearn.model_selection import train_test_split

In [None]:
print(np.__version__)

In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader,TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from scipy import stats
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import pandas as pd

import matplotlib.pyplot as plt
%pip install seaborn
import seaborn as sns
from IPython.display import set_matplotlib_formats
set_matplotlib_formats('svg')
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
%pip install missingno
import missingno as msno
%pip install rapidfuzz
from rapidfuzz import process

In [None]:
file_path ="J:/My Drive/Colab Notebooks/RI_Project/MTRX Test Data/MTRX1000000.csv"

df = pd.read_csv(file_path)

In [None]:
df.info()

# List numeric and categorical features
numeric_cols = df.select_dtypes(include='number').columns
categorical_cols = df.select_dtypes(include='object').columns

print("Numerical Features:\n", numeric_cols)
print("\nCategorical Features:\n", categorical_cols)

In [None]:
#Histograms for numeric columns
df[numeric_cols].hist(bins=30, figsize=(15, 10))
plt.suptitle("Numerical Feature Distributions")
plt.show()

In [None]:
#Counterplots for top categorical features
for col in ['ITEM_NAME', 'VEHICLE_MODEL', 'REPAIRER_STATE']:
    plt.figure(figsize=(10,4))
    sns.countplot(y=col, data=df, order=df[col].value_counts().index[:10])
    plt.title(f"Top values in {col}")
    plt.show()

In [None]:
for col in numeric_cols:
    plt.figure(figsize=(10,2))
    sns.boxplot(x=df[col])
    plt.title(f"Outlier Detection in {col}")
    plt.show()

In [None]:
#Handle Missing values

#Item mapping

# List of accepted categories
clean_categories = [
    "HAIL",
    "PDR",
    "GLASS",
    "PACKAGE",
    "PPF VINYL PACKAGE",
    "PARTS",
    "WHEEL",
    "TINT",
    "MISC",
    "DETAIL",
    "PPF VINYL",
    "REMOVEINSTALL",
    "PRICE A DENT",
    "PAINT & BODY",
    "DETAIL PACKAGE",
    "PPF VINYL MATERIAL"
]

def fuzzy_clean_item_name(raw_name):
    try:
        name = str(raw_name).upper().strip()
        best_match, score, _ = process.extractOne(name, clean_categories)
        return best_match if score > 80 else name
    except Exception as e:
        print(f"Skipping value: {raw_name} due to error: {e}")
        return raw_name  # fallback if something goes wrong

df['SERVICE'] = df['SERVICE'].apply(fuzzy_clean_item_name).str.upper().str.strip()



# Make a copy to preserve original
df_cleaned = df.copy()

# Strategy examples:
for col in numeric_cols:
    df_cleaned[col].fillna(df_cleaned[col].mean(), inplace=True)

for col in categorical_cols:
    df_cleaned[col].fillna(df_cleaned[col].mode()[0], inplace=True)

In [None]:
print("Numerical Features:\n", numeric_cols)
print("\nCategorical Features:\n", categorical_cols)

In [None]:
# Test Outliers with IQR method
Q1 = df_cleaned[numeric_cols].quantile(0.25)
Q3 = df_cleaned[numeric_cols].quantile(0.75)
IQR = Q3 - Q1

df_no_outliers = df_cleaned[~((df_cleaned[numeric_cols] < (Q1 - 1.5 * IQR)) |
                              (df_cleaned[numeric_cols] > (Q3 + 1.5 * IQR))).any(axis=1)]
print("Original size:", df_cleaned.shape)
print("No outlier size:", df_no_outliers.shape)

In [None]:
#
from scipy.stats.mstats import winsorize

df_model = df_no_outliers.copy()  # or df_cleaned if you choose not to remove outliers

TARGET = "ITEM_AMOUNT"

# choose which categorical columns you want embeddings for
cat_cols = ["ITEM_NAME", "VEHICLE_MODEL", "REPAIRER_STATE", "SERVICE"]  # adjust as needed

# numeric columns = numeric features except target
num_cols = [c for c in df_model.select_dtypes(include="number").columns if c != TARGET]
# Remove identifier columns (edit list based on dataset)
id_like = ["INVOICE_ID", "_BATCH_ID_"]
num_cols = [c for c in num_cols if c not in id_like]

# 1) split FIRST (prevents leakage)
df_model = df_model[df_model[TARGET].notna()].copy()
df_model = df_model[df_model[TARGET] >= 0].copy()

train_df, test_df = train_test_split(df_model, test_size=0.2, random_state=42)

# 2) winsorize using TRAIN percentiles only
low_q, high_q = train_df[TARGET].quantile([0.005, 0.995])
train_df[TARGET] = train_df[TARGET].clip(lower=low_q, upper=high_q)
test_df[TARGET]  = test_df[TARGET].clip(lower=low_q, upper=high_q)

# 3) scale numeric (fit on train only)
scaler = StandardScaler()
X_num_train = scaler.fit_transform(train_df[num_cols].astype(float))
X_num_test  = scaler.transform(test_df[num_cols].astype(float))

y_train = np.log1p(train_df[TARGET].astype(np.float32).values).reshape(-1, 1)
y_test  = np.log1p(test_df[TARGET].astype(np.float32).values).reshape(-1, 1)

# 4) build vocab (train only) and encode cats. Adding a range will work better. Second options is much more diffucult.
def build_vocab(series: pd.Series, min_freq=2):
    vc = series.astype(str).value_counts()
    tokens = vc[vc >= min_freq].index.tolist()
    # 0 reserved for UNK
    stoi = {tok: i+1 for i, tok in enumerate(tokens)}
    return stoi

vocabs = {col: build_vocab(train_df[col], min_freq=2) for col in cat_cols}

def encode_cats(frame: pd.DataFrame, vocabs: dict, cat_cols: list):
    cols = []
    for col in cat_cols:
        stoi = vocabs[col]
        ids = frame[col].astype(str).map(lambda x: stoi.get(x, 0)).astype(np.int64).values
        cols.append(ids)
    return np.stack(cols, axis=1)  # shape [N, num_cat_cols]

X_cat_train = encode_cats(train_df, vocabs, cat_cols)
X_cat_test  = encode_cats(test_df, vocabs, cat_cols)

# 5) embedding sizes
def emb_dim(cardinality: int) -> int:
    return int(min(50, round(np.sqrt(cardinality))))

cat_cardinalities = {col: len(vocabs[col]) + 1 for col in cat_cols}  # +1 for UNK=0
emb_dims = {col: emb_dim(cat_cardinalities[col]) for col in cat_cols}

cat_cardinalities, emb_dims

In [None]:
from torch.utils.data import Dataset

class RepairDataset(Dataset):
    def __init__(self, X_cat, X_num, y):
        self.X_cat = torch.tensor(X_cat, dtype=torch.long)
        self.X_num = torch.tensor(X_num, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X_cat[idx], self.X_num[idx], self.y[idx]

train_ds = RepairDataset(X_cat_train, X_num_train, y_train)
test_ds  = RepairDataset(X_cat_test,  X_num_test,  y_test)

train_loader = DataLoader(train_ds, batch_size=256, shuffle=True)
test_loader  = DataLoader(test_ds, batch_size=1024, shuffle=False)


class RepairEmbNet(nn.Module):
    def __init__(self, cat_cardinalities, emb_dims, n_num, hidden=128, p=0.2):
        super().__init__()
        self.cat_cols = list(cat_cardinalities.keys())

        self.emb = nn.ModuleDict({
            col: nn.Embedding(cat_cardinalities[col], emb_dims[col])
            for col in self.cat_cols
        })

        emb_total = sum(emb_dims.values())
        self.bn_num = nn.BatchNorm1d(n_num) if n_num > 0 else None

        self.fc1 = nn.Linear(emb_total + n_num, hidden)
        self.drop1 = nn.Dropout(p)
        self.fc2 = nn.Linear(hidden, hidden // 2)
        self.drop2 = nn.Dropout(p)
        self.out = nn.Linear(hidden // 2, 1)

    def forward(self, x_cat, x_num):
        embs = [self.emb[col](x_cat[:, i]) for i, col in enumerate(self.cat_cols)]
        x = torch.cat(embs, dim=1)

        if self.bn_num is not None:
            x_num = self.bn_num(x_num)
        x = torch.cat([x, x_num], dim=1)

        x = F.relu(self.fc1(x))
        x = self.drop1(x)
        x = F.relu(self.fc2(x))
        x = self.drop2(x)
        return self.out(x)


In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import scipy.stats as stats
import matplotlib.pyplot as plt
import numpy as np

device = "cuda" if torch.cuda.is_available() else "cpu"
model = RepairEmbNet(cat_cardinalities, emb_dims, n_num=len(num_cols)).to(device)

opt = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
# Huber loss (SmoothL1). beta is the "delta" threshold in log-space.
loss_fn = nn.SmoothL1Loss(beta=0.5) # try 0.1, 0.2, 0.5 

numepochs = 20
train_loss_hist = []
test_loss_hist  = []

for epoch in range(numepochs):
    # ---- train ----
    model.train()
    batch_losses = []

    for xc, xn, yb in train_loader:
        xc, xn, yb = xc.to(device), xn.to(device), yb.to(device)

        pred = model(xc, xn)
        loss = loss_fn(pred, yb)

        opt.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()

        batch_losses.append(loss.item())

    train_epoch_loss = float(np.mean(batch_losses))
    train_loss_hist.append(train_epoch_loss)

    # ---- eval (loss + metrics) ----
    model.eval()
    eval_losses = []
    preds, trues = [], []

    with torch.no_grad():
        for xc, xn, yb in test_loader:
            xc, xn, yb = xc.to(device), xn.to(device), yb.to(device)

            pred = model(xc, xn)
            loss = loss_fn(pred, yb)
            eval_losses.append(loss.item())

            preds.append(pred.cpu().numpy())
            trues.append(yb.cpu().numpy())

    test_epoch_loss = float(np.mean(eval_losses))
    test_loss_hist.append(test_epoch_loss)

    # stack first (critical)
    preds = np.vstack(preds)   # shape [N,1]
    trues = np.vstack(trues)   # shape [N,1]
    

    # convert log-space -> dollar-space
    preds_cost = np.expm1(preds)
    trues_cost = np.expm1(trues)

    mse = mean_squared_error(trues_cost, preds_cost)
    mae = mean_absolute_error(trues_cost, preds_cost)
    r2  = r2_score(trues_cost, preds_cost)

    # Number of samples in evaluation
    n = len(trues_cost)

    # Effective number of predictors
    p = len(num_cols) + sum(emb_dims.values())

    # Adjusted R^2 (guard against invalid case)
    if n > p + 1:
        adj_r2 = 1 - (1 - r2) * (n - 1) / (n - p - 1)
    else:
        adj_r2 = np.nan

    print(
        f"Epoch {epoch+1:02d} | "
        f"train huber {train_epoch_loss:.4f} | "
        f"test huber {test_epoch_loss:.4f} | "
        f"MAE ${mae:.2f} | R2 {r2:.3f} | "
    )
# preds and trues are already stacked and in log-space
log_residuals = (trues - preds).reshape(-1)

# Remove any non-finite values (defensive)
log_residuals = log_residuals[np.isfinite(log_residuals)]

plt.figure(figsize=(6,6))
stats.probplot(log_residuals, dist="norm", plot=plt)
plt.title("Q–Q Plot of Log-Space Residuals")
plt.grid(True)
plt.show()

In [None]:
import matplotlib.pyplot as plt

epochs = np.arange(1, len(train_loss_hist) + 1)

plt.figure(figsize=(10,6))
plt.plot(epochs, train_loss_hist, label="Train Loss (MSE)")
plt.plot(epochs, test_loss_hist, label="Test Loss (MSE)")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training vs Test Loss Over Epochs")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Ensure 1D arrays
y_pred = preds.reshape(-1)
y_true = trues.reshape(-1)

y_pred_cost = np.expm1(y_pred)
y_true_cost = np.expm1(y_true)

residuals = y_true_cost - y_pred_cost

#################################################################################
# Attach predictions & residuals back to test_df (align by index)
test_df_diag = test_df.copy().reset_index(drop=True)

test_df_diag["y_pred_cost"] = y_pred_cost
test_df_diag["residual"] = residuals

# Identify extreme suspicious points
bad = test_df_diag.loc[
    (test_df_diag["y_pred_cost"] > 800) &
    (test_df_diag["residual"] < -800)
]

bad[[TARGET, "VEHICLE_MODEL", "SERVICE", "REPAIRER_STATE", "y_pred_cost", "residual"]]
#################################################################################################

# ---- Empirical prediction interval from residuals (dollars) ----
# Choose coverage (80% interval shown here: 10th to 90th percentile)
lo_q, hi_q = 0.10, 0.90

res_lo = np.quantile(residuals, lo_q)
res_hi = np.quantile(residuals, hi_q)

print(f"Empirical residual interval ({int((hi_q-lo_q)*100)}%): "
      f"[{res_lo:.2f}, {res_hi:.2f}] dollars")

print("Residual summary:")
print(f"Mean residual: {residuals.mean():.2f}")
print(f"Std residual:  {residuals.std():.2f}")
print(f"MAE:           {np.mean(np.abs(residuals)):.2f}")



# For visualization only
plot_residuals = residuals.copy()
plot_residuals = np.clip(plot_residuals, -400, 400)

# 1) Residuals vs Predicted
plt.figure(figsize=(10,6))
plt.scatter(y_pred_cost, residuals, alpha=0.4)
plt.axhline(0, linewidth=2)
plt.xlabel("Predicted Cost ($)")
plt.ylabel("Residual (Actual - Predicted)")
plt.title("Residuals vs Predicted Cost")
plt.grid(True)
plt.show()

# 2) Residual Histogram
plt.figure(figsize=(10,6))
plt.hist(residuals, bins=40, edgecolor="black", alpha=0.8)
plt.axvline(0, linewidth=2)
plt.xlabel("Residual (Actual - Predicted)")
plt.ylabel("Count")
plt.title("Residual Histogram")
plt.grid(True)
plt.show()


In [None]:
example = pd.DataFrame([{
    "VEHICLE_YEAR": 2017,
    "ITEM_NAME": "FRONT",
    "VEHICLE_MODEL": "F150",
    "REPAIRER_STATE": "California",
    "SERVICE": "PDR"
}])

# numeric: align columns and fill missing with train means
num_fill = train_df[num_cols].astype(float).mean()
example_num = example.reindex(columns=num_cols, fill_value=np.nan).astype(float)
example_num = example_num.fillna(num_fill)

ex_num = scaler.transform(example_num)
# categorical
ex_cat = encode_cats(example, vocabs, cat_cols)

xc = torch.tensor(ex_cat, dtype=torch.long).to(device)
xn = torch.tensor(ex_num, dtype=torch.float32).to(device)

model.eval()
with torch.no_grad():
    pred_log = model(xc, xn).item()
pred_cost = np.expm1(pred_log)

# ---- Prediction range using empirical residual interval ---

pred_low  = max(0.0, pred_cost + res_lo)
pred_high = max(0.0, pred_cost + res_hi)

print(f"Predicted repair cost (point): ${pred_cost:.2f}")
print(f"Likely range ({int((hi_q-lo_q)*100)}%): ${pred_low:.2f} – ${pred_high:.2f}")
print(f"Uncertainty (half-width): ±${(pred_high - pred_low)/2:.2f}")

print(f"Predicted repair cost: ${pred_cost:.2f}")


baseline_log = np.full_like(y_test, y_train.mean())
baseline_pred = np.expm1(baseline_log)
baseline_true = np.expm1(y_test)

baseline_mae = mean_absolute_error(baseline_true, baseline_pred)

print(f"Baseline MAE (dollars): {baseline_mae:.2f}")
