In [None]:
# ev_fault_detection.py
"""
Complete Python Implementation for EV Powertrain Fault Detection
Project: Fault Detection and Diagnosis in Electric Vehicle Powertrains Using Machine Learning
"""

import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.decomposition import PCA
import warnings
warnings.filterwarnings('ignore')

class PMSMSimulator:
    """
    Permanent Magnet Synchronous Motor Simulator with Fault Injection Capabilities
    """

    def __init__(self):
        # Motor parameters (typical values for EV traction motor)
        self.Rs = 0.2      # Stator resistance (Ohm)
        self.Ld = 0.001    # d-axis inductance (H)
        self.Lq = 0.001    # q-axis inductance (H)
        self.psi_m = 0.2   # Permanent magnet flux (Wb)
        self.J = 0.01      # Moment of inertia (kg.m²)
        self.B = 0.001     # Viscous friction coefficient
        self.p = 4         # Number of pole pairs

        # Simulation parameters
        self.fs = 10000    # Sampling frequency (Hz)
        self.dt = 1/self.fs

    def electrical_equations(self, id_val, iq_val, vd, vq, omega_e):
        """PMSM electrical equations in dq reference frame"""
        did_dt = (vd - self.Rs * id_val + omega_e * self.Lq * iq_val) / self.Ld
        diq_dt = (vq - self.Rs * iq_val - omega_e * (self.Ld * id_val + self.psi_m)) / self.Lq
        return did_dt, diq_dt

    def mechanical_equation(self, Te, TL, omega_m):
        """PMSM mechanical equation"""
        domega_dt = (Te - TL - self.B * omega_m) / self.J
        return domega_dt

    def torque_equation(self, id_val, iq_val):
        """Electromagnetic torque calculation"""
        return 1.5 * self.p * (self.psi_m * iq_val + (self.Ld - self.Lq) * id_val * iq_val)

    def dq_to_abc(self, id_val, iq_val, theta):
        """Transform dq currents to abc frame using inverse Park transformation"""
        ia = id_val * np.cos(theta) - iq_val * np.sin(theta)
        ib = id_val * np.cos(theta - 2*np.pi/3) - iq_val * np.sin(theta - 2*np.pi/3)
        ic = id_val * np.cos(theta + 2*np.pi/3) - iq_val * np.sin(theta + 2*np.pi/3)
        return ia, ib, ic

    def abc_to_dq(self, ia, ib, ic, theta):
        """Transform abc currents to dq frame using Park transformation"""
        id_val = (2/3) * (ia * np.cos(theta) + ib * np.cos(theta - 2*np.pi/3) + ic * np.cos(theta + 2*np.pi/3))
        iq_val = (2/3) * (-ia * np.sin(theta) - ib * np.sin(theta - 2*np.pi/3) - ic * np.sin(theta + 2*np.pi/3))
        return id_val, iq_val

    def simulate_healthy(self, duration=2, speed_ref=100, load_torque=5):
        """Simulate healthy PMSM operation"""
        t = np.arange(0, duration, self.dt)
        n_points = len(t)

        # Initialize state variables
        id_vals = np.zeros(n_points)
        iq_vals = np.zeros(n_points)
        omega_m = np.zeros(n_points)
        theta = np.zeros(n_points)

        # Initial conditions
        id_vals[0] = 0
        iq_vals[0] = 0
        omega_m[0] = 0
        theta[0] = 0

        # Reference speed (rad/s)
        omega_ref = speed_ref * 2 * np.pi / 60

        for i in range(n_points-1):
            # Simple speed controller
            speed_error = omega_ref - omega_m[i]
            iq_ref = speed_error * 0.1  # PI controller simplified
            id_ref = 0  # Field weakening not considered

            # Voltage references (simplified control)
            vd = 0.5 * (id_ref - id_vals[i])
            vq = 0.5 * (iq_ref - iq_vals[i]) + omega_m[i] * self.p * self.psi_m

            # Electrical equations
            did_dt, diq_dt = self.electrical_equations(id_vals[i], iq_vals[i], vd, vq, omega_m[i]*self.p)

            # Mechanical equation
            Te = self.torque_equation(id_vals[i], iq_vals[i])
            domega_dt = self.mechanical_equation(Te, load_torque, omega_m[i])

            # Integration (Euler method for simplicity)
            id_vals[i+1] = id_vals[i] + did_dt * self.dt
            iq_vals[i+1] = iq_vals[i] + diq_dt * self.dt
            omega_m[i+1] = omega_m[i] + domega_dt * self.dt
            theta[i+1] = theta[i] + omega_m[i] * self.p * self.dt

        # Generate final abc currents
        ia_vals, ib_vals, ic_vals = [], [], []
        for i in range(n_points):
            ia, ib, ic = self.dq_to_abc(id_vals[i], iq_vals[i], theta[i])
            ia_vals.append(ia)
            ib_vals.append(ib)
            ic_vals.append(ic)

        return np.array(ia_vals), np.array(ib_vals), np.array(ic_vals), t

    def simulate_open_phase(self, duration=2, speed_ref=100, load_torque=5, faulty_phase='A'):
        """Simulate open-phase fault by setting one phase current to zero"""
        ia, ib, ic, t = self.simulate_healthy(duration, speed_ref, load_torque)

        if faulty_phase == 'A':
            ia = np.zeros_like(ia)
        elif faulty_phase == 'B':
            ib = np.zeros_like(ib)
        elif faulty_phase == 'C':
            ic = np.zeros_like(ic)

        return ia, ib, ic, t


In [None]:
def simulate_bearing_fault(self, duration=2, speed_ref=100, load_torque=5, fault_severity=0.3):
        """Simulate bearing fault with torque oscillations"""
        ia, ib, ic, t = self.simulate_healthy(duration, speed_ref, load_torque)

        # Add bearing fault signature - torque oscillations
        fault_freq = speed_ref / 60 * 3.2  # Characteristic bearing fault frequency
        torque_oscillation = fault_severity * load_torque * np.sin(2 * np.pi * fault_freq * t)

        # Modify currents to reflect torque oscillations
        modulation = 1 + 0.1 * torque_oscillation / load_torque
        ia *= modulation
        ib *= modulation
        ic *= modulation

        return ia, ib, ic, t

class FeatureExtractor:
    """Feature extraction from current signals"""

    def __init__(self, window_size=1000, overlap=500):
        self.window_size = window_size
        self.overlap = overlap

    def extract_features(self, ia, ib, ic):
        """Extract statistical features from three-phase currents"""
        n_samples = len(ia)
        features = []

        for start in range(0, n_samples - self.window_size, self.window_size - self.overlap):
            end = start + self.window_size

            # Extract window for each phase
            ia_win = ia[start:end]
            ib_win = ib[start:end]
            ic_win = ic[start:end]

            window_features = []

            for phase_current in [ia_win, ib_win, ic_win]:
                # Statistical features
                mean_val = np.mean(phase_current)
                rms_val = np.sqrt(np.mean(phase_current**2))
                variance_val = np.var(phase_current)
                skewness_val = self.skewness(phase_current)
                kurtosis_val = self.kurtosis(phase_current)
                peak_val = np.max(np.abs(phase_current))
                energy_val = np.sum(phase_current**2)

                phase_features = [mean_val, rms_val, variance_val, skewness_val,
                                kurtosis_val, peak_val, energy_val]
                window_features.extend(phase_features)

            features.append(window_features)

        return np.array(features)

    def skewness(self, x):
        """Calculate skewness of signal"""
        n = len(x)
        mean_x = np.mean(x)
        std_x = np.std(x)
        if std_x == 0:
            return 0
        return (np.sum((x - mean_x)**3) / n) / (std_x**3)

    def kurtosis(self, x):
        """Calculate kurtosis of signal"""
        n = len(x)
        mean_x = np.mean(x)
        std_x = np.std(x)
        if std_x == 0:
            return 0
        return (np.sum((x - mean_x)**4) / n) / (std_x**4) - 3

class FaultDetectionModel:
    """Machine learning model for fault detection in EV powertrain"""

    def __init__(self):
        # Initialize classifiers
        self.rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
        self.svm_classifier = SVC(kernel='rbf', C=1, gamma=0.1)
        self.scaler = StandardScaler()
        self.pca = PCA(n_components=5)

    def train(self, X_train, y_train):
        """Train the model"""
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_train_pca = self.pca.fit_transform(X_train_scaled)

        # Train the classifiers
        self.rf_classifier.fit(X_train_pca, y_train)
        self.svm_classifier.fit(X_train_pca, y_train)

    def predict(self, X_test):
        """Make predictions using the trained model"""
        X_test_scaled = self.scaler.transform(X_test)
        X_test_pca = self.pca.transform(X_test_scaled)

        rf_predictions = self.rf_classifier.predict(X_test_pca)
        svm_predictions = self.svm_classifier.predict(X_test_pca)

        return rf_predictions, svm_predictions

    def evaluate(self, X_test, y_test):
        """Evaluate the model performance"""
        rf_predictions, svm_predictions = self.predict(X_test)

        print("Random Forest Classifier:")
        print(classification_report(y_test, rf_predictions))
        print("Confusion Matrix:\n", confusion_matrix(y_test, rf_predictions))
        print("Accuracy:", accuracy_score(y_test, rf_predictions))

        print("\nSupport Vector Machine Classifier:")
        print(classification_report(y_test, svm_predictions))
        print("Confusion Matrix:\n", confusion_matrix(y_test, svm_predictions))
        print("Accuracy:", accuracy_score(y_test, svm_predictions))

if __name__ == "__main__":
    # Initialize simulator, feature extractor, and model
    pmsm_sim = PMSMSimulator()
    feature_extractor = FeatureExtractor()
    fault_model = FaultDetectionModel()

    # Simulate healthy and faulty signals
    ia, ib, ic, t = pmsm_sim.simulate_healthy()
    ia_fault, ib_fault, ic_fault, t_fault = pmsm_sim.simulate_open_phase()

    # Extract features
    healthy_features = feature_extractor.extract_features(ia, ib, ic)
    faulty_features = feature_extractor.extract_features(ia_fault, ib_fault, ic_fault)

    # Create dataset (combine healthy and faulty features)
    X = np.vstack([healthy_features, faulty_features])
    y = np.hstack([np.zeros(len(healthy_features)), np.ones(len(faulty_features))])

    # Split dataset into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train and evaluate the model
    fault_model.train(X_train, y_train)
    fault_model.evaluate(X_test, y_test)

Random Forest Classifier:
              precision    recall  f1-score   support

         0.0       1.00      1.00      1.00         8
         1.0       1.00      1.00      1.00         8

    accuracy                           1.00        16
   macro avg       1.00      1.00      1.00        16
weighted avg       1.00      1.00      1.00        16

Confusion Matrix:
 [[8 0]
 [0 8]]
Accuracy: 1.0

Support Vector Machine Classifier:
              precision    recall  f1-score   support

         0.0       1.00      0.88      0.93         8
         1.0       0.89      1.00      0.94         8

    accuracy                           0.94        16
   macro avg       0.94      0.94      0.94        16
weighted avg       0.94      0.94      0.94        16

Confusion Matrix:
 [[7 1]
 [0 8]]
Accuracy: 0.9375
