In [None]:
pip install imbalanced-learn

In [None]:
pip install keras-tuner

In [None]:
pip install tensorflow-hub

In [None]:
pip install --upgrade tensorflow

In [None]:
pip install tensorflow-hub


In [None]:
pip list | findstr tensorflow

In [None]:
pip install tensorflow keras-tuner numpy matplotlib

In [None]:
import os
import json
import numpy as np
import pickle
import torch
import tensorflow.keras.backend as K
import matplotlib.pyplot as plt
from collections import Counter
from transformers import BertTokenizer, BertModel
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import (Input, LSTM, Dense, Dropout, Add, LayerNormalization, 
                                     Bidirectional, MultiHeadAttention, GlobalAveragePooling1D)
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from imblearn.over_sampling import SMOTE
import keras_tuner as kt
from sklearn.utils.class_weight import compute_class_weight

# ==============================
# 🔹 1. Load Dataset
# ==============================
# The dataset is read as JSON Lines: one JSON object per non-blank line, each
# providing a "Context" (input text) and a "Response" (target label) key.
# The encoding is given explicitly so parsing does not depend on the platform's
# locale default; "utf-8-sig" also tolerates a leading byte-order mark.
with open("combined_dataset.json", "r", encoding="utf-8-sig") as f:
    parsed_data = [json.loads(line) for line in f if line.strip()]

X_texts = [sample["Context"] for sample in parsed_data]
Y_labels = [sample["Response"] for sample in parsed_data]

# ==============================
# 🔹 2. Load BERT Tokenizer & Model
# ==============================
# Tokenizer and encoder are loaded from the same checkpoint name so the
# vocabulary matches the model weights.
bert_model_name = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(bert_model_name)
bert_model = BertModel.from_pretrained(bert_model_name)

# ==============================
# 🔹 3. Function to Convert Text to BERT Embeddings
# ==============================
def get_bert_embeddings(texts, batch_size=16):
    """Convert input texts into BERT [CLS] embeddings.

    Uses the module-level ``tokenizer`` and ``bert_model``.

    Args:
        texts: Iterable of strings to embed.
        batch_size: Number of texts tokenized and encoded per forward pass.

    Returns:
        A 2-D NumPy array with one row per input text, holding the last hidden
        state at position 0 (the [CLS] token). Empty input yields an array
        with zero rows instead of raising.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    texts = list(texts)
    if not texts:
        # np.vstack([]) would raise; return a well-formed empty matrix instead.
        return np.empty((0, bert_model.config.hidden_size), dtype=np.float32)

    embeddings = []
    for start in range(0, len(texts), batch_size):
        batch_texts = texts[start : start + batch_size]
        # Inputs longer than 128 tokens are truncated; shorter ones are padded
        # to the longest text in the batch.
        tokens = tokenizer(batch_texts, padding=True, truncation=True, return_tensors="pt", max_length=128)
        # Keep the inputs on the same device as the model weights.
        tokens = tokens.to(bert_model.device)

        with torch.no_grad():
            output = bert_model(**tokens)

        # .cpu() is a no-op on CPU and required before .numpy() on an accelerator.
        cls_embedding = output.last_hidden_state[:, 0, :].cpu().numpy()  # Extract [CLS] token
        embeddings.append(cls_embedding)

    return np.vstack(embeddings)

# ==============================
# 🔹 4. Load or Compute BERT Embeddings
# ==============================
# The cache consists of three files. It is only used when all of them exist
# and the cached arrays have one row per sample of the dataset loaded above;
# otherwise everything is recomputed and the cache is rewritten.
_cache_paths = ("bert_embeddings.npy", "y_encoded.npy", "response_encoder.pkl")
_cache_is_valid = False

if all(os.path.exists(path) for path in _cache_paths):
    X_emb = np.load("bert_embeddings.npy")
    Y_encoded = np.load("y_encoded.npy")
    # NOTE(review): unpickling can execute arbitrary code; this is only safe
    # as long as response_encoder.pkl is a file this script wrote itself.
    with open("response_encoder.pkl", "rb") as f:
        label_encoder = pickle.load(f)

    # Guard against a stale cache produced from a different dataset size.
    _cache_is_valid = len(X_emb) == len(X_texts) and len(Y_encoded) == len(X_texts)
    if _cache_is_valid:
        print("✅ Loaded saved BERT embeddings and labels.")
    else:
        print("⚠️ Cached embeddings do not match the current dataset size. Recomputing.")

if not _cache_is_valid:
    X_emb = get_bert_embeddings(X_texts)
    label_encoder = LabelEncoder()
    Y_encoded = label_encoder.fit_transform(Y_labels)

    # Integer labels (not one-hot) are what gets cached on disk.
    np.save("bert_embeddings.npy", X_emb)
    np.save("y_encoded.npy", Y_encoded)
    with open("response_encoder.pkl", "wb") as f:
        pickle.dump(label_encoder, f)
    print("✅ Computed and saved BERT embeddings.")

# Convert labels to one-hot encoding
num_classes = len(label_encoder.classes_)
Y_encoded = to_categorical(Y_encoded, num_classes=num_classes)

# ==============================
# 🔹 5. Class Balancing with SMOTE
# ==============================
# NOTE(review): oversampling happens before the train/test split below, so
# synthetic samples interpolated from rows that later land in the test set can
# end up in the training set. Consider splitting first and resampling only the
# training portion.

# Convert labels to integer format for SMOTE
Y_resampled_labels = np.argmax(Y_encoded, axis=1)  # Convert one-hot to class labels

# Count occurrences of each class
class_counts = Counter(Y_resampled_labels)
print(f"🔍 Class distribution before SMOTE: {class_counts}")

# Classes with a single sample are dropped.
valid_classes = {label for label, count in class_counts.items() if count > 1}

# Keep only samples belonging to valid classes
valid_indices = np.flatnonzero(np.isin(Y_resampled_labels, list(valid_classes)))

# Update X and Y to remove rare classes
X_filtered = X_emb[valid_indices]
Y_filtered_labels = Y_resampled_labels[valid_indices]

# Apply SMOTE only if multiple valid classes exist
if len(set(Y_filtered_labels)) > 1:
    min_samples_per_class = min(Counter(Y_filtered_labels).values())  # Find smallest class size
    # Every remaining class has at least 2 samples, so this is always >= 1
    # and never exceeds the smallest class size minus one.
    smote_k = min(5, min_samples_per_class - 1)

    smote = SMOTE(random_state=42, k_neighbors=smote_k)
    X_resampled, Y_resampled_labels = smote.fit_resample(X_filtered, Y_filtered_labels)

    # Convert back to one-hot encoding
    Y_resampled = to_categorical(Y_resampled_labels, num_classes=num_classes)
    print(f"✅ SMOTE applied successfully. New class distribution: {Counter(Y_resampled_labels)}")
else:
    # Keep the integer labels aligned with X_resampled: later steps stratify
    # and weight by Y_resampled_labels, which must match the filtered rows
    # rather than the unfiltered dataset.
    X_resampled, Y_resampled_labels = X_filtered, Y_filtered_labels
    Y_resampled = to_categorical(Y_resampled_labels, num_classes=num_classes)
    print("⚠️ Skipping SMOTE because not enough valid classes exist.")

# ==============================
# 🔹 6. Train-Test Split
# ==============================
# 80/20 split with a fixed seed. Stratification by class label is attempted
# first; if train_test_split rejects it with a ValueError, the same split is
# retried without stratification.
_split_options = {"test_size": 0.2, "random_state": 42}

try:
    _split_parts = train_test_split(
        X_resampled, Y_resampled, stratify=Y_resampled_labels, **_split_options
    )
except ValueError:
    print("⚠️ WARNING: Not enough samples for stratified split. Using a regular split.")
    _split_parts = train_test_split(X_resampled, Y_resampled, **_split_options)

X_train, X_test, Y_train, Y_test = _split_parts

# ==============================
# 🔹 7. Debugging & Fixing Label Issues
# ==============================
# Recover integer class ids from the one-hot training targets.
Y_train_labels = np.argmax(Y_train, axis=1)
min_label, max_label = np.min(Y_train_labels), np.max(Y_train_labels)

print(f"🟢 Min label index in Y_train: {min_label}")
print(f"🟢 Max label index in Y_train: {max_label}")

# Defensive check: it cannot trigger while Y_train has num_classes columns,
# because argmax is then always smaller than num_classes.
if max_label >= num_classes:
    print(f"🔴 ERROR: Found label index {max_label}, but num_classes is {num_classes}!")
    num_classes = max_label + 1  # Fix the issue
    print(f"🔄 Adjusted num_classes: {num_classes}")

# Expand dimensions for LSTM input: (samples, features) -> (samples, 1, features)
X_train = np.expand_dims(X_train, axis=1)
X_test = np.expand_dims(X_test, axis=1)

# Convert labels again with updated num_classes
Y_train = to_categorical(Y_train_labels, num_classes=num_classes)
Y_test = to_categorical(np.argmax(Y_test, axis=1), num_classes=num_classes)

# Compute Class Weights
# Weights are derived from the training labels only and keyed by the actual
# class id. Keying by enumerate() position would attach weights to the wrong
# classes whenever the present class ids are not contiguous (e.g. after rare
# classes were filtered out).
_train_classes = np.unique(Y_train_labels)
class_weights = compute_class_weight("balanced", classes=_train_classes, y=Y_train_labels)

# Classes absent from the training set keep a neutral weight of 1.0 so the
# dictionary covers every index from 0 to num_classes - 1.
class_weight_dict = {class_id: 1.0 for class_id in range(int(num_classes))}
class_weight_dict.update(
    # Weights are floored at 0.5, as before.
    (int(class_id), max(float(weight), 0.5))
    for class_id, weight in zip(_train_classes, class_weights)
)

# ==============================
# 🔹 8. Print Final Data Shapes & Stats
# ==============================
# The four split arrays are reported and persisted in a fixed order:
# X_train, Y_train, X_test, Y_test.
_split_arrays = {
    "X_train": X_train,
    "Y_train": Y_train,
    "X_test": X_test,
    "Y_test": Y_test,
}

print("✅ Training samples:", len(X_train), "Test samples:", len(X_test))
for _array_name, _array in _split_arrays.items():
    print(f"{_array_name} shape:", _array.shape)
print("Class weights:", class_weight_dict)

# Each array is written to "<name>.npy" in the working directory.
for _array_name, _array in _split_arrays.items():
    np.save(f"{_array_name}.npy", _array)

print("💾 Saved X_train, Y_train, X_test, and Y_test to disk.")

In [None]:
import os
import json
import numpy as np
import tensorflow as tf
import keras_tuner as kt
import tensorflow.keras.backend as K
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input, Dense, Dropout, LayerNormalization, Bidirectional, LSTM,
    GlobalAveragePooling1D, Add, MultiHeadAttention
)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.initializers import HeNormal
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.metrics import TopKCategoricalAccuracy
from tensorflow.keras.saving import register_keras_serializable

# ==============================
# 🔹 1. Custom LSTM
# ==============================
@register_keras_serializable()
class CustomLSTM(LSTM):
    """LSTM layer that drops a ``time_major`` keyword before construction.

    All positional arguments and every other keyword argument are forwarded
    unchanged to ``LSTM.__init__``.
    """

    def __init__(self, *args, **kwargs):
        # presumably needed for configs that still carry `time_major` — TODO confirm
        forwarded = {key: value for key, value in kwargs.items() if key != "time_major"}
        super().__init__(*args, **forwarded)

# ==============================
# 🔹 2. Focal Loss
# ==============================
@register_keras_serializable()
def focal_loss(alpha=0.25, gamma=2.0):
    """Build a categorical focal loss function.

    Args:
        alpha: Constant factor applied to every term of the loss.
        gamma: Focusing exponent; each term is scaled by ``(1 - p) ** gamma``.

    Returns:
        A callable ``loss(y_true, y_pred)`` that returns one loss value per
        sample. ``y_true`` is expected to be one-hot and ``y_pred`` to hold
        class probabilities along the last axis.
    """
    def loss(y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Clip away exact 0/1 probabilities so log() stays finite.
        epsilon = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, epsilon, 1.0 - epsilon)
        cross_entropy = -y_true * tf.math.log(y_pred)
        weight = alpha * tf.math.pow(1.0 - y_pred, gamma)
        # Sum over the class axis instead of averaging over it: with one-hot
        # targets only one term per sample is non-zero, so a mean over classes
        # would shrink the loss by a factor of num_classes. The per-sample
        # values are left for Keras to reduce over the batch.
        return tf.reduce_sum(weight * cross_entropy, axis=-1)
    return loss

# ==============================
# 🔹 3. Model Architecture
# ==============================
def build_model(input_shape=(1, 768), num_classes=2480, num_heads=4, key_dim=64, lstm_units=256, dropout_rate=0.3):
    """Build the (uncompiled) classification model.

    When the input has more than one timestep, or a variable number of
    timesteps, the model uses two bidirectional LSTM stages with a
    self-attention block in between. For a single timestep those recurrent
    and attention stages are replaced by Dense layers and the sequence axis
    is flattened.

    Parameters:
    - input_shape: Tuple (sequence_length, feature_dim), default (1, 768).
      ``sequence_length`` may be None for variable-length sequences.
    - num_classes: Width of the final softmax layer.
    - num_heads: Number of attention heads (sequence path only).
    - key_dim: Per-head key/query size (sequence path only).
    - lstm_units: Units per LSTM direction; the Dense layers that stand in for
      or follow the first BiLSTM are ``2 * lstm_units`` wide.
    - dropout_rate: Dropout rate used by the LSTM and Dropout layers.

    Returns:
    - A ``Model`` mapping ``input_shape`` inputs to ``num_classes`` probabilities.
    """
    sequence_length = input_shape[0]
    # None means "variable length" and is treated as a real sequence;
    # comparing None > 1 directly would raise a TypeError.
    is_sequence = sequence_length is None or sequence_length > 1

    # Output width of a Bidirectional LSTM: both directions are concatenated.
    bilstm_dim = lstm_units * 2

    inputs = Input(shape=input_shape)

    # Initial projection
    x = Dense(512, activation="relu", kernel_initializer=HeNormal())(inputs)
    x = LayerNormalization()(x)

    if is_sequence:
        # First Bidirectional LSTM
        x = Bidirectional(CustomLSTM(lstm_units, return_sequences=True, dropout=dropout_rate))(x)

        # Multi-head self-attention with a residual connection
        attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(x, x)
        attn_output = Dense(bilstm_dim)(attn_output)  # Project to match the residual width
        x = Add()([x, attn_output])
        x = LayerNormalization()(x)
    else:
        # For sequence length 1 a BiLSTM and self-attention add nothing;
        # a Dense layer of the same width is used instead.
        x = Dense(bilstm_dim, activation="relu")(x)
        x = Dropout(dropout_rate)(x)

    # Residual Dense Block
    residual = x
    x = Dense(bilstm_dim, activation="relu")(x)
    x = Dropout(dropout_rate)(x)
    x = Add()([x, residual])
    x = LayerNormalization()(x)

    if is_sequence:
        # Second Bidirectional LSTM, then average over the time axis
        second_lstm_units = lstm_units // 2
        x = Bidirectional(CustomLSTM(second_lstm_units, return_sequences=True, dropout=dropout_rate))(x)
        x = GlobalAveragePooling1D()(x)
    else:
        # For sequence length 1, flatten directly
        x = tf.keras.layers.Flatten()(x)

    # Final layers
    x = Dense(256, activation="relu", kernel_regularizer=l2(1e-4))(x)
    x = Dropout(dropout_rate)(x)
    x = Dense(128, activation="relu")(x)

    outputs = Dense(num_classes, activation="softmax")(x)

    return Model(inputs, outputs)


# ==============================
# 🔹 4. Hyperparameter Tuner
# ==============================
def model_builder(hp):
    """Hypermodel callback: build and compile one candidate model.

    The input shape and class count are read from the module-level
    ``X_train`` / ``Y_train`` arrays. Five hyperparameters are declared on
    ``hp``: lstm_units, num_heads, key_dim, dropout_rate and learning_rate.

    Returns:
        The compiled model (focal loss, accuracy and top-5 accuracy metrics).
    """
    # (sequence_length, feature_dim) taken from the actual training data.
    data_input_shape = (X_train.shape[1], X_train.shape[2])

    # Architecture hyperparameters, keyed by the build_model argument they feed.
    architecture = {
        "lstm_units": hp.Int("lstm_units", 128, 512, step=64),
        "num_heads": hp.Int("num_heads", 2, 8, step=2),
        "key_dim": hp.Int("key_dim", 32, 128, step=32),
        "dropout_rate": hp.Float("dropout_rate", 0.2, 0.5, step=0.05),
    }
    learning_rate = hp.Float("learning_rate", 1e-5, 1e-3, sampling="log")

    candidate = build_model(
        input_shape=data_input_shape,
        num_classes=Y_train.shape[1],  # one output unit per one-hot column
        **architecture,
    )

    top5_metric = TopKCategoricalAccuracy(k=5, name="top_5_accuracy")
    candidate.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss=focal_loss(),
        metrics=["accuracy", top5_metric],
    )
    return candidate

# ==============================
# 🔹 5. Load Your Data
# ==============================
# Read the four split arrays back from the working directory.
X_train = np.load("X_train.npy")
Y_train = np.load("Y_train.npy")
X_test = np.load("X_test.npy")
Y_test = np.load("Y_test.npy")

# Report the actual shapes so mismatches are visible before tuning starts.
for _split_name, _split_array in (
    ("X_train", X_train),
    ("Y_train", Y_train),
    ("X_test", X_test),
    ("Y_test", Y_test),
):
    print(f"{_split_name} shape: {_split_array.shape}")

# ==============================
# 🔹 6. Tuning - WITH ERROR HANDLING
# ==============================
# Defaults used when tuning fails, and as fallbacks for any value the tuner
# does not report.
default_hps = {
    "learning_rate": 1e-4,
    "lstm_units": 256,
    "num_heads": 4,
    "key_dim": 64,
    "dropout_rate": 0.3
}

try:
    tuner = kt.Hyperband(
        model_builder,
        objective="val_accuracy",
        max_epochs=50,
        factor=3,
        directory="tuner_results_v3",
        overwrite=True  # Use this to start fresh if needed
    )

    # Increase max_consecutive_failed_trials to be more tolerant of errors
    tuner.oracle.max_consecutive_failed_trials = 5

    tuner.search(
        X_train, Y_train,
        epochs=20,
        validation_split=0.2,
        verbose=1
    )

    # Store the result as a plain dict. HyperParameters.get() takes only a
    # name, so calling it with a default (as the code below does on a dict)
    # would raise a TypeError.
    best_hps = {**default_hps, **tuner.get_best_hyperparameters(1)[0].values}
    best_model = tuner.get_best_models(1)[0]

except Exception as e:
    # Deliberate best-effort: any tuning failure falls back to a default model.
    print(f"Tuning error: {str(e)}")
    print("Falling back to default hyperparameters...")

    # Get actual shape from data
    actual_input_shape = (X_train.shape[1], X_train.shape[2])
    num_classes = Y_train.shape[1]

    best_model = build_model(
        input_shape=actual_input_shape,
        num_classes=num_classes,
        num_heads=default_hps["num_heads"],
        key_dim=default_hps["key_dim"],
        lstm_units=default_hps["lstm_units"],
        dropout_rate=default_hps["dropout_rate"]
    )

    best_hps = dict(default_hps)

# (Re)compile the selected model; this is the only compile needed on the
# fallback path and replaces the tuner's compile state on the success path.
best_model.compile(
    optimizer=Adam(learning_rate=best_hps.get("learning_rate", 1e-4)),
    loss=focal_loss(),
    metrics=["accuracy", TopKCategoricalAccuracy(k=5, name="top_5_accuracy")]
)

# Save hyperparams (key_dim is included because it is part of the search space)
with open("best_hyperparameters_v3.json", "w", encoding="utf-8") as f:
    json.dump({
        "learning_rate": float(best_hps.get("learning_rate", 1e-4)),
        "lstm_units": int(best_hps.get("lstm_units", 256)),
        "num_heads": int(best_hps.get("num_heads", 4)),
        "key_dim": int(best_hps.get("key_dim", 64)),
        "dropout_rate": float(best_hps.get("dropout_rate", 0.3))
    }, f)

# ==============================
# 🔹 7. Training
# ==============================
# NOTE(review): the test split doubles as validation data here, so early
# stopping and weight restoration are selected on the same data that is later
# reported as test accuracy — consider a separate validation split.
_training_callbacks = [
    # Stop after 5 epochs without val_accuracy improvement and restore the best weights.
    EarlyStopping(monitor="val_accuracy", patience=5, restore_best_weights=True, verbose=1),
    # Multiply the learning rate by 0.2 after 3 stagnant val_loss epochs, down to 1e-6.
    ReduceLROnPlateau(monitor="val_loss", factor=0.2, patience=3, min_lr=1e-6, verbose=1),
]
early_stop, reduce_lr = _training_callbacks

history = best_model.fit(
    X_train,
    Y_train,
    epochs=60,
    validation_data=(X_test, Y_test),
    callbacks=_training_callbacks,
)

# ==============================
# 🔹 8. Evaluate and Compare
# ==============================
# evaluate() returns the loss followed by the compiled metrics
# (accuracy, top-5 accuracy).
test_loss, test_acc, test_top5 = best_model.evaluate(X_test, Y_test)
print(f"✅ Test Accuracy: {test_acc:.4f}")
print(f"🏅 Top-5 Accuracy: {test_top5:.4f}")
print(f"📊 Final Validation Accuracy: {history.history['val_accuracy'][-1]:.4f}")

# Compare with previous model
model_path = "best_transformer_lstm_model.keras"
prev_model_path = "previous_transformer_lstm_model.keras"
prev_model_acc = 0  # baseline when no earlier model exists

if os.path.exists(model_path):
    print("📦 Loading previous model...")
    # compile=False skips restoring the saved optimizer/loss state, which
    # would require resolving the focal-loss closure by name. The model is
    # compiled explicitly on the next line anyway.
    prev_model = load_model(
        model_path,
        custom_objects={"CustomLSTM": CustomLSTM},
        compile=False
    )
    prev_model.compile(optimizer=Adam(learning_rate=3e-5), loss=focal_loss(), metrics=["accuracy"])
    _, prev_model_acc = prev_model.evaluate(X_test, Y_test, verbose=0)

print(f"📉 Previous Model Accuracy: {prev_model_acc:.4f}")

# Save only if accuracy improved by more than 0.005 over the previous model
if test_acc > prev_model_acc + 0.005:
    if os.path.exists(model_path):
        # os.replace overwrites an existing backup; os.rename raises on
        # Windows when the destination already exists.
        os.replace(model_path, prev_model_path)
    best_model.save(model_path)
    print("✅ New best model saved!")
else:
    print("⚠️ No significant improvement over previous model.")

# ==============================
# 🔹 9. Plot Training Accuracy
# ==============================
# One curve per recorded history key, drawn in this order.
_accuracy_curves = (
    ("accuracy", "Train Accuracy"),
    ("val_accuracy", "Validation Accuracy"),
)
for _history_key, _curve_label in _accuracy_curves:
    plt.plot(history.history[_history_key], label=_curve_label)

plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.title("Training vs Validation Accuracy")
plt.legend()
plt.grid()
plt.tight_layout()
plt.show()