# Zinara Vehicle License Compliance Prediction

This notebook contains the machine learning development and experimentation for predicting vehicle license compliance in the Zinara system.

## Table of Contents
1. [Data Loading and Exploration](#data-loading)
2. [Data Preprocessing](#preprocessing)
3. [Feature Engineering](#feature-engineering)
4. [Model Development](#model-development)
5. [Model Evaluation](#evaluation)
6. [Model Deployment](#deployment)

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
import os

# ML Libraries
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, 
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve
)
import xgboost as xgb
import joblib
import shap

# Set style for plots
plt.style.use('default')
sns.set_palette("husl")

print("Libraries imported successfully!")

## 1. Data Loading and Exploration <a name="data-loading"></a>

In [None]:
# Load the dataset
data_path = '../data/raw/zinara_vehicle_licensing_data_2015_2024.csv'
df = pd.read_csv(data_path)

print(f"Dataset loaded successfully! Shape: {df.shape}")
print("\nFirst 5 rows:")
display(df.head())

print("\nDataset Info:")
df.info()

print("\nBasic Statistics:")
display(df.describe())

In [None]:
# Check for missing values
print("Missing Values:")
missing_data = df.isnull().sum()
missing_percent = (missing_data / len(df)) * 100
missing_df = pd.DataFrame({'Missing Values': missing_data, 'Percentage': missing_percent})
display(missing_df[missing_df['Missing Values'] > 0])

# Create target variable from License Status
df['is_licensed'] = df['License Status'].map({'Compliant': True, 'Non-compliant': False, 'Expired': False})
# Handle any unmapped values (set to False for safety)
df['is_licensed'] = df['is_licensed'].fillna(False)

# Check target distribution
print("\nTarget Distribution:")
target_dist = df['is_licensed'].value_counts()
print(target_dist)
print(f"\nClass Imbalance Ratio: {target_dist.min() / target_dist.max():.2f}")


In [None]:
# Visualize distributions
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# Vehicle type distribution
df['Vehicle Type'].value_counts().plot(kind='bar', ax=axes[0,0])
axes[0,0].set_title('Vehicle Type Distribution')
axes[0,0].tick_params(axis='x', rotation=45)

# Region distribution
df['Region'].value_counts().plot(kind='bar', ax=axes[0,1])
axes[0,1].set_title('Region Distribution')

# Payment mode distribution
df['Preferred Payment Mode'].value_counts().plot(kind='bar', ax=axes[0,2])
axes[0,2].set_title('Payment Mode Distribution')

# Days since renewal distribution
df['Days Since Last Renewal'].hist(bins=50, ax=axes[1,0])
axes[1,0].set_title('Days Since Last Renewal')

# Total renewals distribution
df['Number of Late Renewals in Last 3 Years'].hist(bins=30, ax=axes[1,1])
axes[1,1].set_title('Total Renewals Distribution')

# Target distribution
df['is_licensed'].value_counts().plot(kind='pie', autopct='%1.1f%%', ax=axes[1,2])
axes[1,2].set_title('License Status Distribution')

plt.tight_layout()
plt.show()

## 2. Data Preprocessing <a name="preprocessing"></a>

In [None]:
def clean_data(df):
    """Clean the dataset following best practices"""
    print("Starting data cleaning process...")
    
    # 1. Remove duplicates
    initial_count = len(df)
    df = df.drop_duplicates(subset=['Vehicle ID'])
    print(f"Removed {initial_count - len(df)} duplicate records")
    
    # 2. Handle missing values intelligently
    
    # For payment modes - if payment is in-person/cash, online platform data should be null
    online_payment_mask = df['Preferred Payment Mode'].isin(['online', 'mobile_money'])
    
    if 'online_platform_last_login' in df.columns:
        df.loc[~online_payment_mask, 'online_platform_last_login'] = None
        df.loc[online_payment_mask & df['online_platform_last_login'].isna(), 'online_platform_last_login'] = df['last_license_renewal']
    
    # 3. Handle date inconsistencies
    date_columns = ['registration_date', 'last_license_renewal', 'license_expiry_date']
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    
    # 4. Remove records with impossible dates
    if 'registration_date' in df.columns:
        df = df[df['registration_date'] <= pd.Timestamp.now()]
    
    # 5. Handle agent sync delays
    if 'Agent Synchronization Lag' in df.columns and 'Preferred Payment Mode' in df.columns:
        agent_payment_mask = df['Preferred Payment Mode'] == 'Agent'
        df.loc[~agent_payment_mask, 'Agent Synchronization Lag'] = 0
        
        median_sync_delay = df.loc[agent_payment_mask, 'Agent Synchronization Lag'].median()
        df.loc[agent_payment_mask & df['Agent Synchronization Lag'].isna(), 'Agent Synchronization Lag'] = median_sync_delay
    
    # 6. Clean vehicle types
    if 'Vehicle Type' in df.columns:
        df['Vehicle Type'] = df['Vehicle Type'].str.lower().str.strip()
        type_mapping = {
            'car': 'sedan',
            'automobile': 'sedan',
            'motorbike': 'motorcycle',
            'bike': 'motorcycle',
            'lorry': 'truck',
            'pickup': 'truck'
        }
        df['Vehicle Type'] = df['Vehicle Type'].replace(type_mapping)
    
    # 7. Handle renewal counts
    if 'Number of Late Renewals in Last 3 Years' in df.columns:
        # Since we don't have total_renewals, we'll just ensure late renewals are reasonable
        df['Number of Late Renewals in Last 3 Years'] = df['Number of Late Renewals in Last 3 Years'].clip(lower=0)
    
    # 8. Remove outliers using IQR method
    numerical_cols = df.select_dtypes(include=[np.number]).columns
    for col in numerical_cols:
        if col not in ['Vehicle ID', 'is_licensed']:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            outlier_count = len(df[(df[col] < lower_bound) | (df[col] > upper_bound)])
            if outlier_count > 0:
                print(f"Removing {outlier_count} outliers from {col}")
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
    
    print(f"Data cleaning complete. Final dataset size: {len(df)}")
    return df

# Clean the data
df_clean = clean_data(df.copy())
print(f"\nCleaned dataset shape: {df_clean.shape}")

## 3. Feature Engineering <a name="feature-engineering"></a>

In [None]:
def engineer_features(df):
    """Engineer features for model training"""
    print("Engineering features...")
    
    # 1. Days since last renewal - already available as 'Days Since Last Renewal'
    # No need to create this feature as it's already in the dataset
    
    # 2. Vehicle age - not available in dataset, skip
    
    # 3. Renewal frequency - not available in dataset, skip
    
    # 4. Late renewal rate - not available in dataset, skip
    
    # 5. Season of last renewal - Month and Quarter are already available in dataset
    
    # 6. Payment digitalization score
    if 'Preferred Payment Mode' in df.columns:
        digital_mapping = {
            'Online': 1.0,
            'Mobile Money': 0.8,
            'Agent': 0.4,
            'Cash': 0.0,
            'Bank Transfer': 0.6
        }
        df['payment_digital_score'] = df['Preferred Payment Mode'].map(digital_mapping)
    
    # 7. Regional risk score
    if 'Region' in df.columns:
        region_risk = {
            'Urban': 0.3,
            'Peri-urban': 0.5,
            'Rural': 0.7
        }
        df['region_risk_score'] = df['Region'].map(region_risk)
    
    # 8. Agent efficiency
    if 'Agent Synchronization Lag' in df.columns:
        max_delay = df['Agent Synchronization Lag'].max()
        df['agent_efficiency'] = 1 - (df['Agent Synchronization Lag'] / (max_delay + 1))
    
    print("Feature engineering complete")
    return df

# Engineer features
df_featured = engineer_features(df_clean.copy())
print(f"\nFeatured dataset shape: {df_featured.shape}")
print("\nNew features added:")
new_features = set(df_featured.columns) - set(df_clean.columns)
print(list(new_features))

In [None]:
def prepare_features_target(df, target_column='is_licensed'):
    """Prepare features and target for model training"""
    
    # Define feature columns
    feature_cols = [
        'Days Since Last Renewal', 'vehicle_age_years', 'Total Renewals',
        'Number of Late Renewals in Last 3 Years', 'Average Renewal Lag Days', 'renewal_frequency',
        'late_renewal_rate', 'payment_digital_score', 'region_risk_score',
        'agent_efficiency', 'income_score', 'Month', 'Quarter'
    ]
    
    # Add categorical features with encoding
    categorical_features = []
    
    # Vehicle type encoding
    if 'Vehicle Type' in df.columns:
        vehicle_type_encoded = pd.get_dummies(df['Vehicle Type'], prefix='vehicle_type')
        df = pd.concat([df, vehicle_type_encoded], axis=1)
        categorical_features.extend(vehicle_type_encoded.columns.tolist())
    
    # Region encoding
    if 'Region' in df.columns:
        region_encoded = pd.get_dummies(df['Region'], prefix='region')
        df = pd.concat([df, region_encoded], axis=1)
        categorical_features.extend(region_encoded.columns.tolist())
    
    # Payment mode encoding
    if 'Preferred Payment Mode' in df.columns:
        payment_encoded = pd.get_dummies(df['Preferred Payment Mode'], prefix='payment')
        df = pd.concat([df, payment_encoded], axis=1)
        categorical_features.extend(payment_encoded.columns.tolist())
    
    # Income bracket encoding
    if 'Income Bracket' in df.columns:
        income_encoded = pd.get_dummies(df['Income Bracket'], prefix='income')
        df = pd.concat([df, income_encoded], axis=1)
        categorical_features.extend(income_encoded.columns.tolist())
    
    # Combine all features
    all_features = feature_cols + categorical_features
    
    # Filter existing columns
    available_features = [col for col in all_features if col in df.columns]
    
    # Prepare X and y
    # Convert to numeric, coercing errors to NaN, then fill with 0
    X = df[available_features].apply(pd.to_numeric, errors='coerce').fillna(0).astype(float)
    y = ~df[target_column]  # Invert because we want to predict unlicensed
    
    print(f"Prepared {len(available_features)} features for training")
    print(f"Target distribution - Unlicensed: {y.sum()}, Licensed: {(~y).sum()}")
    
    return X, y, available_features

# Prepare features and target
X, y, feature_names = prepare_features_target(df_featured.copy())
print(f"\nFeature matrix shape: {X.shape}")
print(f"Target shape: {y.shape}")

## 4. Model Development <a name="model-development"></a>

In [None]:
# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("\nData split and scaling complete!")

In [None]:
def evaluate_model(model, X_test, y_test, model_name):
    """Evaluate model performance"""
    if hasattr(model, 'predict_proba'):
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]
    else:
        y_pred = model.predict(X_test)
        y_pred_proba = None
    
    metrics = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred),
        'recall': recall_score(y_test, y_pred),
        'f1_score': f1_score(y_test, y_pred),
    }
    
    if y_pred_proba is not None:
        metrics['roc_auc'] = roc_auc_score(y_test, y_pred_proba)
    
    print(f"\n{model_name} Results:")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    
    # Confusion Matrix
    cm = confusion_matrix(y_test, y_pred)
    print(f"\nConfusion Matrix:")
    print(cm)
    
    return metrics

# Initialize models
models = {
    'Logistic Regression': LogisticRegression(random_state=42, class_weight='balanced'),
    'Decision Tree': DecisionTreeClassifier(random_state=42, class_weight='balanced'),
    'Random Forest': RandomForestClassifier(random_state=42, class_weight='balanced', n_estimators=100),
    'XGBoost': xgb.XGBClassifier(random_state=42, scale_pos_weight=(y_train == 0).sum() / (y_train == 1).sum())
}

# Train and evaluate models
results = {}
for name, model in models.items():
    print(f"\n{'='*50}")
    print(f"Training {name}...")
    
    # Use scaled data for logistic regression, original for tree-based models
    if name == 'Logistic Regression':
        model.fit(X_train_scaled, y_train)
        results[name] = evaluate_model(model, X_test_scaled, y_test, name)
    else:
        model.fit(X_train, y_train)
        results[name] = evaluate_model(model, X_test, y_test, name)

In [None]:
# Compare model performance
results_df = pd.DataFrame(results).T
print("\nModel Comparison:")
display(results_df)

# Plot comparison
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

metrics = ['accuracy', 'precision', 'recall', 'f1_score']
for i, metric in enumerate(metrics):
    ax = axes[i//2, i%2]
    results_df[metric].plot(kind='bar', ax=ax)
    ax.set_title(f'Model Comparison - {metric.capitalize()}')
    ax.set_ylabel(metric.capitalize())
    ax.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

In [None]:
# Hyperparameter tuning for the best model (XGBoost)
print("Performing hyperparameter tuning for XGBoost...")

# Define parameter grid
param_grid = {
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.1, 0.3],
    'n_estimators': [100, 200, 300],
    'subsample': [0.8, 0.9, 1.0],
    'colsample_bytree': [0.8, 0.9, 1.0]
}

# Calculate scale_pos_weight
scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()

# Initialize model
xgb_model = xgb.XGBClassifier(
    random_state=42,
    scale_pos_weight=scale_pos_weight,
    eval_metric='logloss'
)

# Perform grid search
grid_search = GridSearchCV(
    xgb_model, param_grid, cv=3, scoring='f1', n_jobs=-1, verbose=1
)
grid_search.fit(X_train, y_train)

print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best F1 score: {grid_search.best_score_:.4f}")

# Train best model
best_model = grid_search.best_estimator_
best_model.fit(X_train, y_train)

# Evaluate best model
best_metrics = evaluate_model(best_model, X_test, y_test, "Tuned XGBoost")

# Feature importance
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': best_model.feature_importances_
}).sort_values('importance', ascending=False)

print("\nTop 10 Feature Importance:")
display(feature_importance.head(10))

## 5. Model Evaluation <a name="evaluation"></a>

In [None]:
# ROC Curve
y_pred_proba = best_model.predict_proba(X_test)[:, 1]
fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)
roc_auc = roc_auc_score(y_test, y_pred_proba)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

# Precision-Recall Curve
precision, recall, thresholds = precision_recall_curve(y_test, y_pred_proba)

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.show()

# Feature importance plot
plt.figure(figsize=(10, 8))
feature_importance.head(15).plot(x='feature', y='importance', kind='barh', ax=plt.gca())
plt.title('Top 15 Feature Importance')
plt.xlabel('Importance')
plt.tight_layout()
plt.show()

In [None]:
# SHAP analysis for model interpretability
print("Performing SHAP analysis...")

# Create SHAP explainer for XGBoost (tree-based model)
explainer = shap.TreeExplainer(best_model)
shap_values = explainer.shap_values(X_test)

# Summary plot
plt.figure(figsize=(10, 8))
shap.summary_plot(shap_values, X_test, feature_names=feature_names, show=False)
plt.title('SHAP Feature Importance Summary')
plt.tight_layout()
plt.show()

# Waterfall plot for a single prediction
sample_idx = 0
shap.plots.waterfall(shap_values[sample_idx])
plt.title(f'SHAP Waterfall Plot for Sample {sample_idx}')
plt.tight_layout()
plt.show()

## 6. Model Deployment <a name="deployment"></a>

In [None]:
# Save the model and scaler
model_dir = '../data/models'
os.makedirs(model_dir, exist_ok=True)

# Save model
model_path = os.path.join(model_dir, 'xgboost_model_v1.0.joblib')
joblib.dump(best_model, model_path)

# Save scaler
scaler_path = os.path.join(model_dir, 'scaler_v1.0.joblib')
joblib.dump(scaler, scaler_path)

# Save feature names
features_path = os.path.join(model_dir, 'features_v1.0.joblib')
joblib.dump(feature_names, features_path)

print(f"Model saved to: {model_path}")
print(f"Scaler saved to: {scaler_path}")
print(f"Features saved to: {features_path}")

# Save model metadata
metadata = {
    'model_type': 'XGBoost',
    'version': '1.0',
    'training_date': datetime.now().isoformat(),
    'metrics': best_metrics,
    'feature_importance': feature_importance.to_dict('records'),
    'hyperparameters': grid_search.best_params_
}

metadata_path = os.path.join(model_dir, 'model_metadata_v1.0.json')
import json
with open(metadata_path, 'w') as f:
    json.dump(metadata, f, indent=2, default=str)

print(f"Metadata saved to: {metadata_path}")
print("\nModel deployment files ready!")

In [None]:
# Test model loading and prediction
print("Testing model loading and prediction...")

# Load model
loaded_model = joblib.load(model_path)
loaded_scaler = joblib.load(scaler_path)
loaded_features = joblib.load(features_path)

# Make prediction on a sample
sample = X_test.iloc[0:1]
sample_scaled = loaded_scaler.transform(sample)
prediction = loaded_model.predict(sample_scaled)[0]
probability = loaded_model.predict_proba(sample_scaled)[0][1]

print(f"Sample prediction: {prediction} (probability: {probability:.4f})")
print(f"Actual value: {y_test.iloc[0]}")
print("\nModel loading and prediction test successful!")

## Summary

This notebook has demonstrated the complete machine learning pipeline for predicting vehicle license compliance:

1. **Data Loading & Exploration**: Loaded and analyzed the vehicle licensing dataset
2. **Data Preprocessing**: Cleaned data, handled missing values, and removed outliers
3. **Feature Engineering**: Created meaningful features from raw data
4. **Model Development**: Trained and compared multiple ML models
5. **Model Evaluation**: Evaluated performance using various metrics and visualizations
6. **Model Deployment**: Saved the trained model for production use

### Key Findings:
- XGBoost performed best with F1 score of {best_metrics.get('f1_score', 'N/A'):.4f}
- Top features include days since renewal, payment digitalization score, and agent efficiency
- Model shows good discriminatory power with AUC of {best_metrics.get('roc_auc', 'N/A'):.4f}

### Next Steps:
1. Integrate the model into the Django application
2. Set up automated model retraining pipeline
3. Implement model monitoring and performance tracking
4. Add A/B testing capabilities for model updates