In [1]:
import os
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import scipy.io as sio
from sklearn.preprocessing import StandardScaler

In [2]:
def resample(signal, target_len=100):
    """Linearly interpolate a 1-D signal onto ``target_len`` evenly spaced points.

    The input samples and the output samples are both laid out on a
    normalised [0, 1] axis, and the output values are obtained by linear
    interpolation between neighbouring input samples.
    """
    source_axis = np.linspace(0, 1, num=len(signal))
    target_axis = np.linspace(0, 1, num=target_len)
    resampled = np.interp(target_axis, source_axis, signal)
    return resampled

In [3]:
def compute_soh(capacity, initial_capacity):
    """Return the state of health: ``capacity`` as a percentage of ``initial_capacity``."""
    fraction_remaining = capacity / initial_capacity
    return fraction_remaining * 100

In [4]:
def extract_HIs(V, T):
    """Summarise a voltage trace and a temperature trace as seven scalars.

    The returned list is ordered as voltage mean, max, min and standard
    deviation, followed by temperature mean, max and standard deviation
    (no temperature minimum is included).
    """
    voltage_stats = [np.mean(V), np.max(V), np.min(V), np.std(V)]
    temperature_stats = [np.mean(T), np.max(T), np.std(T)]
    return voltage_stats + temperature_stats

In [5]:
# ------------------------------
# 1️⃣ NASA DATASET
# ------------------------------

In [6]:
def preprocess_nasa_folder(nasa_folder, seq_len=100):
    """Build feature/label arrays from the CSV files of the NASA dataset.

    Every ``*.csv`` file in ``<nasa_folder>/data`` that has the columns
    ``Voltage_measured``, ``Current_measured``, ``Temperature_measured``,
    ``Capacity`` and ``Cycle`` is split by cycle. Each cycle yields one
    sample: the (smoothed) voltage, the current and the temperature traces,
    each resampled to ``seq_len`` points, followed by a normalised time axis.
    The label is the cycle's SOH in percent relative to the file's initial
    capacity. Files without the required columns, without rows, or without
    a finite positive initial capacity are skipped.

    Parameters
    ----------
    nasa_folder : str or path-like
        Folder that contains a ``data`` sub-folder with the CSV files.
    seq_len : int, default 100
        Number of points each signal is resampled to.

    Returns
    -------
    X : np.ndarray, shape (n_samples, 4 * seq_len)
    y : np.ndarray, shape (n_samples,)
        When no usable cycle is found both arrays have zero rows, and ``X``
        still has ``4 * seq_len`` columns so it can be stacked with others.
    """
    required = ['Voltage_measured', 'Current_measured',
                'Temperature_measured', 'Capacity', 'Cycle']
    savgol_window, savgol_order = 11, 3
    X, y = [], []

    data_folder = os.path.join(nasa_folder, 'data')
    # os.listdir returns entries in arbitrary order; sort so that the order
    # of the samples is reproducible across machines and runs.
    for csv_file in sorted(os.listdir(data_folder)):
        if not csv_file.endswith('.csv'):
            continue
        df = pd.read_csv(os.path.join(data_folder, csv_file))
        if df.empty or not all(col in df.columns for col in required):
            continue

        # Reference capacity: cycle 1 when it exists, otherwise the first
        # recorded row (some files may not start at cycle 1).
        baseline = df.loc[df['Cycle'] == 1, 'Capacity']
        if baseline.empty:
            baseline = df['Capacity']
        initial_capacity = baseline.iloc[0]
        if not (np.isfinite(initial_capacity) and initial_capacity > 0):
            # A NaN/zero reference would turn every label into NaN/inf.
            continue

        # One pass over the frame instead of re-filtering it for every cycle.
        # sort=False keeps the cycles in their order of first appearance;
        # rows whose Cycle is NaN are left out by groupby.
        for _, cycle in df.groupby('Cycle', sort=False):
            voltage = cycle['Voltage_measured'].values
            # savgol_filter raises when the signal is shorter than its
            # window, so very short cycles are used unsmoothed.
            if len(voltage) >= savgol_window:
                voltage = savgol_filter(voltage, savgol_window, savgol_order)

            V = resample(voltage, seq_len)
            I = resample(cycle['Current_measured'].values, seq_len)
            T = resample(cycle['Temperature_measured'].values, seq_len)
            time = np.linspace(0, 1, seq_len)

            X.append(np.concatenate([V, I, T, time]))
            y.append(compute_soh(cycle['Capacity'].iloc[0], initial_capacity))

    if not X:
        return np.empty((0, 4 * seq_len)), np.empty((0,))
    return np.array(X), np.array(y)

In [7]:
# ------------------------------
# 2️⃣ CALCE DATASET (CS2 + CX2)
# ------------------------------

In [8]:
def preprocess_calce_folder(calce_folder, seq_len=100):
    """Build feature/label arrays from the CSV files of the CALCE dataset.

    Every ``*.csv`` file in ``calce_folder`` that has the columns
    ``Voltage``, ``Current``, ``Temperature``, ``Time`` and ``Cycle_Index``
    is split by cycle. Each cycle yields one sample: the (smoothed) voltage,
    the current and the temperature traces, each resampled to ``seq_len``
    points, followed by a normalised time axis.

    The capacity of a cycle is obtained by Coulomb counting: the trapezoidal
    integral of ``|Current|`` over ``Time``, divided by 3600 (i.e. ``Time``
    is presumably in seconds -- TODO confirm against the data export). The
    label is that capacity as a percentage of the first cycle's capacity.
    Files whose first cycle does not give a finite positive capacity are
    skipped.

    Parameters
    ----------
    calce_folder : str or path-like
        Folder containing the CSV files.
    seq_len : int, default 100
        Number of points each signal is resampled to.

    Returns
    -------
    X : np.ndarray, shape (n_samples, 4 * seq_len)
    y : np.ndarray, shape (n_samples,)
        When no usable cycle is found both arrays have zero rows, and ``X``
        still has ``4 * seq_len`` columns so it can be stacked with others.
    """
    required = ['Voltage', 'Current', 'Temperature', 'Time', 'Cycle_Index']
    savgol_window, savgol_order = 11, 3
    # np.trapz is deprecated since NumPy 2.0 in favour of np.trapezoid;
    # use whichever one the installed NumPy provides.
    integrate = np.trapezoid if hasattr(np, 'trapezoid') else np.trapz
    X, y = [], []

    # os.listdir returns entries in arbitrary order; sort so that the order
    # of the samples is reproducible across machines and runs.
    for csv_file in sorted(os.listdir(calce_folder)):
        if not csv_file.endswith('.csv'):
            continue
        df = pd.read_csv(os.path.join(calce_folder, csv_file))
        if not all(col in df.columns for col in required):
            continue
        # Drop only rows that miss a value we actually use; a blanket
        # dropna() would also discard rows because of unrelated empty columns.
        df = df.dropna(subset=required)

        C0 = None
        # One pass over the frame instead of re-filtering it for every cycle.
        # sort=False keeps the cycles in their order of first appearance.
        for _, cycle in df.groupby('Cycle_Index', sort=False):
            capacity = integrate(np.abs(cycle['Current'].values),
                                 cycle['Time'].values) / 3600
            if C0 is None:
                # The first cycle defines the reference capacity. Without a
                # finite positive reference every label would be NaN/inf,
                # so the rest of the file is skipped.
                if not (np.isfinite(capacity) and capacity > 0):
                    break
                C0 = capacity

            voltage = cycle['Voltage'].values
            # savgol_filter raises when the signal is shorter than its
            # window, so very short cycles are used unsmoothed.
            if len(voltage) >= savgol_window:
                voltage = savgol_filter(voltage, savgol_window, savgol_order)

            V = resample(voltage, seq_len)
            I = resample(cycle['Current'].values, seq_len)
            T = resample(cycle['Temperature'].values, seq_len)
            time = np.linspace(0, 1, seq_len)

            X.append(np.concatenate([V, I, T, time]))
            y.append(compute_soh(capacity, C0))

    if not X:
        return np.empty((0, 4 * seq_len)), np.empty((0,))
    return np.array(X), np.array(y)

In [9]:
# ------------------------------
# 3️⃣ OXFORD DATASET (.MAT)
# ------------------------------

In [10]:
def _oxford_cycle_signal(container, index):
    """Return cycle ``index`` of a loaded .mat variable as a 1-D float array.

    MATLAB cell arrays are loaded as object arrays whose elements are
    themselves arrays; those are flattened/unwrapped here. Plain numeric
    matrices are indexed by row.
    """
    container = np.asarray(container)
    if container.dtype == object:
        # Cell arrays may be 1xN or Nx1; flatten so either can be indexed.
        container = container.ravel()
    entry = np.asarray(container[index])
    while entry.dtype == object and entry.size == 1:
        entry = np.asarray(entry.item())
    return entry.astype(float).ravel()


def preprocess_oxford(mat_path, seq_len=100):
    """Build feature/label arrays from an Oxford dataset ``.mat`` file.

    The file is expected to contain the variables ``Voltage``, ``Current``,
    ``Temperature`` (one entry per cycle) and ``Capacity`` (one value per
    cycle). Each cycle yields one sample: voltage, current and temperature
    resampled to ``seq_len`` points, followed by a normalised time axis.
    The label is the cycle's capacity as a percentage of the first entry of
    ``Capacity``.

    Parameters
    ----------
    mat_path : str or path-like
        Path of the ``.mat`` file.
    seq_len : int, default 100
        Number of points each signal is resampled to.

    Returns
    -------
    X : np.ndarray, shape (n_cycles, 4 * seq_len)
    y : np.ndarray, shape (n_cycles,)
        Both have zero rows when ``Capacity`` is empty.

    Raises
    ------
    KeyError
        If one of the expected variables is missing from the file.
    ValueError
        If the first capacity value is not a finite positive number.
    """
    data = sio.loadmat(mat_path)
    expected = ('Voltage', 'Current', 'Temperature', 'Capacity')
    missing = [name for name in expected if name not in data]
    if missing:
        raise KeyError("{} is missing expected variable(s): {}".format(
            mat_path, ', '.join(missing)))

    V_all = data['Voltage']
    I_all = data['Current']
    T_all = data['Temperature']
    capacity = data['Capacity'].flatten()

    if capacity.size == 0:
        return np.empty((0, 4 * seq_len)), np.empty((0,))

    C0 = capacity[0]
    if not (np.isfinite(C0) and C0 > 0):
        # A NaN/zero reference would turn every label into NaN/inf.
        raise ValueError(
            "First capacity value in {} must be finite and positive, got {!r}".format(
                mat_path, C0))

    X, y = [], []
    for i in range(len(capacity)):
        V = resample(_oxford_cycle_signal(V_all, i), seq_len)
        I = resample(_oxford_cycle_signal(I_all, i), seq_len)
        T = resample(_oxford_cycle_signal(T_all, i), seq_len)
        time = np.linspace(0, 1, seq_len)

        X.append(np.concatenate([V, I, T, time]))
        y.append(compute_soh(capacity[i], C0))

    return np.array(X), np.array(y)

In [11]:
# ------------------------------
# 4️⃣ Merge All Datasets
# ------------------------------

In [12]:
def create_unified_dataset(nasa_path, calce_path, oxford_path, seq_len=100,
                           return_scaler=False):
    """Preprocess the NASA, CALCE and Oxford data and merge them into one dataset.

    The three sources are processed with the same ``seq_len``, stacked in
    the order NASA, CALCE, Oxford, and the stacked features are standardised
    with a ``StandardScaler`` fitted on all samples.

    Parameters
    ----------
    nasa_path, calce_path, oxford_path
        Passed to ``preprocess_nasa_folder``, ``preprocess_calce_folder``
        and ``preprocess_oxford`` respectively.
    seq_len : int, default 100
        Number of points each signal is resampled to.
    return_scaler : bool, default False
        When True the fitted scaler is returned as a third value so that the
        same transformation can be applied to new data or inverted.

    Returns
    -------
    (X_all, y_all) or (X_all, y_all, scaler)
        Standardised features, SOH labels in percent and, on request, the
        fitted ``StandardScaler``.

    Raises
    ------
    ValueError
        If none of the three sources produced a sample.
    """
    parts = [
        preprocess_nasa_folder(nasa_path, seq_len),
        preprocess_calce_folder(calce_path, seq_len),
        preprocess_oxford(oxford_path, seq_len),
    ]
    # A source without samples can come back as an array of shape (0,),
    # which np.vstack cannot combine with (n, 4 * seq_len) arrays; leave
    # such sources out instead of failing on the shape mismatch.
    parts = [(X, y) for X, y in parts if len(y) > 0]
    if not parts:
        raise ValueError(
            "No samples were produced from the NASA, CALCE or Oxford inputs")

    X_all = np.vstack([X for X, _ in parts])
    y_all = np.concatenate([y for _, y in parts])

    # Normalize features.
    # NOTE(review): the scaler is fitted on every sample; if the result is
    # later split into train/test sets the test statistics leak into the
    # scaling. Use return_scaler=True to keep the fitted scaler.
    scaler = StandardScaler()
    X_all = scaler.fit_transform(X_all)

    if return_scaler:
        return X_all, y_all, scaler
    return X_all, y_all