<a href="https://colab.research.google.com/github/CVedaReddy/project/blob/main/Digital%20twin%20project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
df=df[df.Battery=='B0005']
df

Unnamed: 0,Voltage_measured,Current_measured,Temperature_measured,Current_charge,Voltage_charge,Time,Capacity,id_cycle,type,ambient_temperature,time,Battery
0,3.974871,-2.012528,24.389085,1.9982,3.062,35.703,1.856487,1,discharge,24,2008.0,B0005
1,3.951717,-2.013979,24.544752,1.9982,3.030,53.781,1.856487,1,discharge,24,2008.0,B0005
2,3.934352,-2.011144,24.731385,1.9982,3.011,71.922,1.856487,1,discharge,24,2008.0,B0005
3,3.920058,-2.013007,24.909816,1.9982,2.991,90.094,1.856487,1,discharge,24,2008.0,B0005
4,3.907904,-2.014400,25.105884,1.9982,2.977,108.281,1.856487,1,discharge,24,2008.0,B0005
...,...,...,...,...,...,...,...,...,...,...,...,...
45117,2.855064,-2.012702,40.404733,1.9982,1.910,2345.875,1.325079,168,discharge,24,2008.0,B0005
45118,2.818475,-2.013183,40.406072,1.9982,1.878,2355.406,1.325079,168,discharge,24,2008.0,B0005
45119,2.774912,-2.011141,40.486980,1.9982,1.840,2364.875,1.325079,168,discharge,24,2008.0,B0005
45120,2.721142,-2.014372,40.695235,1.9982,1.794,2374.468,1.325079,168,discharge,24,2008.0,B0005


In [7]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, GRU, Input
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.optimizers import Adam, SGD
from sklearn.model_selection import train_test_split
from tensorflow.keras import Model

# --- I. Data Preprocessing and Feature Engineering Pipeline ---
# 1. Load Data
    try:
        # Load the discharge data (filepath is assumed to be 'discharge.csv')
        df = pd.read_csv("/content/discharge.csv")
    except FileNotFoundError:
        print(f"Error: File not found at {"/content/discharge.csv"}")
        return None, None, None, None, None


df=df[Battery='B0005']
# Define the feature columns (X) and the target column (Y) based on the paper's requirements
FEATURES =df[["Current_measured",'Temperature_measured','Voltage_charge','Time','id_cycle']]
TARGETS = df['Voltage_measured'] # Voltage is used as the primary signal for SOC prediction

# Model hyperparameters for the sequence:
WINDOW_LENGTH = 250 # Fixed window length for time-series sequence prediction
TEST_SPLIT_RATIO = 0.2

def load_and_prepare_data(filepath, feature_cols, target_col, window_length=WINDOW_LENGTH):
    """Loads CSV data, scales features, and creates time-series sequences (windowing)."""


    # Create the matrix X containing all relevant time-series data for the models
    X_df=FEATURES
    # Drop rows with NaN or infinite values that might break the scaler
    X_df.replace([np.inf, -np.inf], np.nan, inplace=True)
    X_df.dropna(inplace=True)

    # Re-extract raw data after cleaning
    X_raw = X_df.values
    Y_raw = TARGETS.values.reshape(-1, 1)

    # 3. Normalization (Min-Max Scaling to 0-1 range)
    scaler_X = MinMaxScaler(feature_range=(0, 1))
    X_scaled = scaler_X.fit_transform(X_raw)

    scaler_Y = MinMaxScaler(feature_range=(0, 1))
    Y_scaled = scaler_Y.fit_transform(Y_raw)

    # 4. Sequence Transformation (Windowing: Samples x Time Steps x Features)
    def create_sequences(data, target, seq_length):
        X, Y =,
        # Sequences are created up to the point where a full sequence + 1 target step can be extracted
        for i in range(len(data) - seq_length):
            X.append(data[i:(i + seq_length), :])
            # Target is the value immediately following the end of the input sequence
            Y.append(target[i + seq_length])
        return np.array(X), np.array(Y)

    X_sequences, Y_sequences = create_sequences(X_scaled, Y_scaled, window_length)

    # 5. Train/Test Split (Maintain chronological order for time series split)
    X_train, X_test, Y_train, Y_test = train_test_split(
        X_sequences, Y_sequences,
        test_size=TEST_SPLIT_RATIO,
        random_state=42, # Ensures reproducibility [2]
        shuffle=False # Essential for time series data
    )

    return X_train, X_test, Y_train, Y_test, scaler_Y

# --- II. Offline LSTM Modeling for SOC Prediction ---

def build_offline_lstm_model(input_shape):
    """Builds the 4-layer LSTM architecture with L1/L2 regularization."""
    model = Sequential()

    # LSTM Layers (4 hidden layers required)
    l1_l2_reg = l1_l2(l1=0.01, l2=0.01) # L1/L2 regularization to mitigate overfitting

    # Layer 1: Input Shape is (Window Length, Num Features)
    model.add(LSTM(256, activation='tanh', return_sequences=True,
                   input_shape=input_shape, kernel_regularizer=l1_l2_reg))

    # Layer 2
    model.add(LSTM(128, activation='tanh', return_sequences=True,
                   kernel_regularizer=l1_l2_reg))

    # Layer 3
    model.add(LSTM(64, activation='tanh', return_sequences=True,
                   kernel_regularizer=l1_l2_reg))

    # Layer 4: Final LSTM layer returns only the last hidden state for prediction
    model.add(LSTM(32, activation='tanh', return_sequences=False,
                   kernel_regularizer=l1_l2_reg))

    # Output Layers (2 Dense Layers)
    model.add(Dense(16, activation='relu'))
    model.add(Dense(1, activation='sigmoid')) # Output normalized prediction (0 to 1)

    return model

# --- III. Training Loop and Evaluation Strategy ---

def train_and_evaluate_lstm(model, X_train, Y_train, X_test, Y_test, optimizers, learning_rates, epochs=100):
    """Trains the LSTM model across various optimizers and evaluates performance."""
    results = {}

    for opt_name in optimizers:
        for lr in learning_rates:
            print(f"--- Training with {opt_name}, LR: {lr} ---")

            # 1. Initialize Optimizer (SWATS requires custom implementation logic not included here)
            if opt_name == 'Adam':
                optimizer = Adam(learning_rate=lr)
            elif opt_name == 'SGD':
                optimizer = SGD(learning_rate=lr)
            # Placeholder for SWATS (Start with Adam for demonstration)
            elif opt_name == 'SWATS':
                optimizer = Adam(learning_rate=lr)
            else:
                continue

            # 2. Compile Model (Loss is Mean Squared Error, MSE)
            model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])

            # 3. Fit Model (Training)
            history = model.fit(X_train, Y_train,
                                epochs=epochs,
                                batch_size=256,
                                validation_split=0.2,
                                verbose=0)

            # 4. Evaluate on Test Set
            loss, mae = model.evaluate(X_test, Y_test, verbose=0)

            # 5. Store Results (MAE and RMSE are the final metrics)
            training_results[(opt_name, lr)] = {
                'Final MAE (%)': mae * 100,
                'Final RMSE (%)': np.sqrt(loss) * 100
            }

    return training_results

# --- IV. TS-GAN Architecture (Conceptual Structure) ---

def define_tsgan_architecture(sequence_length, num_features, latent_dim=10):
    """
    Conceptual definition of the core TS-GAN models (Embedder, Recovery, Generator, Discriminator)
    using GRU layers (RNN-based components as required by the paper).
    """

    # 1. Embedder (E)
    E_input = Input(shape=(sequence_length, num_features))
    E_layer = GRU(latent_dim, activation='tanh', return_sequences=True)(E_input)
    E_output = Dense(latent_dim, activation='sigmoid')(E_

SyntaxError: invalid syntax (ipython-input-200521431.py, line 14)

In [None]:
df=df[Battery='B0005']