In [None]:
import pandas as pd
import glob

# Collect the per-battery CSV exports whose names start with "B00"
# (e.g. B0005_discharge.csv). Sorting keeps the row order of the combined
# file deterministic across runs.
csv_files = sorted(glob.glob('/content/B00*.csv'))

# Fail early with an actionable message: pd.concat() on an empty list raises a
# generic "No objects to concatenate" ValueError that hides the real cause.
if not csv_files:
    raise ValueError("No CSV files found matching '/content/B00*.csv'")

# Load every file and stack the rows into a single DataFrame.
# NOTE(review): the source file name is not kept, so rows coming from
# different files cannot be told apart afterwards - confirm this is intended.
dfs = [pd.read_csv(file) for file in csv_files]
combined_df = pd.concat(dfs, ignore_index=True)

# Save the processed dataset
output_path = "/content/rul_dataset.csv"
combined_df.to_csv(output_path, index=False)

# Display final dataset info
print("Combined dataset saved at:", output_path)
print("Dataset shape:", combined_df.shape)
print("Columns:", combined_df.columns)


✅ Combined dataset saved at: /content/rul_dataset.csv
📊 Dataset shape: (185721, 10)
📝 Columns: Index(['cycle', 'ambient_temperature', 'capacity', 'voltage_measured',
       'current_measured', 'temperature_measured', 'current_load',
       'voltage_load', 'time', 'RUL'],
      dtype='object')


In [None]:
import pandas as pd
import glob

# Collect the charge-log CSV exports whose names start with "551"
# (e.g. 551_Charge1.csv). Sorting keeps the row order of the combined file
# deterministic across runs.
csv_files = sorted(glob.glob('/content/551*.csv'))

# Fail early with an actionable message: pd.concat() on an empty list raises a
# generic "No objects to concatenate" ValueError that hides the real cause.
if not csv_files:
    raise ValueError("No CSV files found matching '/content/551*.csv'")

# Load every file and stack the rows into a single DataFrame.
# NOTE(review): the source file name is not kept, so the boundary between
# one file and the next is lost after concatenation - confirm this is intended.
dfs = [pd.read_csv(file) for file in csv_files]
combined_df = pd.concat(dfs, ignore_index=True)

# Save the processed dataset
output_path = "/content/soc_dataset.csv"
combined_df.to_csv(output_path, index=False)

# Display final dataset info
print("Combined dataset saved at:", output_path)
print("Dataset shape:", combined_df.shape)
print("Columns:", combined_df.columns)


✅ Combined dataset saved at: /content/soc_dataset.csv
📊 Dataset shape: (1524, 6)
📝 Columns: Index(['Voltage', 'Current', 'Temperature', 'Capacity', 'WhAccu', 'Cnt'], dtype='object')


In [None]:
import pandas as pd

# Load SOC dataset
soc_df = pd.read_csv('/content/soc_dataset.csv')

# Validate every column this cell reads *before* using it, so a missing
# column is reported with a clear message instead of a bare KeyError.
for required_column in ('Capacity', 'Current'):
    if required_column not in soc_df.columns:
        raise ValueError(f"Column '{required_column}' is missing in dataset!")

# Rated capacity is approximated by the largest recorded 'Capacity' value
# (modify as per actual battery capacity).
C_rated = soc_df['Capacity'].max()

# A zero, negative or NaN rated capacity would turn the division below into
# inf/NaN and the clip would then hide the problem; stop instead.
if not C_rated > 0:
    raise ValueError(f"Rated capacity must be positive, got {C_rated!r}")

# Coulomb counting: running sum of the current, divided by 3600.
# NOTE(review): this treats every row as a 1-second sample of current in
# amperes (so the sum is in A*s and /3600 gives Ah) - confirm the sampling
# interval and units of the 'Current' column.
# NOTE(review): the running sum is taken over the whole file; if the file is a
# concatenation of several independent sessions the sum carries over from one
# session into the next - confirm whether it should restart per session.
soc_df['Cumulative_Discharge'] = (soc_df['Current'].cumsum()) / 3600
soc_df['SOC'] = 100 * (1 - soc_df['Cumulative_Discharge'] / C_rated)

# Clip SOC between 0-100% for realistic values
soc_df['SOC'] = soc_df['SOC'].clip(0, 100)

# Save updated dataset (overwrites the input file with the two extra columns)
output_path = "/content/soc_dataset.csv"
soc_df.to_csv(output_path, index=False)

# Display dataset info
print("SOC column added and dataset saved at:", output_path)
print("Dataset shape:", soc_df.shape)
print("Columns:", soc_df.columns)


✅ SOC column added and dataset saved at: /content/soc_dataset.csv
📊 Dataset shape: (1524, 8)
📝 Columns: Index(['Voltage', 'Current', 'Temperature', 'Capacity', 'WhAccu', 'Cnt',
       'Cumulative_Discharge', 'SOC'],
      dtype='object')


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import xgboost as xgb
import joblib

# Load dataset with computed SOC
df = pd.read_csv('/content/soc_dataset.csv')

# Drop any rows with missing values (optional, based on data quality)
df = df.dropna()

# Define target and features
target = 'SOC'

# 'Cumulative_Discharge' must not be a feature: SOC was computed directly from
# it as 100 * (1 - Cumulative_Discharge / C_rated), so keeping it lets the
# model read the answer off a single column (target leakage) and makes the
# reported metrics meaningless.
# NOTE(review): 'Capacity' and 'WhAccu' look like accumulated quantities too and
# may leak the target in the same way - confirm against the data dictionary.
leaky_columns = ['Cumulative_Discharge']
features = [col for col in df.columns if col != target and col not in leaky_columns]

# Split data into features (X) and target (y)
X = df[features]
y = df[target]

# Split into train and test sets (80% train, 20% test)
# NOTE(review): this is a shuffled split; if the rows are consecutive samples
# of a time series, neighbouring rows end up on both sides of the split -
# consider a time-ordered or per-file split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train XGBoost Regressor
model = xgb.XGBRegressor(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=6,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42
)

model.fit(X_train, y_train)

# Predict on test data
y_pred = model.predict(X_test)

# Evaluation
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)

print("Model Performance:")
print(f"MAE: {mae:.2f}")
print(f"RMSE: {rmse:.2f}")
print(f"R² Score: {r2:.2f}")

# Save the model
model_path = "/content/xgboost_soc_model.pkl"
joblib.dump(model, model_path)
print("Model saved to:", model_path)

# Optional: Show feature importance
import matplotlib.pyplot as plt

xgb.plot_importance(model)
plt.title("Feature Importance")
plt.show()


In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd  # Import pandas
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Read the SOH dataset from disk
data_path = '/content/soh_dataset.csv'
df = pd.read_csv(data_path)

# Columns fed to the model, and the column it is trained to reproduce
input_vars = [
    'terminal_voltage',
    'terminal_current',
    'temperature',
    'charge_current',
    'charge_voltage',
    'time',
    'capacity',
    'cycle',
]
output_var = 'SOH'

# Hold out 20% of the rows for testing (fixed seed so the split is repeatable),
# then separate each partition into its feature columns and its target column
train, test = train_test_split(df, test_size=0.2, random_state=42)
train_input, train_output = train[input_vars], train[output_var]
test_input, test_output = test[input_vars], test[output_var]

# Define the Kalman filter model
class KalmanFilter(nn.Module):
    """Kalman-filter update step whose matrices are learnable parameters.

    The module keeps a running state (``x_prior``, ``P_prior``) and performs
    one predict/update cycle per call. All six tensors are registered as
    ``nn.Parameter`` and therefore appear in ``parameters()`` and
    ``state_dict()`` under the names ``F``, ``H``, ``Q``, ``R``, ``x_prior``
    and ``P_prior``.

    Args:
        dim: Size of the state vector; measurements passed to ``forward`` must
            have the same length because ``H`` is ``dim x dim``.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        self.F = nn.Parameter(torch.eye(dim))        # state transition
        self.H = nn.Parameter(torch.eye(dim))        # measurement matrix
        self.Q = nn.Parameter(torch.eye(dim))        # added to the predicted covariance
        self.R = nn.Parameter(torch.eye(dim))        # added to the innovation covariance
        self.x_prior = nn.Parameter(torch.zeros(dim))  # running state estimate
        self.P_prior = nn.Parameter(torch.eye(dim))    # running state covariance

    def forward(self, z):
        """Run one predict/update cycle on measurement ``z``.

        Args:
            z: 1-D tensor of length ``dim``.

        Returns:
            0-dim tensor holding the first component of the posterior state.

        Side effects:
            Overwrites ``x_prior`` and ``P_prior`` (through ``.data``) with the
            posterior state and covariance, so every call - during training
            and evaluation alike - advances the filter.
        """
        # Build the identity next to the parameters so the module keeps
        # working after .to(device) / .double(); torch.eye() alone always
        # lands on the default device and dtype.
        eye = torch.eye(self.dim, dtype=self.F.dtype, device=self.F.device)
        H_t = self.H.transpose(0, 1)

        # Prediction step. The stored state is read through .data, so no
        # gradient flows into x_prior from this expression.
        x_prior = torch.matmul(self.F, self.x_prior.data)
        P_prior = torch.matmul(torch.matmul(self.F, self.P_prior), self.F.transpose(0, 1)) + self.Q

        # Update step. The 1e-8 jitter keeps the innovation covariance
        # invertible when it is close to singular.
        HPHtR = torch.matmul(self.H, torch.matmul(P_prior, H_t)) + self.R + 1e-8 * eye
        K = torch.matmul(torch.matmul(P_prior, H_t), torch.inverse(HPHtR))
        x_posterior = x_prior + torch.matmul(K, z - torch.matmul(self.H, x_prior))
        P_posterior = torch.matmul(eye - torch.matmul(K, self.H), P_prior)

        # Carry the posterior over as the prior of the next call.
        self.x_prior.data = x_posterior
        self.P_prior.data = P_posterior

        return x_posterior[0]  # Return only the first element


# Initialize the model and optimizer
model = KalmanFilter(dim=len(input_vars))
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Convert the frames to tensors once. Calling DataFrame.values inside the
# sample loop re-materialises the whole array on every iteration, which makes
# each epoch quadratic in the number of rows.
train_x = torch.tensor(train_input.to_numpy(), dtype=torch.float32)
train_y = torch.tensor(train_output.to_numpy(), dtype=torch.float32)
test_x = torch.tensor(test_input.to_numpy(), dtype=torch.float32)
test_y = torch.tensor(test_output.to_numpy(), dtype=torch.float32)

# Training loop: one optimizer step per sample
num_epochs = 3
for epoch in range(num_epochs):
    train_loss = 0.0
    for features_row, target_value in zip(train_x, train_y):
        optimizer.zero_grad()
        output = model(features_row)
        loss = criterion(output, target_value)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Each per-sample loss is a squared error, so the root of their mean is the RMSE
    rmse_train = np.sqrt(train_loss / len(train_input))
    print('Epoch: {} \tTraining RMSE: {:.6f}'.format(epoch+1, rmse_train))

# Testing loop. No gradients are needed here, so skip building autograd graphs.
# NOTE: the model's forward pass still updates its internal filter state on
# every call, so the state saved below includes the effect of the test samples.
model.eval()
test_preds = []
with torch.no_grad():
    for features_row in test_x:
        test_preds.append(model(features_row).item())
test_actuals = test_y.tolist()

# Compute RMSE
rmse_test = np.sqrt(mean_squared_error(test_actuals, test_preds))
print('Test RMSE: {:.6f}'.format(rmse_test))

# Save the model
torch.save(model.state_dict(), 'model1.pth')
print("Model saved as model1.pth")



Epoch: 1 	Training RMSE: 0.150071
Epoch: 2 	Training RMSE: 0.009422
Epoch: 3 	Training RMSE: 0.005999
Test RMSE: 0.000563
Model saved as model1.pth


In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from sklearn.metrics import mean_squared_error, r2_score

# Load the dataset
data_path = '/content/rul_dataset.csv'  # Path to your dataset
df = pd.read_csv(data_path)

# Display dataset overview
print("Dataset overview:")
print(df.head())

# Exact Gaussian-process regression scales cubically with the number of
# training rows, so the model is fitted on a subsample. The rows are drawn at
# random (fixed seed) rather than taken from the top of the file: the first
# rows all belong to the earliest cycles, so df.head(n) yields an almost
# constant target and near-constant features, and the fitted model cannot
# generalise to the rest of the data.
MAX_ROWS = 1000
df = df.sample(n=min(MAX_ROWS, len(df)), random_state=42)

# 1. Extract features and target
# Select relevant features for predicting RUL
X = df[['ambient_temperature', 'capacity', 'voltage_measured',
        'current_measured', 'temperature_measured',
        'current_load', 'voltage_load', 'time']].values
y = df['RUL'].values

# 2. Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 3. Normalize the features (fit on the training split only)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# 4. Define the GPR kernel and model
kernel = C(1.0, (1e-4, 1e4)) * RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e3))

# Instantiate the GPR model.
# - normalize_y=True: the GP prior has zero mean, while RUL is far from zero;
#   normalising the target removes that mismatch.
# - random_state makes the optimizer restarts reproducible.
# (A previously defined `optimizer_options` dict was never passed to the
# regressor and has been removed; the string optimizer takes no such options.)
gpr = GaussianProcessRegressor(
    kernel=kernel,
    n_restarts_optimizer=10,
    alpha=1e-2,
    optimizer="fmin_l_bfgs_b",
    normalize_y=True,
    random_state=42,
)

# 5. Train the GPR model
print("Training the GPR model...")
gpr.fit(X_train, y_train)

# Display the fitted kernel parameters
print("Kernel parameters after training:")
print(gpr.kernel_)

# 6. Predict on the test set
print("Predicting RUL on the test set...")
y_pred, sigma = gpr.predict(X_test, return_std=True)

# 7. Evaluate the model
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
print(f"Model Evaluation:\nRMSE: {rmse:.2f}\nR² Score: {r2:.2f}")

# 8. Predict RUL on new data (column order must match the feature list above)
new_data = np.array([[24, 1.8, 4.2, -0.005, 24.3, -0.001, 0.0, 100]])  # Example new data
new_data_scaled = scaler.transform(new_data)
rul_pred, uncertainty = gpr.predict(new_data_scaled, return_std=True)
print(f"Predicted RUL: {rul_pred[0]:.2f}, Uncertainty: {uncertainty[0]:.2f}")

import joblib

# Save the trained model
joblib.dump(gpr, 'model2_gpr.pkl')

# Save the scaler
joblib.dump(scaler, 'model2_scaler.pkl')

print("Model and scaler saved as 'model2_gpr.pkl' and 'model2_scaler.pkl'")


Dataset overview:
   cycle  ambient_temperature  capacity  voltage_measured  current_measured  \
0      1                   24  1.856487          4.191492         -0.004902   
1      1                   24  1.856487          4.190749         -0.001478   
2      1                   24  1.856487          3.974871         -2.012528   
3      1                   24  1.856487          3.951717         -2.013979   
4      1                   24  1.856487          3.934352         -2.011144   

   temperature_measured  current_load  voltage_load    time  RUL  
0             24.330034       -0.0006         0.000   0.000  167  
1             24.325993       -0.0006         4.206  16.781  167  
2             24.389085       -1.9982         3.062  35.703  167  
3             24.544752       -1.9982         3.030  53.781  167  
4             24.731385       -1.9982         3.011  71.922  167  
Training the GPR model...


ABNORMAL_TERMINATION_IN_LNSRCH.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  _check_optimize_result("lbfgs", opt_res)


Kernel parameters after training:
84.3**2 * RBF(length_scale=1.75)
Predicting RUL on the test set...
Model Evaluation:
RMSE: 1.27
R² Score: 0.24
Predicted RUL: 31.16, Uncertainty: 81.45
Model and scaler saved as 'model2_gpr.pkl' and 'model2_scaler.pkl'


In [None]:
# List the contents of the current working directory (IPython shell escape).
!ls


551_Charge1.csv  551_Charge8.csv      rul_dataset.csv
551_Charge2.csv  B0005_discharge.csv  sample_data
551_Charge3.csv  B0006_discharge.csv  soc_dataset.csv
551_Charge4.csv  B0007_discharge.csv  soc_dataset_with_SOC.csv
551_Charge5.csv  B0018_discharge.csv  soh_dataset.csv
551_Charge6.csv  model2_gpr.pkl
551_Charge7.csv  model2_scaler.pkl


In [None]:
!pip uninstall -y torch
!pip install torch --upgrade --quiet

Found existing installation: torch 2.6.0+cu124
Uninstalling torch-2.6.0+cu124:
  Successfully uninstalled torch-2.6.0+cu124
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m766.7/766.7 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m57.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m47.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m39.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m854.7 kB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import joblib
from sklearn.preprocessing import StandardScaler

# ========== Step 1: Load the datasets ========== #
df1 = pd.read_csv('/content/soh_dataset.csv')
df2 = pd.read_csv('/content/soc_dataset.csv')
df3 = pd.read_csv('/content/rul_dataset.csv')

# ========== Step 2: Rename for consistency ========== #
# Only columns whose name actually changes are listed; mapping a column to its
# own name is a no-op and only obscures which names differ between datasets.
df1 = df1.rename(columns={
    'terminal_voltage': 'voltage',
    'terminal_current': 'current',
})
df2 = df2.rename(columns={
    'Voltage': 'voltage',
    'Current': 'current',
    'Temperature': 'temperature',
    'WhAccu': 'whaccu',
    'Cnt': 'cnt',
    'Cumulative_Discharge': 'cumulative_discharge',
    'Capacity': 'capacity',
    'Time': 'time',  # rename() silently skips this entry when the column is absent
})
df3 = df3.rename(columns={
    'ambient_temperature': 'temperature',
    'voltage_measured': 'voltage',
    'current_measured': 'current',
    'temperature_measured': 'temp_measured',
    'current_load': 'charge_current',
    'voltage_load': 'charge_voltage',
})

# Drop target columns. Reassigning (instead of inplace=True) keeps the cell
# free of in-place mutation of frames another cell may hold.
df1 = df1.drop(columns=['SOH'], errors='ignore')
df2 = df2.drop(columns=['SOC'], errors='ignore')
df3 = df3.drop(columns=['RUL'], errors='ignore')

# ========== Step 3: Natural Join ========== #
# Sorting makes the key list deterministic; iteration order of a set of
# strings can differ between interpreter runs.
common_cols = sorted(set(df1.columns) & set(df2.columns) & set(df3.columns))

# NOTE(review): the shared columns are measurement columns, and an inner join
# only keeps rows whose values are exactly equal in all three datasets - that
# is likely to leave few or no rows. Confirm the join produces what is wanted.
# NOTE(review): non-key columns that exist in more than one frame receive
# pandas' default `_x` / `_y` suffixes in `merged`.
merged = df1.merge(df2, on=common_cols, how='inner')
merged = merged.merge(df3, on=common_cols, how='inner')

# ========== Step 4: Cross Join remaining columns ========== #
def cross_join(df_left, df_right):
    """Return the Cartesian product of the rows of two DataFrames.

    Every row of ``df_left`` is paired with every row of ``df_right``, so the
    result has ``len(df_left) * len(df_right)`` rows. Columns that carry the
    same name in both inputs receive pandas' default ``_x`` / ``_y`` suffixes.

    The join is implemented through a temporary constant key column named
    ``_tmp``. The key is added to copies (``assign``), so neither input frame
    is modified; an existing ``_tmp`` column in an input is not preserved in
    the result.

    Args:
        df_left: Left DataFrame.
        df_right: Right DataFrame.

    Returns:
        A new DataFrame holding all row combinations, without ``_tmp``.
    """
    left_keyed = df_left.assign(_tmp=1)
    right_keyed = df_right.assign(_tmp=1)
    return pd.merge(left_keyed, right_keyed, on='_tmp').drop('_tmp', axis=1)

# Columns of each dataset that are not part of the natural-join key
left1 = df1.drop(columns=common_cols, errors='ignore')
left2 = df2.drop(columns=common_cols, errors='ignore')
left3 = df3.drop(columns=common_cols, errors='ignore')

# A cross join produces len(left1) * len(left2) * len(left3) rows. With
# datasets of thousands of rows that product cannot be held in memory and the
# kernel is killed without an explanation, so check the size first and fail
# with a clear message instead.
MAX_CROSS_JOIN_ROWS = 5_000_000
expected_rows = len(left1) * len(left2) * len(left3)
if expected_rows > MAX_CROSS_JOIN_ROWS:
    raise ValueError(
        f"Cross join would create {expected_rows:,} rows "
        f"({len(left1):,} x {len(left2):,} x {len(left3):,}), which exceeds the limit of "
        f"{MAX_CROSS_JOIN_ROWS:,}; subsample or aggregate the inputs first."
    )

# NOTE(review): columns present in more than one of left1/left2/left3 come out
# of the join with `_x` / `_y` suffixes rather than their plain names.
cross = cross_join(left1, left2)
cross = cross_join(cross, left3)

# Final combined data: the two frames are placed side by side and paired by
# row position (both indexes are reset first); rows beyond the shorter frame
# are filled with NaN.
# NOTE(review): `merged` and `cross` can contain identically named columns, in
# which case `combined` has duplicate column labels - confirm this is intended.
combined = pd.concat([merged.reset_index(drop=True), cross.reset_index(drop=True)], axis=1)

# ========== Step 5: Predict SOH using Kalman Filter (Model 1) ========== #
class KalmanFilter(nn.Module):
    """Kalman-filter update step whose matrices are learnable parameters.

    The parameter names and shapes defined here (``F``, ``H``, ``Q``, ``R``,
    ``x_prior``, ``P_prior``) determine the ``state_dict`` layout, so they
    must stay as they are for ``load_state_dict`` to accept saved weights.

    Args:
        dim: Size of the state vector; measurements passed to ``forward`` must
            have the same length because ``H`` is ``dim x dim``.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        self.F = nn.Parameter(torch.eye(dim))        # state transition
        self.H = nn.Parameter(torch.eye(dim))        # measurement matrix
        self.Q = nn.Parameter(torch.eye(dim))        # added to the predicted covariance
        self.R = nn.Parameter(torch.eye(dim))        # added to the innovation covariance
        self.x_prior = nn.Parameter(torch.zeros(dim))  # running state estimate
        self.P_prior = nn.Parameter(torch.eye(dim))    # running state covariance

    def forward(self, z):
        """Run one predict/update cycle on measurement ``z``.

        Args:
            z: 1-D tensor of length ``dim``.

        Returns:
            0-dim tensor holding the first component of the posterior state.

        Side effects:
            Overwrites ``x_prior`` and ``P_prior`` (through ``.data``) with the
            posterior state and covariance, so every call advances the filter
            and the result depends on the order of the inputs.
        """
        # Build the identity next to the parameters so the module keeps
        # working after .to(device) / .double(); torch.eye() alone always
        # lands on the default device and dtype.
        eye = torch.eye(self.dim, dtype=self.F.dtype, device=self.F.device)
        H_t = self.H.transpose(0, 1)

        # Prediction step (stored state is read through .data, outside autograd)
        x_prior = torch.matmul(self.F, self.x_prior.data)
        P_prior = torch.matmul(torch.matmul(self.F, self.P_prior), self.F.transpose(0, 1)) + self.Q

        # Update step; the 1e-8 jitter keeps the innovation covariance invertible
        HPHtR = torch.matmul(self.H, torch.matmul(P_prior, H_t)) + self.R + 1e-8 * eye
        K = torch.matmul(torch.matmul(P_prior, H_t), torch.inverse(HPHtR))
        x_posterior = x_prior + torch.matmul(K, z - torch.matmul(self.H, x_prior))
        P_posterior = torch.matmul(eye - torch.matmul(K, self.H), P_prior)

        # Carry the posterior over as the prior of the next call
        self.x_prior.data = x_posterior
        self.P_prior.data = P_posterior
        return x_posterior[0]

# Load model
soh_input_cols = ['voltage', 'current', 'temperature', 'charge_current', 'charge_voltage', 'time', 'capacity', 'cycle']
kf_model = KalmanFilter(dim=len(soh_input_cols))
kf_model.load_state_dict(torch.load('model1.pth'))
kf_model.eval()

# Predict only for rows in which every SOH feature is present. The surviving
# rows themselves are kept (not just their count): slicing `combined` to the
# first len(soh_preds) rows would attach predictions to the wrong rows
# whenever a dropped row is not at the very end of the frame.
soh_rows = combined.dropna(subset=soh_input_cols)

# Convert once instead of going through iterrows() row by row
soh_features = torch.tensor(soh_rows[soh_input_cols].to_numpy(), dtype=torch.float32)
soh_preds = []
with torch.no_grad():
    for input_tensor in soh_features:
        soh_preds.append(kf_model(input_tensor).item())

# Add predictions
combined = soh_rows.copy()
combined['SOH_pred'] = soh_preds

# ========== Step 6: Predict RUL using GPR (Model 2) ========== #
gpr = joblib.load('model2_gpr.pkl')
scaler = joblib.load('model2_scaler.pkl')

# The scaler and the GPR were fitted on eight features in this order:
# ambient_temperature, capacity, voltage_measured, current_measured,
# temperature_measured, current_load, voltage_load, time. The list below is
# the same set under the renamed column names; omitting 'temp_measured'
# (seven columns) makes scaler.transform() fail on the feature count.
rul_input_cols = ['temperature', 'capacity', 'voltage', 'current', 'temp_measured',
                  'charge_current', 'charge_voltage', 'time']

# Pass a plain array: the scaler was fitted on an array without column names.
X_rul = combined[rul_input_cols].fillna(0).to_numpy()
X_scaled = scaler.transform(X_rul)
rul_preds = gpr.predict(X_scaled)
combined['RUL_pred'] = rul_preds

# ========== Step 7: Output ========== #
print(combined[['SOH_pred', 'RUL_pred']].head())
