In [3]:
import scipy.io as sio
import os

# Directory with Oxford data
data_dir = '../data/Oxford/'
file_path = os.path.join(data_dir, 'Oxford_Battery_Degradation_Dataset_1.mat')

if not os.path.exists(file_path):
    raise FileNotFoundError(f"Oxford dataset file not found at {file_path}. Please ensure it’s in {data_dir}.")

# Load the .mat file
mat_data = sio.loadmat(file_path)

# Print all top-level keys to identify the correct one
print("Top-level keys in Oxford_Battery_Degradation_Dataset_1.mat:")
for key in mat_data.keys():
    print(key)

# Optionally, inspect a key's structure (e.g., first key)
if mat_data.keys():
    sample_key = list(mat_data.keys())[0]
    print(f"\nStructure of key '{sample_key}':")
    print(mat_data[sample_key])

Top-level keys in Oxford_Battery_Degradation_Dataset_1.mat:
__header__
__version__
__globals__
Cell1
Cell2
Cell3
Cell4
Cell5
Cell6
Cell7
Cell8

Structure of key '__header__':
b'MATLAB 5.0 MAT-file, Platform: PCWIN64, Created on: Mon Jun 05 11:16:25 2017'


In [15]:
import os
import scipy.io as sio
import numpy as np
import pandas as pd

# Directory with Oxford data (create if needed)
data_dir = '../data/Oxford/'
os.makedirs(data_dir, exist_ok=True)

# Load Oxford_Battery_Degradation_Dataset_1.mat
file_path = os.path.join(data_dir, 'Oxford_Battery_Degradation_Dataset_1.mat')
if not os.path.exists(file_path):
    raise FileNotFoundError(f"Oxford dataset file not found at {file_path}. Please ensure it’s in {data_dir}.")

mat_data = sio.loadmat(file_path)

# Process each cell (1-8, using 'Cell1' to 'Cell8' as top-level keys)
for cell_id in range(1, 9):  # Cells 1 to 8
    cell_key = f'Cell{cell_id}'  # Match the exact key format
    if cell_key not in mat_data.keys():
        print(f"Warning: {cell_key} not found in dataset. Skipping.")
        continue

    cell_data = mat_data[cell_key][0, 0]  # Access cell data directly
    print(f"\nProcessing {cell_key}...")

    # Extract characterization cycles (e.g., cyc0100, cyc0200, etc.)
    cycles = []
    capacities = []  # From 1-C discharge or charge in Ah
    voltages = []
    currents = []
    temps = []
    soh = []  # State of Health (%)

    # Get all cycle keys (e.g., 'cyc0100', 'cyc0200', etc.)
    cycle_keys = [key for key in cell_data.dtype.names if key.startswith('cyc')]
    cycle_keys.sort()  # Sort to ensure chronological order
    print(f"Found cycle keys: {cycle_keys}")

    # Get initial capacity from cyc0000 (1-C discharge)
    initial_capacity = None
    for cycle_key in cycle_keys:
        if cycle_key == 'cyc0000':
            cycle_data = cell_data[cycle_key][0, 0]
            if 'C1dc' in cycle_data.dtype.names:
                discharge = cycle_data['C1dc'][0, 0]
                if 'q' in discharge.dtype.names:
                    initial_capacity_mah = abs(discharge['q'][-1])  # Last value in mAh
                    # Extract scalar from NumPy array
                    initial_capacity_mah_scalar = initial_capacity_mah.item() if hasattr(initial_capacity_mah, 'item') else initial_capacity_mah[0]
                    initial_capacity = initial_capacity_mah_scalar / 1000  # Convert to Ah
                    print(f"Initial capacity for {cell_key}: {initial_capacity:.3f} Ah")
                    break

    if initial_capacity is None:
        print(f"No initial capacity found for {cell_key}, using default 0.740 Ah")
        initial_capacity = 0.740  # Default per README

    for cycle_key in cycle_keys:
        print(f"\nProcessing cycle {cycle_key}...")
        cycle_data = cell_data[cycle_key][0, 0]
        
        # Debug: Print cycle data structure
        print(f"Cycle data keys: {cycle_data.dtype.names}")

        # Try 1-C discharge (C1dc) first, then 1-C charge (C1ch) if needed
        discharge_key = 'C1dc'  # 1-C discharge
        if discharge_key in cycle_data.dtype.names:
            discharge = cycle_data[discharge_key][0, 0]
            print(f"Discharge data keys: {discharge.dtype.names}")
            if 'q' in discharge.dtype.names:  # 'q' is charge in mAh
                capacity_mah = discharge['q'][-1]  # Last value is total capacity (mAh)
                print(f"Raw capacity (mAh): {capacity_mah}, Shape: {capacity_mah.shape if hasattr(capacity_mah, 'shape') else 'N/A'}")
                # Extract scalar, handle empty or malformed arrays, take absolute for discharge
                capacity_mah_scalar = abs(capacity_mah.item()) if hasattr(capacity_mah, 'item') and capacity_mah.size > 0 else \
                                    abs(capacity_mah[0]) if capacity_mah.size > 0 else np.nan
                capacity_ah = capacity_mah_scalar / 1000  # Convert to Ah
                capacities.append(capacity_ah)
                # Calculate SoH (%)
                soh_value = (capacity_ah / initial_capacity) * 100 if not np.isnan(capacity_ah) else np.nan
                soh.append(soh_value)
            else:
                capacities.append(np.nan)  # No capacity data
                soh.append(np.nan)
                print("No 'q' field in discharge data, setting capacity and SoH to NaN")
            # Extract features, handling NumPy arrays
            if 'v' in discharge.dtype.names and discharge['v'].size > 0:
                voltage = discharge['v'][0]
                voltage_scalar = voltage.item() if hasattr(voltage, 'item') else np.mean(voltage) if voltage.size > 0 else np.nan
                voltages.append(voltage_scalar)
            else:
                voltages.append(np.nan)
                print("No valid voltage data in discharge, setting to NaN")
            
            if 'q' in discharge.dtype.names and 't' in discharge.dtype.names:
                charge = discharge['q']
                print(f"Charge shape: {charge.shape if hasattr(charge, 'shape') else 'N/A'}")
                time = discharge['t']
                print(f"Time shape: {time.shape if hasattr(time, 'shape') else 'N/A'}")
                print(f"Time values (first 5): {time[:5] if time.size > 0 else 'N/A'}")  # Debug time values
                if charge.size > 0 and time.size > 1:
                    # Convert MATLAB datenum (days) to seconds for time_diff
                    time_clean = np.unique(time[~np.isnan(time)])  # Remove NaN and duplicates
                    if len(time_clean) > 1:
                        # Time difference in days, convert to seconds (86400 seconds/day)
                        time_diff_days = np.diff(time_clean)[0]
                        time_diff_seconds = time_diff_days * 86400  # Convert days to seconds
                        if time_diff_seconds > 0:  # Ensure positive time difference
                            current_mah = charge / time_diff_seconds  # Current in mA (mAh/s)
                            # Use mean for multi-element array, adjust for units
                            current_scalar = np.mean(current_mah) if current_mah.size > 0 and not np.isnan(current_mah).any() else np.nan
                            current_scalar = current_scalar / 1000  # Convert mA to A
                        else:
                            current_scalar = np.nan
                            print("Time difference is zero or negative after conversion, setting current to NaN")
                    else:
                        current_scalar = np.nan
                        print("Insufficient unique time points after cleaning, setting current to NaN")
                else:
                    current_scalar = np.nan
                    print("Insufficient charge or time data, setting current to NaN")
                currents.append(-current_scalar)  # Keep negative for discharge, convert mA to A
            else:
                currents.append(np.nan)
                print("No valid charge or time data for current calculation, setting to NaN")
            
            if 'T' in discharge.dtype.names and discharge['T'].size > 0:
                temp = discharge['T'][0]
                temp_scalar = temp.item() if hasattr(temp, 'item') else np.mean(temp) if temp.size > 0 else np.nan
                temps.append(temp_scalar)
            else:
                temps.append(np.nan)
                print("No valid temperature data in discharge, setting to NaN")
        else:
            # Fall back to 1-C charge (C1ch) if discharge data is missing
            charge_key = 'C1ch'
            if charge_key in cycle_data.dtype.names:
                charge = cycle_data[charge_key][0, 0]
                print(f"Charge data keys: {charge.dtype.names}")
                if 'q' in charge.dtype.names:
                    capacity_mah = charge['q'][-1]  # Last value is total capacity (mAh)
                    print(f"Raw capacity (mAh): {capacity_mah}, Shape: {capacity_mah.shape if hasattr(capacity_mah, 'shape') else 'N/A'}")
                    capacity_mah_scalar = abs(capacity_mah.item()) if hasattr(capacity_mah, 'item') and capacity_mah.size > 0 else \
                                        abs(capacity_mah[0]) if capacity_mah.size > 0 else np.nan
                    capacity_ah = capacity_mah_scalar / 1000  # Convert to Ah
                    capacities.append(capacity_ah)
                    # Calculate SoH (%)
                    soh_value = (capacity_ah / initial_capacity) * 100 if not np.isnan(capacity_ah) else np.nan
                    soh.append(soh_value)
                else:
                    capacities.append(np.nan)
                    soh.append(np.nan)
                    print("No 'q' field in charge data, setting capacity and SoH to NaN")
                if 'v' in charge.dtype.names and charge['v'].size > 0:
                    voltage = charge['v'][0]
                    voltage_scalar = voltage.item() if hasattr(voltage, 'item') else np.mean(voltage) if voltage.size > 0 else np.nan
                    voltages.append(voltage_scalar)
                else:
                    voltages.append(np.nan)
                    print("No valid voltage data in charge, setting to NaN")
                
                if 'q' in charge.dtype.names and 't' in charge.dtype.names:
                    charge = charge['q']
                    print(f"Charge shape: {charge.shape if hasattr(charge, 'shape') else 'N/A'}")
                    time = charge['t']
                    print(f"Time shape: {time.shape if hasattr(time, 'shape') else 'N/A'}")
                    print(f"Time values (first 5): {time[:5] if time.size > 0 else 'N/A'}")  # Debug time values
                    if charge.size > 0 and time.size > 1:
                        # Convert MATLAB datenum (days) to seconds for time_diff
                        time_clean = np.unique(time[~np.isnan(time)])  # Remove NaN and duplicates
                        if len(time_clean) > 1:
                            # Time difference in days, convert to seconds (86400 seconds/day)
                            time_diff_days = np.diff(time_clean)[0]
                            time_diff_seconds = time_diff_days * 86400  # Convert days to seconds
                            if time_diff_seconds > 0:  # Ensure positive time difference
                                current_mah = charge / time_diff_seconds  # Current in mA (mAh/s)
                                # Use mean for multi-element array, adjust for units
                                current_scalar = np.mean(current_mah) if current_mah.size > 0 and not np.isnan(current_mah).any() else np.nan
                                current_scalar = current_scalar / 1000  # Convert mA to A
                            else:
                                current_scalar = np.nan
                                print("Time difference is zero or negative after conversion, setting current to NaN")
                        else:
                            current_scalar = np.nan
                            print("Insufficient unique time points after cleaning, setting current to NaN")
                    else:
                        current_scalar = np.nan
                        print("Insufficient charge or time data, setting current to NaN")
                    currents.append(current_scalar)  # Keep positive for charge, convert mA to A
                else:
                    currents.append(np.nan)
                    print("No valid charge or time data for current calculation in charge, setting to NaN")
                
                if 'T' in charge.dtype.names and charge['T'].size > 0:
                    temp = charge['T'][0]
                    temp_scalar = temp.item() if hasattr(temp, 'item') else np.mean(temp) if temp.size > 0 else np.nan
                    temps.append(temp_scalar)
                else:
                    temps.append(np.nan)
                    print("No valid temperature data in charge, setting to NaN")
            else:
                capacities.append(np.nan)
                soh.append(np.nan)
                voltages.append(np.nan)
                currents.append(np.nan)
                temps.append(np.nan)
                print(f"Warning: No charge or discharge data for {cycle_key} in {cell_key}")

        # Cycle number (e.g., 100, 200, etc.) from cycle_key (e.g., 'cyc0100' → 100)
        cycle_num = (int(cycle_key.replace('cyc', '')) / 100) + 1  # Convert to 1-based cycle number (e.g., 100 → 2, 200 → 3)
        cycles.append(cycle_num)

        # Debug: Check lengths after each cycle
        print(f"Current lengths - Cycles: {len(cycles)}, Capacities: {len(capacities)}, "
              f"Voltages: {len(voltages)}, Currents: {len(currents)}, Temps: {len(temps)}, SoH: {len(soh)}")

    # Verify all lists have the same length before creating DataFrame
    lengths = [len(cycles), len(capacities), len(voltages), len(currents), len(temps), len(soh)]
    if len(set(lengths)) > 1:
        print(f"Warning: Mismatched lengths - {dict(zip(['cycles', 'capacities', 'voltages', 'currents', 'temps', 'soh'], lengths))}")
        # Find and pad or truncate to match the longest list with NaN
        max_length = max(lengths)
        for lst, name in zip([cycles, capacities, voltages, currents, temps, soh], 
                            ['cycles', 'capacities', 'voltages', 'currents', 'temps', 'soh']):
            if len(lst) < max_length:
                print(f"Padding {name} with NaN to match length {max_length}")
                lst.extend([np.nan] * (max_length - len(lst)))

    # Calculate RUL (using 80% SoH threshold, 0.592 Ah for 0.740 Ah initial)
    failure_threshold = 0.592  # 80% of 0.740 Ah (adjust to 0.518 Ah for 70% if needed)
    failure_cycle = -1
    valid_capacities = [c for c in capacities if not np.isnan(c)]  # Ignore NaN
    valid_cycles = [c for c, cap in zip(cycles, capacities) if not np.isnan(cap)]

    if valid_capacities:
        # Extract scalar values for comparison, handle negative values
        valid_capacities_scalar = [abs(c.item()) if hasattr(c, 'item') and not np.isnan(c) else abs(c) for c in valid_capacities]
        for i, cap in enumerate(valid_capacities_scalar):
            if cap <= failure_threshold:
                failure_cycle = valid_cycles[i]
                break

        if failure_cycle == -1:
            print(f"{cell_key} didn't reach {failure_threshold} Ah threshold in {len(valid_cycles)} cycles. Using last cycle as proxy.")
            failure_cycle = valid_cycles[-1]
    else:
        print(f"No valid capacity data for {cell_key}. Skipping RUL calculation.")
        failure_cycle = cycles[-1] if cycles else -1

    ruls = [max(0.0, failure_cycle - c) if not np.isnan(cap) else np.nan for c, cap in zip(cycles, capacities)]  # Cap RUL at 0.0

    # Create DataFrame with cycle data
    df_processed = pd.DataFrame({
        'Cycle': cycles,  # Characterization cycle number (e.g., 2, 3, 4 for cyc0100, cyc0200, etc.)
        'Voltage (V)': [v.item() if hasattr(v, 'item') and not np.isnan(v) else v for v in voltages],
        'Current (A)': [c.item() if hasattr(c, 'item') and not np.isnan(c) else c for c in currents],
        'Temperature (°C)': [t.item() if hasattr(t, 'item') and not np.isnan(t) else t for t in temps],
        'Capacity (Ah)': capacities,
        'SoH (%)': soh,
        'RUL (Cycles)': ruls
    })

    # Handle NaN values (e.g., interpolate or drop if needed)
    df_processed = df_processed.interpolate(method='linear').bfill().ffill()

    # Print stats with scalar values
    print(f"\n{cell_key} Stats (Threshold {failure_threshold} Ah, 80% SoH):")
    print(f"Loaded {len(cycles)} characterization cycles.")
    if valid_capacities:
        print(f"Initial capacity: {valid_capacities_scalar[0]:.3f} Ah")
        print(f"Final capacity: {valid_capacities_scalar[-1]:.3f} Ah")
        print(f"Initial SoH: {soh[0]:.1f}%")
        print(f"Final SoH: {soh[-1]:.1f}%")
    else:
        print("No valid capacity data available.")
    if failure_cycle != -1:
        failure_cap = next(cap for cap, cyc in zip(valid_capacities_scalar, valid_cycles) if cyc == failure_cycle)
        failure_soh = next(s for s, c in zip(soh, cycles) if c == failure_cycle and not np.isnan(s))
        print(f"Failure cycle: {failure_cycle} (capacity ~{failure_cap:.3f} Ah, SoH ~{failure_soh:.1f}%)")
    print(f"RUL at cycle 1: {ruls[0] if not np.isnan(ruls[0]) else 'N/A'} cycles")

    # Save DataFrame to CSV
    output_file = f'../data/Oxford/preprocessed/cell{cell_id}_processed.csv'
    df_processed.to_csv(output_file, index=False)
    print(f"Data saved to {output_file}\n")

print("All Oxford cells processed.")


Processing Cell1...
Found cycle keys: ['cyc0000', 'cyc0100', 'cyc0200', 'cyc0300', 'cyc0400', 'cyc0500', 'cyc0600', 'cyc0700', 'cyc0800', 'cyc0900', 'cyc1000', 'cyc1100', 'cyc1200', 'cyc1300', 'cyc1400', 'cyc1600', 'cyc1800', 'cyc1900', 'cyc2000', 'cyc2100', 'cyc2200', 'cyc2300', 'cyc2400', 'cyc2500', 'cyc2600', 'cyc2700', 'cyc2800', 'cyc2900', 'cyc3000', 'cyc3100', 'cyc3200', 'cyc3300', 'cyc3500', 'cyc3600', 'cyc3700', 'cyc3800', 'cyc3900', 'cyc4000', 'cyc4100', 'cyc4200', 'cyc4300', 'cyc4400', 'cyc4500', 'cyc4600', 'cyc4800', 'cyc5000', 'cyc5100', 'cyc5200', 'cyc5300', 'cyc5400', 'cyc5500', 'cyc5600', 'cyc5700', 'cyc5800', 'cyc5900', 'cyc6000', 'cyc6100', 'cyc6200', 'cyc6300', 'cyc6400', 'cyc6500', 'cyc6600', 'cyc6700', 'cyc6800', 'cyc6900', 'cyc7000', 'cyc7100', 'cyc7200', 'cyc7300', 'cyc7400', 'cyc7500', 'cyc7600', 'cyc7700', 'cyc7800', 'cyc7900', 'cyc8000', 'cyc8100', 'cyc8200']
Initial capacity for Cell1: 0.739 Ah

Processing cycle cyc0000...
Cycle data keys: ('C1ch', 'C1dc', 'O