# In [None]:

# Week 2 – EV Battery Health using Generative AI (VAE)
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, backend as K
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
import matplotlib.pyplot as plt

# Load the raw EV battery charging dataset from disk.
file_path = "ev_battery_charging_data.csv"
df = pd.read_csv(file_path)
print("✅ Dataset Loaded Successfully!")
print(df.head())
print("Dataset Shape:", df.shape)

# Data cleaning: discard every row that contains at least one missing value.
df = df.dropna()

# Encode categorical columns as integer codes.
categorical_cols = ['Charging Mode', 'Battery Type', 'EV Model']
missing_cols = [col for col in categorical_cols if col not in df.columns]
if missing_cols:
    # Fail early with the offending names instead of a bare KeyError mid-loop.
    raise KeyError(
        f"Expected categorical columns not found in {file_path}: {missing_cols}"
    )

# Keep one fitted LabelEncoder per column. Re-fitting a single shared encoder
# would overwrite the earlier mappings, making it impossible to translate
# integer codes back to labels via label_encoders[col].inverse_transform(...).
label_encoders = {}
for col in categorical_cols:
    label_encoders[col] = LabelEncoder()
    df[col] = label_encoders[col].fit_transform(df[col])

# Drop any remaining non-numeric columns so the model only sees numbers.
df = df.select_dtypes(include=[np.number])

# Prepare data for the model: min-max scale every feature column. The fitted
# scaler is kept so generated samples can be mapped back to original units.
X = df.values
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)
print("✅ Cleaned Dataset Shape:", X_scaled.shape)

# Number of feature columns; this is the width of the VAE input and output.
input_dim = X_scaled.shape[1]
print("Input features:", input_dim)


# Build Variational Autoencoder (VAE) - Custom Model


latent_dim = 4  # dimensionality of the compressed latent space


class VAE(keras.Model):
    """Variational autoencoder that trains an encoder/decoder pair jointly.

    Args:
        encoder: Model that maps a batch of inputs to the triple
            ``(z_mean, z_log_var, z)``.
        decoder: Model that maps a batch of latent vectors ``z`` back to
            input space.
        **kwargs: Forwarded unchanged to ``keras.Model``.

    The training loss is the mean squared reconstruction error plus the mean
    (over batch and latent dimensions) of the KL divergence term.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.reconstruction_loss_fn = tf.keras.losses.MeanSquaredError()
        # Running means reported by fit(); exposed through `metrics` below.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Return the loss trackers maintained by ``train_step``."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs, training=None):
        """Encode ``inputs``, sample a latent vector and return its decoding.

        Defining the forward pass lets the model be built, called directly
        and used with ``predict``; ``train_step`` does not depend on it.
        """
        _, _, z = self.encoder(inputs, training=training)
        return self.decoder(z, training=training)

    def train_step(self, data):
        """Run one optimization step and return the running loss metrics.

        Args:
            data: Either ``(x, y)`` as produced by ``fit(x, y)`` (``y`` is
                ignored) or just ``x`` as produced by ``fit(x)``.

        Returns:
            Dict with the running means of ``loss``, ``reconstruction_loss``
            and ``kl_loss``.
        """
        # Only index into `data` when it really is an (x, y, ...) container;
        # indexing a bare tensor would silently select the first sample.
        x = data[0] if isinstance(data, (tuple, list)) else data

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(x)
            reconstruction = self.decoder(z)
            reconstruction_loss = self.reconstruction_loss_fn(x, reconstruction)
            # KL divergence per latent unit, reduced once to a scalar so the
            # optimized value and the tracked metric are the same quantity.
            kl_loss = tf.reduce_mean(
                -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            )
            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

# Encoder network: a stack of ReLU hidden layers (64 then 32 units) followed
# by two parallel Dense heads that output the latent mean and log-variance.
encoder_inputs = keras.Input(shape=(input_dim,))
x = encoder_inputs
for units in (64, 32):
    x = layers.Dense(units, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)

# Sampling function integrated into encoder output
def sampling(args):
    """Draw a latent sample using the reparameterization trick.

    Args:
        args: Pair ``(z_mean, z_log_var)`` of 2-D tensors with identical
            shape ``(batch, latent_width)``.

    Returns:
        ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is
        standard normal noise with the same shape as ``z_mean``.
    """
    z_mean, z_log_var = args
    # Take both noise dimensions from z_mean itself rather than from a
    # module-level constant, so the function works for any latent width.
    mean_shape = K.shape(z_mean)
    epsilon = K.random_normal(shape=(mean_shape[0], mean_shape[1]))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

# Stochastic latent sample, produced by wrapping `sampling` in a Lambda layer.
z = layers.Lambda(sampling, name="z")([z_mean, z_log_var])

# The encoder exposes the distribution parameters and the drawn sample.
encoder = keras.Model(
    inputs=encoder_inputs,
    outputs=[z_mean, z_log_var, z],
    name="encoder",
)


# Decoder network: mirrors the encoder's hidden stack (32 then 64 units) and
# ends in a sigmoid layer with one output per input feature.
decoder_inputs = keras.Input(shape=(latent_dim,))
d = decoder_inputs
for units in (32, 64):
    d = layers.Dense(units, activation="relu")(d)
decoder_outputs = layers.Dense(input_dim, activation="sigmoid")(d)
decoder = keras.Model(
    inputs=decoder_inputs,
    outputs=decoder_outputs,
    name="decoder",
)

# Instantiate the custom VAE model
vae = VAE(encoder, decoder)
vae.compile(optimizer="adam")
# Summarize the two functional sub-models. Calling summary() on the wrapping
# subclassed model here would raise, because it has not been built yet.
encoder.summary()
decoder.summary()

# Train the Model
# The targets argument is ignored by VAE.train_step; the inputs themselves
# are the reconstruction target.
history = vae.fit(X_scaled, X_scaled, epochs=50, batch_size=32, verbose=1)
# Whole-model HDF5 export only supports Functional/Sequential models, so
# persist the weights instead. To restore them, rebuild the same encoder and
# decoder architecture and call load_weights() with this file.
vae.save_weights("vae_ev_battery_model.h5")
print("✅ Model trained and saved successfully!")


# Visualize Training Progress

# Per-epoch total loss recorded by fit().
loss_curve = history.history['loss']

fig, ax = plt.subplots(figsize=(7, 4))
ax.plot(loss_curve, label='Training Loss')
ax.set_title("VAE Training Loss Curve")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
ax.legend()
plt.show()


# Generate Synthetic Data

# Draw 10 latent vectors from a standard normal distribution and decode them.
latent_samples = np.random.normal(size=(10, latent_dim))
generated_data = decoder.predict(latent_samples)
# Undo the min-max scaling so values are back in the original feature units.
generated_df = pd.DataFrame(scaler.inverse_transform(generated_data), columns=df.columns)

# Label-encoded columns only make sense as whole-number codes, but the decoder
# emits continuous values; snap them to the nearest integer code.
encoded_cols = [col for col in categorical_cols if col in generated_df.columns]
generated_df[encoded_cols] = generated_df[encoded_cols].round().astype(int)

print("\n🧠 Sample Synthetic EV Battery Data Generated:")
print(generated_df.head())

# Latent Space Visualization

# The encoder has three outputs (z_mean, z_log_var, z), so predict() returns a
# list of three arrays. Plot the latent means (first output); indexing the
# list itself with [:, 0] would raise a TypeError.
latent_representations = encoder.predict(X_scaled)[0]

# Scatter the first two latent dimensions, colored by degradation rate.
plt.figure(figsize=(6,5))
plt.scatter(latent_representations[:, 0], latent_representations[:, 1], alpha=0.6, c=df['Degradation Rate (%)'], cmap='viridis')
plt.colorbar(label='Degradation Rate (%)')
plt.title("Latent Space Visualization (colored by Degradation Rate)")
plt.xlabel("z[0]")
plt.ylabel("z[1]")
plt.show()