In [1]:
import csv
import numpy as np
from PIL import Image, ImageEnhance, ImageFilter
import os
from skimage.filters import sobel, gaussian
from skimage.exposure import equalize_adapthist
import matplotlib.cm as cm
import matplotlib.colors as mcolors

# Function to generate and save the heatmap image from row data
def generate_image_from_row(row_dict, output_folder_path):
    """Render one feature row as a plasma-coloured heatmap PNG.

    The numeric cells of ``row_dict`` are clipped to mean ± 2*std, scaled to
    0-255, passed through a sigmoid, laid out on the smallest square grid that
    holds them, blurred, contrast-equalised, edge-enhanced, colour-mapped and
    written to ``<output_folder_path>/<family>_<hash>.png``.

    Args:
        row_dict: Mapping of column name to cell value (e.g. one
            ``csv.DictReader`` row). The 'family' and 'hash' entries name the
            image; every other entry is a candidate feature.
        output_folder_path: Directory for the PNG; created when missing.

    Returns:
        None. A row without any usable numeric feature is reported and skipped
        before anything is written.
    """
    # Get the image name from 'family' and 'hash' keys
    family = row_dict.get('family', 'unknown')
    file_hash = row_dict.get('hash', 'nohash')
    image_name = f"{family}_{file_hash}.png"

    # Extract only numeric features. The filter accepts plain non-negative
    # decimals only: signed values, scientific notation and blanks are skipped.
    features = [float(value) for key, value in row_dict.items()
                if key not in ['hash', 'family'] and str(value).replace('.', '', 1).isdigit()]

    if len(features) == 0:
        print(f"⚠️ No valid numeric features for {image_name}")
        return

    # Dynamic normalization using mean ± 2*std
    values = np.asarray(features, dtype=float)
    mean_val = values.mean()
    std_val = values.std()
    dynamic_min = max(0, mean_val - 2 * std_val)
    dynamic_max = mean_val + 2 * std_val
    value_range = dynamic_max - dynamic_min

    def to_pixel(raw):
        """Clip, scale to 0-255 and apply the sigmoid contrast curve."""
        scaled = (np.clip(raw, dynamic_min, dynamic_max) - dynamic_min) / value_range * 255
        return 255 / (1 + np.exp(-0.04 * (scaled - 127.5)))

    if value_range > 0:
        sigmoid_scaled = to_pixel(values)
        # Padding cells take the value the mean feature maps to, so they stay
        # on the same 0-255 scale as the real cells.
        pad_value = float(to_pixel(mean_val))
    else:
        # All features identical (or the spread is not finite): dividing by the
        # range would produce NaN pixels, so fall back to a flat mid-grey map.
        sigmoid_scaled = np.full(values.shape, 127.5)
        pad_value = 127.5

    # Pad to nearest square
    total = len(sigmoid_scaled)
    side = int(np.ceil(np.sqrt(total)))
    padded = np.concatenate([sigmoid_scaled, np.full(side * side - total, pad_value)])
    reshaped = padded.reshape((side, side))

    # Gaussian blur
    blurred = gaussian(reshaped / 255.0, sigma=0.7) * 255

    # CLAHE
    clahe = equalize_adapthist(blurred / 255.0, clip_limit=0.03, kernel_size=10) * 255

    # Sobel edge detection (slightly enhance edges for clarity)
    edges = sobel(clahe / 255.0)
    edge_enhanced = clahe * (1 + 0.5 * edges)
    edge_enhanced = np.clip(edge_enhanced, 0, 255).astype(np.uint8)

    # Apply heatmap (using "plasma" for clear patterns)
    norm = mcolors.Normalize(vmin=edge_enhanced.min(), vmax=edge_enhanced.max())
    colored = cm.plasma(norm(edge_enhanced))[:, :, :3]   # take RGB, drop alpha
    colored = (colored * 255).astype(np.uint8)
    image = Image.fromarray(colored)

    # Contrast and sharpening
    image = ImageEnhance.Contrast(image).enhance(1.8)
    image = image.filter(ImageFilter.UnsharpMask(radius=1.5, percent=200, threshold=2))

    # Save the image; exist_ok avoids the check-then-create race.
    os.makedirs(output_folder_path, exist_ok=True)

    output_path = os.path.join(output_folder_path, image_name)
    image.save(output_path)
    print(f"✅ Saved Heatmap: {output_path} | Size: {side}x{side}")

# ---------- Main Code ----------

# Source feature table and destination folder for the generated heatmaps.
input_csv = r'D:\malware_LLm\Malware_Benign_API_call_argument_Feature_Vector.csv'
output_folder = r'D:\malware_LLm\Newheatmapout'

# newline='' lets the csv module do its own newline handling, so quoted fields
# that contain line breaks are parsed correctly.
with open(input_csv, 'r', encoding='utf-8', newline='') as file:
    reader = csv.DictReader(file)
    # One heatmap image per CSV row.
    for row in reader:
        generate_image_from_row(row, output_folder)


ModuleNotFoundError: No module named 'skimage'

In [None]:
import tensorflow as tf
# Report the TensorFlow build, then the GPUs it can see.
print("TensorFlow version:", tf.__version__)
visible_gpus = tf.config.list_physical_devices('GPU')
print("GPUs Available:", visible_gpus)


In [None]:
# CNN for Heatmap Images (Training + Testing)

import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
import json
import datetime

# Constants
IMAGE_SIZE = (128, 128)   # (height, width) every image is resized to
BATCH_SIZE = 32
EPOCHS = 50

# Paths (update these for your dataset)
data_dir = r'D:\malware_LLm\Newheatmapout'   # <-- folder that holds the heatmap images
results_dir = r'D:\malware_LLm\heatmapresult'
os.makedirs(results_dir, exist_ok=True)

# Class names (matched against the lower-cased filename prefix before '_')
class_names = ['adware', 'backdoor', 'benign', 'downloader', 'spyware', 'trojan', 'virus', 'worm']

# Derived from the list above so the two can never drift apart.
NUM_CLASSES = len(class_names)

# Dataset loader (for heatmap RGB images)
def load_dataset_with_filenames(folder_path):
    """Load every labelled heatmap PNG found directly inside ``folder_path``.

    The label is the lower-cased part of the filename before the first '_';
    files whose label is not in ``class_names`` are ignored.

    Args:
        folder_path: Directory that contains the PNG files.

    Returns:
        Tuple ``(images, labels, filenames)`` of NumPy arrays with one entry
        per accepted file, ordered by filename. Pixel values are divided by
        255.
    """
    images = []
    labels = []
    filenames = []

    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # dataset order (and therefore the seeded train/validation split) stable.
    for filename in sorted(os.listdir(folder_path)):
        # Case-insensitive so '.PNG' files are not silently dropped.
        if not filename.lower().endswith('.png'):
            continue
        class_name = filename.split('_')[0].lower()
        if class_name not in class_names:
            continue

        img_path = os.path.join(folder_path, filename)
        # Load as RGB for heatmap
        img = load_img(img_path, color_mode='rgb', target_size=IMAGE_SIZE)

        images.append(img_to_array(img) / 255.0)
        labels.append(class_name)
        filenames.append(filename)

    return np.array(images), np.array(labels), np.array(filenames)

# Load dataset
print("Loading dataset with filenames...")
X, y, filenames = load_dataset_with_filenames(data_dir)
print(f"Loaded {len(X)} images")

# Encode labels. The encoder is fitted on the full class list rather than on
# the labels that happen to be present, so class indices always line up with
# the NUM_CLASSES-wide output layer and inverse_transform() can decode any
# predicted index even when a family has no images.
label_encoder = LabelEncoder()
label_encoder.fit(class_names)
y_encoded = label_encoder.transform(y)
y_categorical = tf.keras.utils.to_categorical(y_encoded, num_classes=NUM_CLASSES)

# Split data (80/20, stratified by class, fixed seed)
X_train, X_val, y_train, y_val, filenames_train, filenames_val = train_test_split(
    X, y_categorical, filenames, test_size=0.2, random_state=42, stratify=y_encoded
)

# Data augmentation (training images only)
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

train_generator = train_datagen.flow(
    X_train, y_train, batch_size=BATCH_SIZE, shuffle=True
)

# Validation data is passed through untouched and in a fixed order.
val_generator = ImageDataGenerator().flow(
    X_val, y_val, batch_size=BATCH_SIZE, shuffle=False
)

# Round the step counts up: floor division would skip the last partial batch
# and yields 0 steps when there are fewer samples than BATCH_SIZE.
steps_per_epoch = max(1, int(np.ceil(len(X_train) / BATCH_SIZE)))
validation_steps = max(1, int(np.ceil(len(X_val) / BATCH_SIZE)))

# CNN Model for Heatmap RGB
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(NUM_CLASSES, activation='softmax')
])

model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Train model with GPU
# NOTE(review): this pins training to '/GPU:0'; confirm the behaviour you want
# on machines without a GPU.
print("Training model on GPU...")
with tf.device('/GPU:0'):
    history = model.fit(
        train_generator,
        steps_per_epoch=steps_per_epoch,
        epochs=EPOCHS,
        validation_data=val_generator,
        validation_steps=validation_steps
    )


# Save results with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_dir = os.path.join(results_dir, f"model_{timestamp}")
os.makedirs(model_dir, exist_ok=True)

# Save model + label encoder
model.save(os.path.join(model_dir, 'malware_heatmap_cnn.keras'))
np.save(os.path.join(model_dir, 'label_encoder.npy'), label_encoder.classes_)

# Predictions (fresh, unshuffled generator so rows align with filenames_val)
print("Generating predictions...")
val_generator = ImageDataGenerator().flow(X_val, y_val, batch_size=BATCH_SIZE, shuffle=False)
y_pred = model.predict(val_generator)
predicted_classes = label_encoder.inverse_transform(np.argmax(y_pred, axis=1))
true_classes = label_encoder.inverse_transform(np.argmax(y_val, axis=1))

predictions_df = pd.DataFrame({
    'image_name': filenames_val,
    'actual_class': true_classes,
    'predicted_class': predicted_classes
})

csv_path = os.path.join(model_dir, 'predictions.csv')
predictions_df.to_csv(csv_path, index=False)
print(f"Predictions saved to {csv_path}")

# Save training artifacts
def save_training_artifacts():
    """Write history, model summary, final metrics and curves into ``model_dir``.

    Reads the module-level ``history``, ``model``, ``val_generator`` and
    ``model_dir``. Produces training_history.json, model_summary.txt,
    metrics.json and training_plots.png.
    """
    # Training history. Values are converted to plain floats because json
    # cannot serialise NumPy scalar types.
    serialisable_history = {
        metric: [float(value) for value in values]
        for metric, values in history.history.items()
    }
    with open(os.path.join(model_dir, 'training_history.json'), 'w') as f:
        json.dump(serialisable_history, f)

    # Model summary (utf-8: the summary may contain box-drawing characters)
    with open(os.path.join(model_dir, 'model_summary.txt'), 'w', encoding='utf-8') as f:
        model.summary(print_fn=lambda x: f.write(x + '\n'))

    # Metrics: validation figures are re-evaluated, training figures are the
    # values recorded for the last epoch.
    val_loss, val_acc = model.evaluate(val_generator, verbose=0)
    metrics = {
        'validation_accuracy': float(val_acc),
        'validation_loss': float(val_loss),
        'training_accuracy': float(history.history['accuracy'][-1]),
        'training_loss': float(history.history['loss'][-1])
    }
    with open(os.path.join(model_dir, 'metrics.json'), 'w') as f:
        json.dump(metrics, f, indent=4)

    # Training plots: accuracy on the left, loss on the right
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend()

    plt.savefig(os.path.join(model_dir, 'training_plots.png'))
    plt.close()

save_training_artifacts()

print(f"\nAll results saved to: {model_dir}")
print("Sample predictions:")
print(predictions_df.head())


In [None]:
import tensorflow as tf
# Count the GPUs TensorFlow detects, then list every physical device.
gpu_count = len(tf.config.list_physical_devices('GPU'))
print("Num GPUs Available:", gpu_count)
all_devices = tf.config.list_physical_devices()
print("Devices:", all_devices)


In [None]:
import tensorflow as tf

# Environment probe: TensorFlow version, CUDA build flag and visible GPUs.
print("TensorFlow version:", tf.__version__)
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available:", len(gpu_devices))
print("Built with CUDA:", tf.test.is_built_with_cuda())
# tf.test.is_gpu_available() is deprecated; the device list answers the same
# question.
print("GPU Available:", bool(gpu_devices))


In [None]:
import tensorflow as tf

# Print the TensorFlow version followed by the GPU count and device entries.
print("TensorFlow version:", tf.__version__)
gpu_count = len(tf.config.list_physical_devices('GPU'))
print("Num GPUs Available:", gpu_count)
gpu_details = tf.config.list_physical_devices('GPU')
print("GPU Details:", gpu_details)




In [None]:
import tensorflow as tf
# Quick sanity check of the TensorFlow install and its GPU visibility.
tf_version = tf.__version__
print("TensorFlow:", tf_version)
gpu_list = tf.config.list_physical_devices("GPU")
print("GPUs:", gpu_list)


In [None]:
import tensorflow as tf
# Show which TensorFlow build is active and which GPUs it has registered.
print("TensorFlow:", tf.__version__)
registered_gpus = tf.config.list_physical_devices("GPU")
print("GPUs:", registered_gpus)


In [None]:
"""
CNN for Malware Heatmap Images
---------------------------------
This script:
1. Loads heatmap images and preprocesses them
2. Trains a Convolutional Neural Network (CNN)
3. Saves predictions and evaluation metrics
4. Generates plots, confusion matrix, and reports
"""

# =============================
# Imports
# =============================
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import json
import datetime

# =============================
# Configuration
# =============================
IMAGE_SIZE = (128, 128)   # (height, width) every image is resized to
BATCH_SIZE = 32
EPOCHS = 50
# Families recognised from the lower-cased filename prefix before '_'.
CLASS_NAMES = ['adware', 'backdoor', 'benign', 'downloader', 'spyware', 'trojan', 'virus', 'worm']
# Derived from the list above so the two can never drift apart.
NUM_CLASSES = len(CLASS_NAMES)

DATA_DIR = r"D:\malware_LLm\Newheatmapout"
RESULTS_DIR = r"D:\malware_LLm\heatmapresult"
os.makedirs(RESULTS_DIR, exist_ok=True)

# =============================
# Dataset Loader
# =============================
def load_dataset_with_filenames(folder_path):
    """Load heatmap images and return arrays of images, labels, and filenames.

    The label is the lower-cased part of the filename before the first '_';
    files whose label is not in ``CLASS_NAMES`` are ignored. Results are
    ordered by filename and pixel values are divided by 255.
    """
    images, labels, filenames = [], [], []

    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # dataset order (and therefore the seeded train/validation split) stable.
    for filename in sorted(os.listdir(folder_path)):
        # Case-insensitive so ".PNG" files are not silently dropped.
        if not filename.lower().endswith(".png"):
            continue
        class_name = filename.split("_")[0].lower()
        if class_name not in CLASS_NAMES:
            continue

        img_path = os.path.join(folder_path, filename)
        img = load_img(img_path, color_mode="rgb", target_size=IMAGE_SIZE)

        images.append(img_to_array(img) / 255.0)
        labels.append(class_name)
        filenames.append(filename)

    return np.array(images), np.array(labels), np.array(filenames)

print("Loading dataset...")
X, y, filenames = load_dataset_with_filenames(DATA_DIR)
print(f"Loaded {len(X)} images.")

# =============================
# Label Encoding
# =============================
# Fitted on the full class list rather than on the labels that happen to be
# present, so class indices always line up with the NUM_CLASSES-wide output
# layer and inverse_transform() can decode any predicted index.
label_encoder = LabelEncoder()
label_encoder.fit(CLASS_NAMES)
y_encoded = label_encoder.transform(y)
y_categorical = tf.keras.utils.to_categorical(y_encoded, num_classes=NUM_CLASSES)

# Train/Test Split (80/20, stratified by class, fixed seed)
X_train, X_val, y_train, y_val, filenames_train, filenames_val = train_test_split(
    X, y_categorical, filenames, test_size=0.2, random_state=42, stratify=y_encoded
)

# =============================
# Data Augmentation
# =============================
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest"
)

train_generator = train_datagen.flow(X_train, y_train, batch_size=BATCH_SIZE, shuffle=True)
# Validation data is passed through untouched and in a fixed order.
val_generator = ImageDataGenerator().flow(X_val, y_val, batch_size=BATCH_SIZE, shuffle=False)

# =============================
# CNN Model
# =============================
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation="relu", input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation="relu"),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation="relu"),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(256, activation="relu"),
    layers.Dropout(0.5),
    layers.Dense(NUM_CLASSES, activation="softmax")
])

model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# =============================
# Training
# =============================
# Round the step counts up: floor division would skip the last partial batch
# and yields 0 steps when there are fewer samples than BATCH_SIZE.
steps_per_epoch = max(1, int(np.ceil(len(X_train) / BATCH_SIZE)))
validation_steps = max(1, int(np.ceil(len(X_val) / BATCH_SIZE)))

print("Training model...")
history = model.fit(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    validation_data=val_generator,
    validation_steps=validation_steps
)

# =============================
# Save Model + Results Directory
# =============================
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_dir = os.path.join(RESULTS_DIR, f"model_{timestamp}")
os.makedirs(model_dir, exist_ok=True)

model.save(os.path.join(model_dir, "malware_heatmap_cnn.keras"))
np.save(os.path.join(model_dir, "label_encoder.npy"), label_encoder.classes_)

# =============================
# Predictions
# =============================
print("Generating predictions...")
y_pred = model.predict(val_generator)
predicted_classes = label_encoder.inverse_transform(np.argmax(y_pred, axis=1))
true_classes = label_encoder.inverse_transform(np.argmax(y_val, axis=1))

predictions_df = pd.DataFrame({
    "image_name": filenames_val,
    "actual_class": true_classes,
    "predicted_class": predicted_classes
})
predictions_df.to_csv(os.path.join(model_dir, "predictions.csv"), index=False)

# =============================
# Evaluation: Confusion Matrix & Report
# =============================
# Named conf_matrix (not cm) so it cannot overwrite the `matplotlib.cm as cm`
# alias used by the heatmap-generation cell of this notebook.
conf_matrix = confusion_matrix(true_classes, predicted_classes, labels=CLASS_NAMES)

plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES)
plt.title("Confusion Matrix")
plt.ylabel("Actual Class")
plt.xlabel("Predicted Class")
plt.savefig(os.path.join(model_dir, "confusion_matrix.png"))
plt.close()

# Classification Report. labels= pins the row order to CLASS_NAMES; without
# it the call fails when a class is missing from both the true and predicted
# labels, because target_names would no longer match the inferred label set.
report = classification_report(
    true_classes, predicted_classes,
    labels=CLASS_NAMES, target_names=CLASS_NAMES, output_dict=True
)
report_df = pd.DataFrame(report).transpose()
report_df.to_csv(os.path.join(model_dir, "classification_report.csv"))

with open(os.path.join(model_dir, "classification_report.txt"), "w") as f:
    f.write(classification_report(
        true_classes, predicted_classes, labels=CLASS_NAMES, target_names=CLASS_NAMES
    ))

# =============================
# Metrics
# =============================
# Validation figures are re-evaluated; training figures are the values
# recorded for the last epoch.
val_loss, val_acc = model.evaluate(val_generator, verbose=0)
metrics = {
    "validation_accuracy": float(val_acc),
    "validation_loss": float(val_loss),
    "training_accuracy": float(history.history["accuracy"][-1]),
    "training_loss": float(history.history["loss"][-1])
}
metrics_df = pd.DataFrame([metrics])
metrics_df.to_csv(os.path.join(model_dir, "metrics_results.csv"), index=False)

# Save Metrics Table as Image
plt.figure(figsize=(6, 2))
plt.axis("off")
table = plt.table(cellText=metrics_df.values, colLabels=metrics_df.columns, cellLoc="center", loc="center")
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 1.2)
plt.savefig(os.path.join(model_dir, "metrics_results_table.png"), bbox_inches="tight")
plt.close()

# =============================
# Training Plots
# =============================
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history.history["accuracy"], label="Train Accuracy", color="blue")
plt.plot(history.history["val_accuracy"], label="Validation Accuracy", color="orange")
plt.title("Training vs Validation Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history["loss"], label="Train Loss", color="blue")
plt.plot(history.history["val_loss"], label="Validation Loss", color="orange")
plt.title("Training vs Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()

plt.savefig(os.path.join(model_dir, "training_history.png"))
plt.close()

# Save Training History JSON. Values are converted to plain floats because
# json cannot serialise NumPy scalar types.
serialisable_history = {
    metric: [float(value) for value in values]
    for metric, values in history.history.items()
}
with open(os.path.join(model_dir, "training_history.json"), "w") as f:
    json.dump(serialisable_history, f)

# Model Summary
with open(os.path.join(model_dir, "model_summary.txt"), "w", encoding="utf-8") as f:
    model.summary(print_fn=lambda x: f.write(x + "\n"))

print(f"\n✅ All results saved in: {model_dir}")
print("Sample predictions:")
print(predictions_df.head())

# =============================
# Final Combined Graph: Accuracy and Loss
# =============================
plt.figure(figsize=(10, 6))

# Use the number of epochs actually recorded so the x-axis always matches the
# length of the plotted series.
epochs_range = range(1, len(history.history["accuracy"]) + 1)

# Plot Accuracy
plt.plot(epochs_range, history.history["accuracy"], label="Train Accuracy", marker='o')
plt.plot(epochs_range, history.history["val_accuracy"], label="Validation Accuracy", marker='o')

# Plot Loss (scaled for better visualization)
plt.plot(epochs_range, np.array(history.history["loss"]) * 5, label="Train Loss (x5)", linestyle="--", marker='x')
plt.plot(epochs_range, np.array(history.history["val_loss"]) * 5, label="Validation Loss (x5)", linestyle="--", marker='x')

plt.title("Training History: Accuracy and Loss")
plt.xlabel("Epochs")
plt.ylabel("Accuracy / Loss (scaled)")
plt.legend()
plt.grid(True)

plt.savefig(os.path.join(model_dir, "combined_accuracy_loss.png"))
plt.close()


In [None]:
"""
CNN for Malware Heatmap Images with Hyperparameter Tuning
---------------------------------------------------------
Steps:
1. Loads heatmap images and preprocesses them
2. Runs hyperparameter tuning using KerasTuner
3. Selects the best CNN model
4. Retrains best model with full epochs
5. Saves predictions, metrics, and plots
"""

# =============================
# Imports
# =============================
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import json
import datetime
import keras_tuner as kt

# =============================
# Configuration
# =============================
IMAGE_SIZE = (128, 128)   # (height, width) every image is resized to
BATCH_SIZE = 32
EPOCHS = 50
# Families recognised from the lower-cased filename prefix before '_'.
CLASS_NAMES = ['adware', 'backdoor', 'benign', 'downloader', 'spyware', 'trojan', 'virus', 'worm']
# Derived from the list above so the two can never drift apart.
NUM_CLASSES = len(CLASS_NAMES)

DATA_DIR = r"D:\malware_LLm\Newheatmapout"
RESULTS_DIR = r"D:\malware_LLm\heatmapresult"
os.makedirs(RESULTS_DIR, exist_ok=True)

# =============================
# Dataset Loader
# =============================
def load_dataset_with_filenames(folder_path):
    """Load every labelled heatmap PNG found directly inside ``folder_path``.

    The label is the lower-cased part of the filename before the first '_';
    files whose label is not in ``CLASS_NAMES`` are ignored.

    Returns:
        Tuple ``(images, labels, filenames)`` of NumPy arrays with one entry
        per accepted file, ordered by filename. Pixel values are divided by
        255.
    """
    images, labels, filenames = [], [], []
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # dataset order (and therefore the seeded train/validation split) stable.
    for filename in sorted(os.listdir(folder_path)):
        # Case-insensitive so ".PNG" files are not silently dropped.
        if not filename.lower().endswith(".png"):
            continue
        class_name = filename.split("_")[0].lower()
        if class_name not in CLASS_NAMES:
            continue
        img_path = os.path.join(folder_path, filename)
        img = load_img(img_path, color_mode="rgb", target_size=IMAGE_SIZE)
        images.append(img_to_array(img) / 255.0)
        labels.append(class_name)
        filenames.append(filename)
    return np.array(images), np.array(labels), np.array(filenames)

print("Loading dataset...")
X, y, filenames = load_dataset_with_filenames(DATA_DIR)
print(f"Loaded {len(X)} images.")

# Encode labels. The encoder is fitted on the full class list rather than on
# the labels that happen to be present, so class indices always line up with
# the NUM_CLASSES-wide output layer and inverse_transform() can decode any
# predicted index even when a family has no images.
label_encoder = LabelEncoder()
label_encoder.fit(CLASS_NAMES)
y_encoded = label_encoder.transform(y)
y_categorical = tf.keras.utils.to_categorical(y_encoded, num_classes=NUM_CLASSES)

# Train/Test Split (80/20, stratified by class, fixed seed)
X_train, X_val, y_train, y_val, filenames_train, filenames_val = train_test_split(
    X, y_categorical, filenames, test_size=0.2, random_state=42, stratify=y_encoded
)

# Data Augmentation (training images only)
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest"
)
train_generator = train_datagen.flow(X_train, y_train, batch_size=BATCH_SIZE, shuffle=True)
# Validation data is passed through untouched and in a fixed order.
val_generator = ImageDataGenerator().flow(X_val, y_val, batch_size=BATCH_SIZE, shuffle=False)

# =============================
# Model Builder for Hyperparameter Tuning
# =============================
def build_model(hp):
    """Build and compile one CNN candidate from a KerasTuner search space.

    Tunable choices: the filter count of each of the three conv blocks, the
    width of the dense layer, the dropout rate and the Adam learning rate.

    Args:
        hp: Hyperparameter container; only ``hp.Choice(name, values)`` is used.

    Returns:
        The compiled ``Sequential`` model.
    """
    # (hyperparameter name, candidate filter counts) for each conv block.
    conv_search_space = (
        ("conv1_filters", [32, 64, 128]),
        ("conv2_filters", [64, 128, 256]),
        ("conv3_filters", [128, 256]),
    )

    network = models.Sequential()

    for block_index, (hp_name, filter_options) in enumerate(conv_search_space):
        conv_kwargs = {"kernel_size": (3, 3), "activation": "relu"}
        if block_index == 0:
            # Only the first layer declares the input image shape.
            conv_kwargs["input_shape"] = (IMAGE_SIZE[0], IMAGE_SIZE[1], 3)
        network.add(layers.Conv2D(filters=hp.Choice(hp_name, filter_options), **conv_kwargs))
        network.add(layers.MaxPooling2D((2, 2)))

    network.add(layers.Flatten())

    # Classifier head: dense layer, dropout, softmax over the classes.
    dense_units = hp.Choice("dense_units", [128, 256, 512])
    network.add(layers.Dense(units=dense_units, activation="relu"))

    dropout_rate = hp.Choice("dropout_rate", [0.3, 0.5, 0.7])
    network.add(layers.Dropout(dropout_rate))

    network.add(layers.Dense(NUM_CLASSES, activation="softmax"))

    learning_rate = hp.Choice("learning_rate", [1e-2, 1e-3, 1e-4])
    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )
    return network

# =============================
# Hyperparameter Tuning
# =============================
tuner = kt.RandomSearch(
    build_model,
    objective="val_accuracy",
    max_trials=5,  # try more for deeper search
    executions_per_trial=1,
    directory="tuner_results",
    project_name="malware_heatmap"
)

# Round the step counts up: floor division would skip the last partial batch
# and yields 0 steps when there are fewer samples than BATCH_SIZE.
steps_per_epoch = max(1, int(np.ceil(len(X_train) / BATCH_SIZE)))
validation_steps = max(1, int(np.ceil(len(X_val) / BATCH_SIZE)))

print("🔍 Running hyperparameter search...")
tuner.search(
    train_generator,
    validation_data=val_generator,
    epochs=10,   # short training for tuning
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps
)

# Best model + hyperparameters
best_model = tuner.get_best_models(num_models=1)[0]
best_hps = tuner.get_best_hyperparameters(1)[0]
print("✅ Best Hyperparameters found:")
print(best_hps.values)

# =============================
# Retrain Best Model with Full Epochs
# =============================
# NOTE(review): best_model comes from get_best_models(), so this presumably
# continues from the weights found during the search rather than starting
# fresh — confirm that is intended.
print("🎯 Retraining best model...")
history = best_model.fit(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    validation_data=val_generator,
    validation_steps=validation_steps
)

# =============================
# Save Results
# =============================
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_dir = os.path.join(RESULTS_DIR, f"model_{timestamp}")
os.makedirs(model_dir, exist_ok=True)

best_model.save(os.path.join(model_dir, "malware_heatmap_cnn_best.keras"))
np.save(os.path.join(model_dir, "label_encoder.npy"), label_encoder.classes_)

# Predictions
print("Generating predictions...")
y_pred = best_model.predict(val_generator)
predicted_classes = label_encoder.inverse_transform(np.argmax(y_pred, axis=1))
true_classes = label_encoder.inverse_transform(np.argmax(y_val, axis=1))
predictions_df = pd.DataFrame({
    "image_name": filenames_val,
    "actual_class": true_classes,
    "predicted_class": predicted_classes
})
predictions_df.to_csv(os.path.join(model_dir, "predictions.csv"), index=False)

# Confusion Matrix. Named conf_matrix (not cm) so it cannot overwrite the
# `matplotlib.cm as cm` alias used by the heatmap-generation cell.
conf_matrix = confusion_matrix(true_classes, predicted_classes, labels=CLASS_NAMES)
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES)
plt.title("Confusion Matrix")
plt.ylabel("Actual Class")
plt.xlabel("Predicted Class")
plt.savefig(os.path.join(model_dir, "confusion_matrix.png"))
plt.close()

# Classification Report. labels= pins the row order to CLASS_NAMES; without
# it the call fails when a class is missing from both the true and predicted
# labels, because target_names would no longer match the inferred label set.
report = classification_report(
    true_classes, predicted_classes,
    labels=CLASS_NAMES, target_names=CLASS_NAMES, output_dict=True
)
report_df = pd.DataFrame(report).transpose()
report_df.to_csv(os.path.join(model_dir, "classification_report.csv"))
with open(os.path.join(model_dir, "classification_report.txt"), "w") as f:
    f.write(classification_report(
        true_classes, predicted_classes, labels=CLASS_NAMES, target_names=CLASS_NAMES
    ))

# Metrics: validation figures are re-evaluated, training figures are the
# values recorded for the last epoch.
val_loss, val_acc = best_model.evaluate(val_generator, verbose=0)
metrics = {
    "validation_accuracy": float(val_acc),
    "validation_loss": float(val_loss),
    "training_accuracy": float(history.history["accuracy"][-1]),
    "training_loss": float(history.history["loss"][-1])
}
metrics_df = pd.DataFrame([metrics])
metrics_df.to_csv(os.path.join(model_dir, "metrics_results.csv"), index=False)

# Training History JSON. Values are converted to plain floats because json
# cannot serialise NumPy scalar types.
serialisable_history = {
    metric: [float(value) for value in values]
    for metric, values in history.history.items()
}
with open(os.path.join(model_dir, "training_history.json"), "w") as f:
    json.dump(serialisable_history, f)

# Model Summary
with open(os.path.join(model_dir, "model_summary.txt"), "w", encoding="utf-8") as f:
    best_model.summary(print_fn=lambda x: f.write(x + "\n"))

print(f"\n✅ All results saved in: {model_dir}")


In [None]:
import tensorflow as tf


In [1]:
"""
CNN for Malware Heatmap Images with Hyperparameter Tuning
---------------------------------------------------------
Steps:
1. Loads heatmap images and preprocesses them
2. Runs hyperparameter tuning using KerasTuner
3. Selects the best CNN model
4. Retrains best model with full epochs
5. Saves predictions, metrics, and plots
"""

# =============================
# Imports
# =============================
import os
import math
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import json
import datetime
import keras_tuner as kt

# =============================
# Configuration
# =============================
# Families recognised from the lower-cased filename prefix before '_'.
CLASS_NAMES = [
    'adware', 'backdoor', 'benign', 'downloader',
    'spyware', 'trojan', 'virus', 'worm',
]

# Training settings.
EPOCHS = 50
BATCH_SIZE = 32
IMAGE_SIZE = (128, 128)   # (height, width) every image is resized to

# Input images and output location; the results folder is created up front.
DATA_DIR = r"D:\malware_LLm\Newheatmapout"
RESULTS_DIR = r"D:\malware_LLm\heatmapresult"
os.makedirs(RESULTS_DIR, exist_ok=True)

# =============================
# Dataset Loader
# =============================
def load_dataset_with_filenames(folder_path):
    """Load every labelled heatmap PNG found directly inside ``folder_path``.

    The label is the lower-cased part of the filename before the first '_';
    files whose label is not in ``CLASS_NAMES`` are ignored.

    Returns:
        Tuple ``(images, labels, filenames)`` of NumPy arrays with one entry
        per accepted file, ordered by filename. Pixel values are divided by
        255.
    """
    images, labels, filenames = [], [], []
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # dataset order (and therefore the seeded train/validation split) stable.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith(".png"):
            continue
        class_name = filename.split("_")[0].lower()
        if class_name not in CLASS_NAMES:
            continue
        img_path = os.path.join(folder_path, filename)
        img = load_img(img_path, color_mode="rgb", target_size=IMAGE_SIZE)
        images.append(img_to_array(img) / 255.0)
        labels.append(class_name)
        filenames.append(filename)
    return np.array(images), np.array(labels), np.array(filenames)

print("Loading dataset...")
X, y, filenames = load_dataset_with_filenames(DATA_DIR)
print(f"Loaded {len(X)} images.")

# Integer-encode the string labels, then one-hot them. The class count is
# taken from the labels actually present in the data.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
NUM_CLASSES = len(label_encoder.classes_)   # dynamic classes
y_categorical = tf.keras.utils.to_categorical(y_encoded, num_classes=NUM_CLASSES)

observed_labels, label_counts = np.unique(y, return_counts=True)
print("Class counts:", dict(zip(observed_labels, label_counts)))
print("NUM_CLASSES:", NUM_CLASSES)

# 80/20 split, stratified by class with a fixed seed.
split_parts = train_test_split(
    X, y_categorical, filenames,
    test_size=0.2, random_state=42, stratify=y_encoded,
)
X_train, X_val, y_train, y_val, filenames_train, filenames_val = split_parts

print("Train size:", len(X_train), "Val size:", len(X_val))

# Random geometric augmentation for training batches only.
augmentation_options = dict(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest",
)
train_datagen = ImageDataGenerator(**augmentation_options)
train_generator = train_datagen.flow(X_train, y_train, batch_size=BATCH_SIZE, shuffle=True)
# Validation batches are untouched and kept in order.
val_generator = ImageDataGenerator().flow(X_val, y_val, batch_size=BATCH_SIZE, shuffle=False)

# Batches needed to cover each split once: rounded up, never below one.
train_batches = math.ceil(len(X_train) / BATCH_SIZE)
val_batches = math.ceil(len(X_val) / BATCH_SIZE)
steps_per_epoch = max(1, train_batches)
validation_steps = max(1, val_batches)
print("steps_per_epoch:", steps_per_epoch, "validation_steps:", validation_steps)

# =============================
# Model Builder for Hyperparameter Tuning
# =============================
def build_model(hp):
    """Assemble and compile the CNN used for a single tuner trial.

    The network is three Conv2D(3x3, ReLU) + MaxPooling2D(2x2) stages, then
    Flatten, one ReLU dense layer, dropout, and a softmax layer with
    NUM_CLASSES outputs. Filter counts, dense width, dropout rate and the Adam
    learning rate are each drawn from a fixed list through ``hp.Choice``.

    Args:
        hp: Hyperparameter container supplied by the tuner; only its
            ``Choice(name, values)`` method is used.

    Returns:
        The compiled Sequential model (categorical cross-entropy loss,
        accuracy metric).
    """
    # (hyperparameter name, candidate filter counts) per conv stage, in order.
    conv_stages = (
        ("conv1_filters", [32, 64, 128]),
        ("conv2_filters", [64, 128, 256]),
        ("conv3_filters", [128, 256]),
    )

    network = models.Sequential()
    for stage_index, (hp_name, filter_options) in enumerate(conv_stages):
        conv_kwargs = dict(
            filters=hp.Choice(hp_name, filter_options),
            kernel_size=(3, 3),
            activation="relu",
        )
        if stage_index == 0:
            # Only the first layer declares the input: 3-channel images of IMAGE_SIZE.
            conv_kwargs["input_shape"] = (IMAGE_SIZE[0], IMAGE_SIZE[1], 3)
        network.add(layers.Conv2D(**conv_kwargs))
        network.add(layers.MaxPooling2D((2, 2)))

    # Classifier head.
    network.add(layers.Flatten())

    hidden_units = hp.Choice("dense_units", [128, 256, 512])
    network.add(layers.Dense(units=hidden_units, activation="relu"))

    drop_rate = hp.Choice("dropout_rate", [0.3, 0.5, 0.7])
    network.add(layers.Dropout(drop_rate))

    network.add(layers.Dense(NUM_CLASSES, activation="softmax"))

    # Optimizer with a tunable step size.
    step_size = hp.Choice("learning_rate", [1e-2, 1e-3, 1e-4])
    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=step_size),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return network

# =============================
# Hyperparameter Tuning
# =============================
# Random search over the choices declared in build_model: 5 trials, one
# training run per trial, ranked by validation accuracy. Trial artifacts are
# written under tuner_results/malware_heatmap (relative to the working dir).
# NOTE(review): `overwrite` is not passed here — check whether trials left in
# that directory by an earlier run get reused instead of being searched again.
tuner = kt.RandomSearch(
    build_model,
    objective="val_accuracy",
    max_trials=5,
    executions_per_trial=1,
    directory="tuner_results",
    project_name="malware_heatmap"
)

print("🔍 Running hyperparameter search...")
# Each trial trains for 10 epochs on the augmented training generator and is
# scored on the unshuffled validation generator, using the step counts
# computed above.
tuner.search(
    train_generator,
    validation_data=val_generator,
    epochs=10,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps
)

# Best model + hyperparameters
# NOTE(review): presumably get_best_models returns the top trial's model with
# its trained weights restored — confirm against the Keras Tuner docs.
best_model = tuner.get_best_models(num_models=1)[0]
best_hps = tuner.get_best_hyperparameters(1)[0]
print("✅ Best Hyperparameters found:")
print(best_hps.values)

# =============================
# Retrain Best Model with Full Epochs
# =============================
# Training continues on the model object handed back by the tuner (it is not
# rebuilt from best_hps) for EPOCHS epochs, with the same generators and step
# counts as the search. `history` is consumed further down for the metrics
# table and the training-history JSON.
print("🎯 Retraining best model...")
history = best_model.fit(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    validation_data=val_generator,
    validation_steps=validation_steps
)

# =============================
# Save Results
# =============================
# All artifacts of this run go into one timestamped folder under RESULTS_DIR,
# so separate runs land in separate folders.
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
model_dir = os.path.join(RESULTS_DIR, f"model_{timestamp}")
os.makedirs(model_dir, exist_ok=True)

best_model.save(os.path.join(model_dir, "malware_heatmap_cnn_best.keras"))
# The class order is stored next to the model: it is what maps an output index
# back to a class name at inference time.
np.save(os.path.join(model_dir, "label_encoder.npy"), label_encoder.classes_)

# Predictions
# val_generator is unshuffled, so row i of y_pred lines up with y_val[i] and
# filenames_val[i].
print("Generating predictions...")
y_pred = best_model.predict(val_generator)
predicted_classes = label_encoder.inverse_transform(np.argmax(y_pred, axis=1))
true_classes = label_encoder.inverse_transform(np.argmax(y_val, axis=1))
predictions_df = pd.DataFrame({
    "image_name": filenames_val,
    "actual_class": true_classes,
    "predicted_class": predicted_classes
})
predictions_df.to_csv(os.path.join(model_dir, "predictions.csv"), index=False)

# Confusion Matrix
# Stored as `conf_matrix`, not `cm`: the notebook imports `matplotlib.cm as cm`
# in its first cell, and rebinding that name here would break any later rerun
# of code that relies on the module.
conf_matrix = confusion_matrix(true_classes, predicted_classes, labels=CLASS_NAMES)
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES,
    ax=ax,
)
ax.set_title("Confusion Matrix")
ax.set_ylabel("Actual Class")
ax.set_xlabel("Predicted Class")
fig.savefig(os.path.join(model_dir, "confusion_matrix.png"))
plt.close(fig)

# Classification Report
# `labels` must be given together with `target_names`. Without it scikit-learn
# pairs the names with the *sorted* labels that happen to occur, which
# mislabels rows when CLASS_NAMES is not in sorted order and raises when a
# class is missing from the validation data. Passing CLASS_NAMES for both also
# keeps the report in the same row order as the confusion matrix above.
report_kwargs = dict(labels=CLASS_NAMES, target_names=CLASS_NAMES)
report = classification_report(
    true_classes, predicted_classes, output_dict=True, **report_kwargs
)
report_df = pd.DataFrame(report).transpose()
report_df.to_csv(os.path.join(model_dir, "classification_report.csv"))
with open(os.path.join(model_dir, "classification_report.txt"), "w") as f:
    f.write(classification_report(true_classes, predicted_classes, **report_kwargs))

# Metrics
# Validation numbers are re-evaluated on the final weights; training numbers
# are those logged for the last retraining epoch.
val_loss, val_acc = best_model.evaluate(val_generator, verbose=0)
metrics = {
    "validation_accuracy": float(val_acc),
    "validation_loss": float(val_loss),
    "training_accuracy": float(history.history["accuracy"][-1]),
    "training_loss": float(history.history["loss"][-1])
}
metrics_df = pd.DataFrame([metrics])
metrics_df.to_csv(os.path.join(model_dir, "metrics_results.csv"), index=False)

# Training History JSON
# Coerce every logged value to a built-in float: NumPy scalars are not JSON
# serializable and would make json.dump fail.
history_serializable = {
    metric_name: [float(value) for value in values]
    for metric_name, values in history.history.items()
}
with open(os.path.join(model_dir, "training_history.json"), "w") as f:
    json.dump(history_serializable, f)

# Model Summary
# utf-8 is requested explicitly for the summary text file.
with open(os.path.join(model_dir, "model_summary.txt"), "w", encoding="utf-8") as f:
    best_model.summary(print_fn=lambda x: f.write(x + "\n"))

print(f"\n✅ All results saved in: {model_dir}")


Trial 5 Complete [00h 21m 40s]
val_accuracy: 0.3914324641227722

Best val_accuracy So Far: 0.8266092538833618
Total elapsed time: 02h 41m 44s

✅ Best Hyperparameters found:
{'conv1_filters': 32, 'conv2_filters': 128, 'conv3_filters': 128, 'dense_units': 256, 'dropout_rate': 0.7, 'learning_rate': 0.0001}
🎯 Retraining best model...
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
Generating predictions...

✅ All results saved in: D:\malware_LLm\heatmapresu