In [221]:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_recall_curve, f1_score, roc_auc_score, roc_curve
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV

In [222]:
data = pd.read_csv("nasa_data/Battery_Data_Cleaned.csv")
data.shape

(7368, 9)

So there are 7368 different instances 

In [223]:
data["battery_id"].value_counts().shape

(34,)

The dataset has 34 different battery datasets

In [224]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7368 entries, 0 to 7367
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   type                 7368 non-null   int64  
 1   ambient_temperature  7368 non-null   int64  
 2   battery_id           7368 non-null   int64  
 3   test_id              7368 non-null   int64  
 4   uid                  7368 non-null   int64  
 5   filename             7368 non-null   object 
 6   Capacity             7368 non-null   float64
 7   Re                   7368 non-null   float64
 8   Rct                  7368 non-null   float64
dtypes: float64(3), int64(5), object(1)
memory usage: 518.2+ KB


In [225]:
data.describe()

Unnamed: 0,type,ambient_temperature,battery_id,test_id,uid,Capacity,Re,Rct
count,7368.0,7368.0,7368.0,7368.0,7368.0,7368.0,7368.0,7368.0
mean,0.002443,19.911238,32.213762,166.309718,3735.133415,0.824926,0.077739,0.125128
std,0.865297,11.210718,16.643714,139.771878,2190.232696,0.250283,0.022584,0.044834
min,-1.0,4.0,5.0,0.0,1.0,0.0,0.026691,0.038781
25%,-1.0,4.0,18.0,54.0,1842.75,0.775098,0.060875,0.084685
50%,0.0,24.0,36.0,125.0,3686.5,0.894803,0.074693,0.118383
75%,1.0,24.0,45.0,244.25,5603.25,0.986519,0.095817,0.158926
max,1.0,44.0,56.0,555.0,7565.0,1.292025,0.142128,0.238124


In [226]:
len(data["battery_id"].value_counts())

34

So the dataset has data of 34 batteries

In [227]:
dead_batteries = pd.DataFrame(data["Capacity"] > 0.75)
dead_batteries["Capacity"].value_counts()

Capacity
True     5750
False    1618
Name: count, dtype: int64

In [228]:
data = data.rename(columns={"Re" : "internal_resistance", "Rct" : "charge_transfer_resistance", "Capacity" : "capacity"})

In [229]:
data.describe()

Unnamed: 0,type,ambient_temperature,battery_id,test_id,uid,capacity,internal_resistance,charge_transfer_resistance
count,7368.0,7368.0,7368.0,7368.0,7368.0,7368.0,7368.0,7368.0
mean,0.002443,19.911238,32.213762,166.309718,3735.133415,0.824926,0.077739,0.125128
std,0.865297,11.210718,16.643714,139.771878,2190.232696,0.250283,0.022584,0.044834
min,-1.0,4.0,5.0,0.0,1.0,0.0,0.026691,0.038781
25%,-1.0,4.0,18.0,54.0,1842.75,0.775098,0.060875,0.084685
50%,0.0,24.0,36.0,125.0,3686.5,0.894803,0.074693,0.118383
75%,1.0,24.0,45.0,244.25,5603.25,0.986519,0.095817,0.158926
max,1.0,44.0,56.0,555.0,7565.0,1.292025,0.142128,0.238124


In [294]:
def label_battery_dataset(df, capacity_threshold=0.80, ir_std_multiplier=2, ctr_std_multiplier=2):
    """
    Labels battery dataset instances as 'healthy' (1) or 'dead' (0) based on capacity,
    internal resistance (IR), and charge transfer resistance (CTR).
    
    Parameters:
    - capacity_threshold: Fraction of nominal capacity below which a battery is labeled 'dead' (default: 0.75)
    - ir_std_multiplier: Number of standard deviations for IR outlier threshold (default: 2)
    - ctr_std_multiplier: Number of standard deviations for CTR outlier threshold (default: 2)
    
    Returns:
    - df: DataFrame with an additional 'label' column (1 = healthy, 0 = dead)
    """
    # Initialize label column
    df['label'] = 1  # Default to healthy
    
    # Get unique battery IDs
    battery_ids = df['battery_id'].unique()
    
    for battery_id in battery_ids:
        # Filter data for the current battery
        battery_data = df[df['battery_id'] == battery_id].copy()
        
        # Estimate nominal capacity as the maximum observed capacity for this battery
        nominal_capacity = battery_data['capacity'].max()
        
        # Compute mean and std for IR and CTR in early cycles (first 10% or charge cycles)
        early_cycles = battery_data[
            (battery_data['type'] == 1) |  # Charge cycles
            (battery_data.index < battery_data.index.min() + len(battery_data) * 0.2)  # First 10% of cycles
        ]
        ir_mean = early_cycles['internal_resistance'].mean()
        ir_std = early_cycles['internal_resistance'].std()
        ctr_mean = early_cycles['charge_transfer_resistance'].mean()
        ctr_std = early_cycles['charge_transfer_resistance'].std()
        
        # Define thresholds
        capacity_limit = nominal_capacity * capacity_threshold
        ir_limit = ir_mean + ir_std_multiplier * ir_std
        ctr_limit = ctr_mean + ctr_std_multiplier * ctr_std
        
        # Label instances as 'dead' (0) if any condition is met
        df.loc[
            (df['battery_id'] == battery_id) & 
            (
                (df['capacity'] < capacity_limit) |
                (df['internal_resistance'] > ir_limit) |
                (df['charge_transfer_resistance'] > ctr_limit)
            ),
            'label'
        ] = 0
    
    return df

In [296]:
data = label_battery_dataset(data)
data["label"].value_counts()

label
1    5569
0    1799
Name: count, dtype: int64

In [319]:
def train_semi_supervised_ensemble(df, features, contamination=0.1, nu=0.1, n_neighbors=50, weights=None):
    """
    Trains a semi-supervised ensemble of Isolation Forest, One-Class SVM, and LOF for battery fault detection.
    
    Parameters:
    - df: pandas DataFrame with columns ['battery_id', 'capacity', 'internal_resistance', 
        'charge_transfer_resistance', 'type', 'label']
    - features: List of feature columns for training
    - contamination: Contamination parameter for Isolation Forest and LOF (default: 0.1)
    - nu: Nu parameter for One-Class SVM (default: 0.1)
    - n_neighbors: Number of neighbors for LOF (default: 20)
    - weights: List of weights for [Isolation Forest, One-Class SVM, LOF] (default: equal weights)
    
    Returns:
    - models: Dictionary of trained models
    - scaler: Fitted StandardScaler
    - scores: Dictionary of individual model and ensemble anomaly scores
    - predictions: Dictionary of individual model and ensemble binary predictions
    """
    if weights is None:
        weights = [1/3, 1/3, 1/3]  # Equal weights by default
    
    # Split labeled and unlabeled data
    labeled_data = df[df['label'].notnull()]
    healthy_data = labeled_data[labeled_data['label'] == 1]  # Train on healthy instances (label = 1)
    X = healthy_data[features]
    
    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Initialize models
    models = {
        'IsolationForest': IsolationForest(contamination=contamination, random_state=42),
        'OneClassSVM': OneClassSVM(nu=nu, kernel='poly', gamma='auto', degree=20),
        'LOF': LocalOutlierFactor(n_neighbors=n_neighbors, contamination=0.4, novelty=True, leaf_size=100, algorithm="auto")
    }
    
    # Train models on healthy data
    for name, model in models.items():
        model.fit(X_scaled)
    
    # Compute anomaly scores for all data
    X_all = df[features]
    X_all_scaled = scaler.transform(X_all)
    
    scores = {}
    predictions = {}
    for name, model in models.items():
        if name == 'LOF':
            # LOF novelty mode returns -1 for inliers, 1 for outliers
            scores[name] = -model.score_samples(X_all_scaled)  # Higher score = anomaly
        else:
            # Isolation Forest and One-Class SVM: Higher score = anomaly
            scores[name] = -model.score_samples(X_all_scaled) if name == 'OneClassSVM' else model.decision_function(X_all_scaled)
        
        # Normalize scores to [0, 1]
        scores[name] = (scores[name] - scores[name].min()) / (scores[name].max() - scores[name].min())
        
        # Determine threshold for individual model
        precision, recall, thresholds = precision_recall_curve(labeled_data['label'], scores[name][labeled_data.index])
        f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
        optimal_threshold = thresholds[np.argmax(f1_scores)]
        predictions[name] = (scores[name] >= optimal_threshold).astype(int)
    
    # Compute ensemble scores
    ensemble_scores = (
        weights[0] * scores['IsolationForest'] +
        weights[1] * scores['OneClassSVM'] +
        weights[2] * scores['LOF']
    )
    scores['Ensemble'] = ensemble_scores
    
    # Determine threshold for ensemble
    precision, recall, thresholds = precision_recall_curve(labeled_data['label'], ensemble_scores[labeled_data.index])
    f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
    optimal_threshold = thresholds[np.argmax(f1_scores)]
    predictions['Ensemble'] = (ensemble_scores >= optimal_threshold).astype(int)
    
    return models, scaler, scores, predictions

In [328]:
def evaluate_model(y_true, y_score, y_pred):
    """
    Evaluates the model using PR-AUC, F1-score, and ROC-AUC.
    
    Parameters:
    - y_true: True labels (1 = healthy, 0 = dead)
    - y_score: Anomaly scores
    - y_pred: Binary predictions
    
    Returns:
    - metrics: Dictionary with PR-AUC, F1-score, and ROC-AUC
    """
    precision, recall, _ = precision_recall_curve(y_true, y_score)
    pr_auc = np.mean(precision)  # Approximate PR-AUC
    f1 = f1_score(y_true, y_pred)
    roc_auc = roc_auc_score(y_true, y_score)
    return {'PR-AUC': pr_auc, 'F1-Score': f1, 'ROC-AUC': roc_auc}

def plot_performance_comparison(y_true, scores, predictions, output_dir='plots'):
    """
    Plots ROC curves, precision-recall curves, and bar charts to compare model performance.
    
    Parameters:
    - y_true: True labels
    - scores: Dictionary of anomaly scores for each model
    - predictions: Dictionary of binary predictions for each model
    - output_dir: Directory to save plots
    """
    import os
    os.makedirs(output_dir, exist_ok=True)
    
    # Compute metrics for each model
    metrics = {name: evaluate_model(y_true, scores[name][y_true.index], predictions[name][y_true.index]) 
               for name in scores}
    
    # Plot ROC curves
    plt.figure(figsize=(8, 6))
    for name in scores:
        fpr, tpr, _ = roc_curve(y_true, scores[name][y_true.index])
        plt.plot(fpr, tpr, label=f'{name} (AUC = {metrics[name]["ROC-AUC"]:.4f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curves for Individual Models and Ensemble')
    plt.legend()
    plt.grid(True)
    plt.savefig(f'{output_dir}/roc_curves.png')
    plt.close()
    
    # Plot Precision-Recall curves
    plt.figure(figsize=(8, 6))
    for name in scores:
        precision, recall, _ = precision_recall_curve(y_true, scores[name][y_true.index])
        plt.plot(recall, precision, label=f'{name} (PR-AUC = {metrics[name]["PR-AUC"]:.2f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curves for Individual Models and Ensemble')
    plt.legend()
    plt.grid(True)
    plt.savefig(f'{output_dir}/pr_curves.png')
    plt.close()
    
    # Plot bar chart of metrics
    metric_names = ['PR-AUC', 'F1-Score', 'ROC-AUC']
    plt.figure(figsize=(10, 6))
    x = np.arange(len(metric_names))
    width = 0.2
    for i, name in enumerate(scores):
        values = [metrics[name][m] for m in metric_names]
        plt.bar(x + i * width, values, width, label=name)
    plt.xlabel('Metrics')
    plt.ylabel('Score')
    plt.title('Performance Comparison of Individual Models and Ensemble')
    plt.xticks(x + width * 1.5, metric_names)
    plt.legend()
    plt.grid(True, axis='y')
    plt.savefig(f'{output_dir}/metric_comparison.png')
    plt.close()
    
    return metrics

In [329]:
features = ['capacity', 'internal_resistance', 'charge_transfer_resistance', 'type']

models, scaler, scores, predictions = train_semi_supervised_ensemble(
    data, features, contamination=0.1, nu=0.1, n_neighbors=500, weights=[0.39, 0.60, 0.01]
)

labeled_data = data[data['label'].notnull()]
metrics = plot_performance_comparison(labeled_data['label'], scores, predictions)

print("Performance Metrics:")
for name, metric in metrics.items():
    print(f"{name}: {metric}")


Performance Metrics:
IsolationForest: {'PR-AUC': 0.9117893439941487, 'F1-Score': 0.9045893719806763, 'ROC-AUC': 0.8181838915915758}
OneClassSVM: {'PR-AUC': 0.8474066635813566, 'F1-Score': 0.9214677071226963, 'ROC-AUC': 0.7242151148195798}
LOF: {'PR-AUC': 0.501435505033368, 'F1-Score': 0.8609414856612816, 'ROC-AUC': 0.15466110090290777}
Ensemble: {'PR-AUC': 0.9059776775984593, 'F1-Score': 0.9047290300310666, 'ROC-AUC': 0.8210214050203066}
