**Bias Mitigation and Ethical AI System**
---

---


**AI Development Workflow Assignment - Part 3 Implementation**

This module implements fairness-aware machine learning techniques
to address algorithmic bias in healthcare AI systems.


In [5]:
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

In [9]:
class FairnessAwareClassifier(BaseEstimator, ClassifierMixin):
    """
    Wrap a probabilistic base classifier and post-process its scores with
    group-specific decision thresholds.

    The base classifier is trained as usual. When sensitive attributes are
    supplied to ``fit``, one threshold per group value is derived for every
    configured sensitive feature and later applied in ``predict``.

    Attributes:
        threshold_adjustments: ``{feature: {group_value: threshold}}`` computed
            by the most recent ``fit`` call (empty when none were computed).
        is_fitted: True once ``fit`` has completed.
    """

    def __init__(self, base_classifier=None, fairness_constraint='demographic_parity',
                 lambda_fairness=0.1, sensitive_features=None):
        """
        Initialize fairness-aware classifier.

        Args:
            base_classifier: Base ML model exposing ``fit`` and ``predict_proba``
                (default: RandomForestClassifier(random_state=42))
            fairness_constraint: Type of fairness constraint
                ('demographic_parity' or 'equalized_odds'); any other value
                results in no thresholds being computed
            lambda_fairness: Weight for a fairness regularization term. It is
                stored but not read anywhere in this class.
            sensitive_features: List of sensitive feature column names
        """
        # Test for None explicitly instead of relying on truthiness: an
        # estimator's truth value is not a reliable "was it supplied?" signal
        # (scikit-learn ensembles define __len__ over their fitted members).
        if base_classifier is None:
            base_classifier = RandomForestClassifier(random_state=42)
        self.base_classifier = base_classifier
        self.fairness_constraint = fairness_constraint
        self.lambda_fairness = lambda_fairness
        self.sensitive_features = sensitive_features or []
        self.threshold_adjustments = {}
        self.is_fitted = False

    def fit(self, X, y, sensitive_attributes=None):
        """
        Fit the base classifier and, optionally, the per-group thresholds.

        Args:
            X: Training features
            y: Training target (binary, 0/1)
            sensitive_attributes: DataFrame with sensitive attributes, with rows
                in the same order as ``X``; when None no thresholds are computed

        Returns:
            self
        """
        self.base_classifier.fit(X, y)

        # Discard thresholds left over from an earlier fit so that a refit
        # never mixes them with the newly trained base classifier.
        self.threshold_adjustments = {}

        if sensitive_attributes is not None:
            self._calculate_fair_thresholds(X, y, sensitive_attributes)

        self.is_fitted = True
        return self

    def _calculate_fair_thresholds(self, X, y, sensitive_attributes):
        """
        Calculate group-specific prediction thresholds and store them in
        ``self.threshold_adjustments``.

        Features listed in ``self.sensitive_features`` that are not columns of
        ``sensitive_attributes`` are skipped.
        """
        base_proba = self.base_classifier.predict_proba(X)[:, 1]

        # Work positionally with NumPy arrays so that the pandas index of ``y``
        # does not have to match the index of ``sensitive_attributes``.
        y_values = np.asarray(y)
        overall_positive_rate = y_values.mean()

        for sensitive_feature in self.sensitive_features:
            if sensitive_feature not in sensitive_attributes.columns:
                continue

            column = sensitive_attributes[sensitive_feature]
            group_thresholds = {}

            for group in column.unique():
                group_mask = np.asarray(column == group)
                if not group_mask.any():
                    continue

                group_y = y_values[group_mask]
                group_proba = base_proba[group_mask]

                if self.fairness_constraint == 'demographic_parity':
                    # Pick the score percentile that makes the share of
                    # positive predictions in this group equal the overall
                    # positive label rate, then clamp it to [0.1, 0.9].
                    threshold = np.percentile(group_proba, (1 - overall_positive_rate) * 100)
                    group_thresholds[group] = max(0.1, min(0.9, threshold))

                elif self.fairness_constraint == 'equalized_odds':
                    threshold = self._find_equalized_odds_threshold(
                        group_y, group_proba, overall_positive_rate
                    )
                    group_thresholds[group] = threshold

            self.threshold_adjustments[sensitive_feature] = group_thresholds

    def _find_equalized_odds_threshold(self, y_true, y_proba, target_rate):
        """
        Grid-search a threshold for one group.

        Fifty evenly spaced thresholds in [0.1, 0.9] are scored with
        ``|TPR - target_rate| + |FPR - (1 - target_rate)|``; the lowest-scoring
        threshold is returned (the smallest one on ties, 0.5 if none improves
        on infinity).

        NOTE(review): the FPR target of ``1 - target_rate`` is kept as written;
        confirm it is the intended objective, since the search only looks at a
        single group rather than comparing rates across groups.
        """
        y_true = np.asarray(y_true)
        thresholds = np.linspace(0.1, 0.9, 50)
        best_threshold = 0.5
        best_score = float('inf')

        for threshold in thresholds:
            predictions = (y_proba >= threshold).astype(int)

            tp = np.sum((y_true == 1) & (predictions == 1))
            fp = np.sum((y_true == 0) & (predictions == 1))
            tn = np.sum((y_true == 0) & (predictions == 0))
            fn = np.sum((y_true == 1) & (predictions == 0))

            # Rates fall back to 0 when the group has no positives/negatives.
            tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
            fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

            score = abs(tpr - target_rate) + abs(fpr - (1 - target_rate))

            if score < best_score:
                best_score = score
                best_threshold = threshold

        return best_threshold

    def predict(self, X, sensitive_attributes=None):
        """
        Make fairness-aware predictions.

        Args:
            X: Features to score
            sensitive_attributes: DataFrame with sensitive attributes, with rows
                in the same order as ``X``; when None (or when no thresholds
                were fitted) the plain 0.5 threshold is used for every row

        Returns:
            Integer array of 0/1 predictions.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        base_proba = self.base_classifier.predict_proba(X)[:, 1]

        # Start from the standard 0.5 cut-off so that rows belonging to a group
        # that was not seen during fit still get a meaningful prediction
        # instead of being forced to the negative class.
        predictions = (base_proba >= 0.5).astype(int)

        if sensitive_attributes is None or not self.threshold_adjustments:
            return predictions

        # When several features carry thresholds, features later in
        # ``threshold_adjustments`` overwrite earlier ones for the rows they cover.
        for sensitive_feature, group_thresholds in self.threshold_adjustments.items():
            if sensitive_feature not in sensitive_attributes.columns:
                continue
            column = sensitive_attributes[sensitive_feature]
            for group, threshold in group_thresholds.items():
                group_mask = np.asarray(column == group)
                predictions[group_mask] = (base_proba[group_mask] >= threshold).astype(int)

        return predictions

    def predict_proba(self, X):
        """
        Return prediction probabilities from the base classifier, unchanged by
        any group thresholds.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        return self.base_classifier.predict_proba(X)


class BiasAuditSystem:
    """
    Compute per-group performance metrics and between-group bias measures for
    a binary classifier, and render them as a plain-text report.

    Attributes:
        audit_results: Result of the most recent ``comprehensive_bias_audit``.
        mitigation_strategies: Initialised to an empty list; not written by
            any method of this class.
    """

    def __init__(self):
        self.audit_results = {}
        self.mitigation_strategies = []

    def comprehensive_bias_audit(self, model, X_test, y_test, sensitive_attributes,
                                sensitive_features=None):
        """
        Perform comprehensive bias audit across multiple fairness metrics.

        Prints a progress line and stores the result on ``self.audit_results``.

        Args:
            model: Trained ML model with ``predict`` (and optionally
                ``predict_proba``). ``sensitive_attributes`` is forwarded to
                ``predict`` only if that method declares such a parameter.
            X_test: Test features
            y_test: Test target (binary, 0/1)
            sensitive_attributes: DataFrame with sensitive attributes, with rows
                in the same order as ``X_test``
            sensitive_features: List of sensitive features to audit
                (default: ['race', 'gender']); names that are not columns of
                ``sensitive_attributes`` are ignored

        Returns:
            dict: One entry per audited feature plus, when more than one
            feature is available, an 'intersectional' entry.
        """
        # None instead of a list literal avoids sharing one mutable default
        # object between calls.
        if sensitive_features is None:
            sensitive_features = ['race', 'gender']

        print("Performing comprehensive bias audit...")

        if hasattr(model, 'predict_proba'):
            y_proba = model.predict_proba(X_test)[:, 1]
        else:
            y_proba = model.predict(X_test)

        y_pred = self._predict_labels(model, X_test, sensitive_attributes)

        available = [f for f in sensitive_features if f in sensitive_attributes.columns]

        audit_results = {}
        for feature in available:
            audit_results[feature] = self._audit_feature_bias(
                y_test, y_pred, y_proba, sensitive_attributes[feature], feature
            )

        # Intersections only make sense with at least two usable attributes.
        if len(available) > 1:
            audit_results['intersectional'] = self._audit_intersectional_bias(
                y_test, y_pred, y_proba, sensitive_attributes, available
            )

        self.audit_results = audit_results
        return audit_results

    @staticmethod
    def _predict_labels(model, X, sensitive_attributes):
        """
        Call ``model.predict``, passing ``sensitive_attributes`` only when the
        method explicitly declares a parameter of that name.
        """
        import inspect

        try:
            parameters = inspect.signature(model.predict).parameters
        except (TypeError, ValueError):
            # Signature not introspectable: fall back to the plain call.
            parameters = {}

        if 'sensitive_attributes' in parameters:
            return model.predict(X, sensitive_attributes=sensitive_attributes)
        return model.predict(X)

    def _audit_feature_bias(self, y_true, y_pred, y_proba, sensitive_attribute, feature_name):
        """
        Audit bias for a single sensitive feature.

        Args:
            y_true, y_pred, y_proba: Labels, hard predictions and scores, all in
                the same row order as ``sensitive_attribute``
            sensitive_attribute: pandas Series of group values
            feature_name: Label stored in the result

        Returns:
            dict with 'group_metrics', 'bias_measures' and 'feature_name'.
        """
        # Positional NumPy arrays keep the three vectors and the group mask in
        # step even when their pandas indexes differ.
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
        y_proba = np.asarray(y_proba)

        group_metrics = {}
        for group in sensitive_attribute.unique():
            group_mask = np.asarray(sensitive_attribute == group)
            if not group_mask.any():
                continue
            group_metrics[group] = self._calculate_fairness_metrics(
                y_true[group_mask], y_pred[group_mask], y_proba[group_mask]
            )

        return {
            'group_metrics': group_metrics,
            'bias_measures': self._calculate_bias_measures(group_metrics),
            'feature_name': feature_name
        }

    def _calculate_fairness_metrics(self, y_true, y_pred, y_proba):
        """
        Calculate performance and fairness metrics for one group.

        Labels and predictions are treated as binary with 1 as the positive
        class. Ratios whose denominator is zero are reported as 0, and the AUC
        is reported as 0.5 when ``y_true`` contains a single class.
        """
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
        y_proba = np.asarray(y_proba)

        # Count the four confusion-matrix cells directly so the result is
        # always well defined, including when only one class is present.
        tp = int(np.sum((y_true == 1) & (y_pred == 1)))
        fp = int(np.sum((y_true == 0) & (y_pred == 1)))
        tn = int(np.sum((y_true == 0) & (y_pred == 0)))
        fn = int(np.sum((y_true == 1) & (y_pred == 0)))

        total = tp + tn + fp + fn
        accuracy = (tp + tn) / total if total > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        positive_prediction_rate = np.mean(y_pred)
        negative_prediction_rate = 1 - positive_prediction_rate
        base_rate = np.mean(y_true)

        calibration_score = self._calculate_calibration(y_true, y_proba)

        return {
            'sample_size': len(y_true),
            'base_rate': base_rate,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'specificity': specificity,
            'positive_prediction_rate': positive_prediction_rate,
            'negative_prediction_rate': negative_prediction_rate,
            'calibration_score': calibration_score,
            'auc_score': roc_auc_score(y_true, y_proba) if len(np.unique(y_true)) > 1 else 0.5
        }

    def _calculate_calibration(self, y_true, y_proba, n_bins=10):
        """
        Return the Brier score: the mean squared difference between scores and
        labels. ``n_bins`` is accepted but not used.
        """
        return np.mean((np.asarray(y_proba) - np.asarray(y_true)) ** 2)

    def _calculate_bias_measures(self, group_metrics):
        """
        Calculate bias measures comparing groups.

        Every measure is the max-minus-min spread of one per-group metric.
        Returns an empty dict when fewer than two groups are available.

        NOTE(review): 'equalized_odds_diff' holds the recall (TPR) spread and
        'equality_opportunity_diff' the FPR spread. The key names are kept for
        compatibility; confirm whether they match the intended definitions.
        """
        if len(group_metrics) < 2:
            return {}

        def spread(values):
            return max(values) - min(values)

        per_group = list(group_metrics.values())
        bias_measures = {
            'demographic_parity_diff': spread([m['positive_prediction_rate'] for m in per_group]),
            'equalized_odds_diff': spread([m['recall'] for m in per_group]),
            # FPR = 1 - specificity
            'equality_opportunity_diff': spread([1 - m['specificity'] for m in per_group]),
            'calibration_diff': spread([m['calibration_score'] for m in per_group]),
        }
        bias_measures['statistical_tests'] = self._perform_bias_tests(group_metrics)

        return bias_measures

    def _perform_bias_tests(self, group_metrics):
        """
        Run a chi-square test of independence between group membership and the
        predicted label.

        Returns:
            dict: ``{'chi2_test': {'statistic', 'p_value'}}``; both values are
            None when SciPy rejects the table, and the dict is empty when
            fewer than two groups are given or a group has no samples.
        """
        groups = list(group_metrics.keys())
        if len(groups) < 2:
            return {}

        tests = {}

        # Rebuild the positive/negative counts per group. The rate is a float,
        # so the product must be rounded: truncating would turn e.g.
        # 0.29 * 100 == 28.999999999999996 into 28.
        observed = []
        for group in groups:
            size = int(group_metrics[group]['sample_size'])
            positives = int(round(group_metrics[group]['positive_prediction_rate'] * size))
            observed.append([positives, size - positives])

        if all(sum(row) > 0 for row in observed):
            try:
                chi2, p_value = stats.chi2_contingency(observed)[:2]
            except ValueError:
                # Presumably a zero expected frequency (e.g. no positive
                # predictions in any group); report the test as unavailable.
                tests['chi2_test'] = {'statistic': None, 'p_value': None}
            else:
                tests['chi2_test'] = {'statistic': chi2, 'p_value': p_value}

        return tests

    def _audit_intersectional_bias(self, y_true, y_pred, y_proba, sensitive_attributes, features):
        """
        Audit intersectional bias across multiple sensitive attributes.

        Each row's group is the '_'-joined string of its values for the
        requested features. Features that are not columns of
        ``sensitive_attributes`` are ignored.
        """
        usable = [f for f in features if f in sensitive_attributes.columns]
        if not usable:
            return {
                'group_metrics': {},
                'bias_measures': {},
                'feature_name': 'intersectional'
            }

        intersectional_groups = sensitive_attributes[usable].apply(
            lambda row: '_'.join(row.astype(str)), axis=1
        )

        return self._audit_feature_bias(y_true, y_pred, y_proba, intersectional_groups, 'intersectional')

    def generate_bias_report(self, audit_results=None):
        """
        Generate the plain-text bias audit report.

        Args:
            audit_results: Results to render; defaults to the results of the
                last ``comprehensive_bias_audit`` call.

        Returns:
            str: Header, one section per feature (the 'intersectional' entry is
            not rendered) and the recommendations section.
        """
        if audit_results is None:
            audit_results = self.audit_results

        header = """
ALGORITHMIC BIAS AUDIT REPORT
============================
Generated: {timestamp}

EXECUTIVE SUMMARY
-----------------
This report presents findings from a comprehensive algorithmic bias audit
of the healthcare AI system. The audit examines fairness across demographic
groups and provides recommendations for bias mitigation.

""".format(timestamp=pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S'))

        sections = [header]
        sections.extend(
            self._format_feature_report(feature, results)
            for feature, results in audit_results.items()
            if feature != 'intersectional'
        )
        sections.append(self._generate_recommendations(audit_results))

        return ''.join(sections)

    def _format_feature_report(self, feature, results):
        """
        Format bias audit results for a specific feature.

        Missing bias measures (e.g. when only one group exists) are shown as 0.
        """
        report = f"""
BIAS ANALYSIS: {feature.upper()}
{'-' * (15 + len(feature))}

Group Performance Metrics:
"""

        for group, metrics in results['group_metrics'].items():
            report += f"""
{group}:
  - Sample Size: {metrics['sample_size']}
  - Base Rate: {metrics['base_rate']:.3f}
  - Positive Prediction Rate: {metrics['positive_prediction_rate']:.3f}
  - Precision: {metrics['precision']:.3f}
  - Recall: {metrics['recall']:.3f}
  - AUC Score: {metrics['auc_score']:.3f}
"""

        bias_measures = results['bias_measures']
        report += f"""
Bias Measures:
  - Demographic Parity Difference: {bias_measures.get('demographic_parity_diff', 0):.3f}
  - Equalized Odds Difference: {bias_measures.get('equalized_odds_diff', 0):.3f}
  - Equality of Opportunity Difference: {bias_measures.get('equality_opportunity_diff', 0):.3f}
  - Calibration Difference: {bias_measures.get('calibration_diff', 0):.3f}

"""

        severity = self._assess_bias_severity(bias_measures)
        report += f"Bias Severity Assessment: {severity}\n"

        return report

    def _assess_bias_severity(self, bias_measures):
        """
        Map the larger of the 'demographic_parity_diff' and
        'equalized_odds_diff' measures to a severity label.

        Bands: > 0.2 HIGH, > 0.1 MODERATE, > 0.05 LOW, otherwise MINIMAL.
        Missing measures count as 0.
        """
        dp_diff = bias_measures.get('demographic_parity_diff', 0)
        eo_diff = bias_measures.get('equalized_odds_diff', 0)

        if dp_diff > 0.2 or eo_diff > 0.2:
            return "HIGH - Immediate intervention required"
        if dp_diff > 0.1 or eo_diff > 0.1:
            return "MODERATE - Bias mitigation recommended"
        if dp_diff > 0.05 or eo_diff > 0.05:
            return "LOW - Monitor and consider mitigation"
        return "MINIMAL - Continue monitoring"

    def _generate_recommendations(self, audit_results):
        """
        Return the recommendations section of the report.

        The text is fixed; ``audit_results`` is accepted but not consulted.
        """
        recommendations = """
BIAS MITIGATION RECOMMENDATIONS
==============================

Based on the audit findings, the following actions are recommended:

1. IMMEDIATE ACTIONS:
   - Implement fairness-aware model training with demographic parity constraints
   - Establish group-specific prediction thresholds
   - Enhance data collection to ensure representative sampling

2. SHORT-TERM IMPROVEMENTS:
   - Implement post-processing calibration techniques
   - Develop bias monitoring dashboards for continuous assessment
   - Train clinical staff on bias-aware decision making

3. LONG-TERM STRATEGIES:
   - Collect more diverse and representative training data
   - Implement federated learning approaches across diverse healthcare systems
   - Establish regular bias auditing protocols (quarterly assessments)

4. REGULATORY COMPLIANCE:
   - Document bias mitigation efforts for regulatory review
   - Implement explainable AI techniques for clinical transparency
   - Establish patient consent protocols for AI-assisted care

5. CONTINUOUS MONITORING:
   - Set up automated bias detection alerts
   - Implement A/B testing for bias mitigation strategies
   - Regular retraining with updated diverse datasets
"""

        return recommendations


# =============================================
# DEMONSTRATION CODE
# =============================================

def run_demo():
    """
    Demonstrate the fairness auditing system with synthetic healthcare data.

    Builds a synthetic binary classification dataset, attaches randomly drawn
    sensitive attributes, trains a FairnessAwareClassifier, audits it with
    BiasAuditSystem, prints the report and writes it to
    'healthcare_ai_bias_audit.txt' in the current working directory.
    """
    print("=== Healthcare AI Bias Audit Demonstration ===")

    # One sample count shared by the features and the sensitive attributes so
    # the two can never drift apart.
    n_samples = 2000

    # Generate synthetic healthcare dataset
    X, y = make_classification(
        n_samples=n_samples,
        n_features=15,
        n_informative=8,
        n_classes=2,
        weights=[0.7, 0.3],  # Imbalanced classes
        random_state=42
    )

    X = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
    y = pd.Series(y, name='target')

    # Create sensitive attributes (race, gender, age_group) from a seeded
    # generator so that repeated runs produce the same audit, in line with the
    # fixed random_state used for the data and the split.
    rng = np.random.default_rng(42)
    sensitive_attrs = pd.DataFrame({
        'race': rng.choice(['White', 'Black', 'Asian', 'Hispanic'], size=n_samples, p=[0.5, 0.2, 0.2, 0.1]),
        'gender': rng.choice(['Male', 'Female', 'Other'], size=n_samples, p=[0.45, 0.5, 0.05]),
        'age_group': rng.choice(['18-30', '31-50', '51+'], size=n_samples)
    })

    # Split data; the sensitive attributes follow the split via the row index
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    sens_attrs_train = sensitive_attrs.loc[X_train.index]
    sens_attrs_test = sensitive_attrs.loc[X_test.index]

    # Initialize and train fairness-aware model
    print("\nTraining fairness-aware classifier...")
    fair_model = FairnessAwareClassifier(
        base_classifier=LogisticRegression(max_iter=1000),
        fairness_constraint='demographic_parity',
        sensitive_features=['race', 'gender']
    )
    fair_model.fit(X_train, y_train, sens_attrs_train)

    # Perform comprehensive bias audit
    print("\nConducting bias audit...")
    auditor = BiasAuditSystem()
    audit_results = auditor.comprehensive_bias_audit(
        fair_model,
        X_test,
        y_test,
        sens_attrs_test,
        sensitive_features=['race', 'gender', 'age_group']
    )

    # Generate and display report
    print("\nGenerating audit report...")
    report = auditor.generate_bias_report(audit_results)
    print(report)

    # Save report to file with an explicit encoding so the output does not
    # depend on the platform default.
    with open("healthcare_ai_bias_audit.txt", "w", encoding="utf-8") as f:
        f.write(report)
    print("\nReport saved to 'healthcare_ai_bias_audit.txt'")

# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run_demo()

=== Healthcare AI Bias Audit Demonstration ===

Training fairness-aware classifier...

Conducting bias audit...
Performing comprehensive bias audit...

Generating audit report...

ALGORITHMIC BIAS AUDIT REPORT
Generated: 2025-07-25 00:08:06

EXECUTIVE SUMMARY
-----------------
This report presents findings from a comprehensive algorithmic bias audit
of the healthcare AI system. The audit examines fairness across demographic
groups and provides recommendations for bias mitigation.


BIAS ANALYSIS: RACE
-------------------

Group Performance Metrics:

White:
  - Sample Size: 285
  - Base Rate: 0.354
  - Positive Prediction Rate: 0.309
  - Precision: 0.739
  - Recall: 0.644
  - AUC Score: 0.843

Asian:
  - Sample Size: 110
  - Base Rate: 0.218
  - Positive Prediction Rate: 0.273
  - Precision: 0.533
  - Recall: 0.667
  - AUC Score: 0.818

Black:
  - Sample Size: 140
  - Base Rate: 0.214
  - Positive Prediction Rate: 0.279
  - Precision: 0.410
  - Recall: 0.533
  - AUC Score: 0.766

Hispan