In [None]:
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import logging
from tqdm import tqdm
import tensorflow as tf
from google.colab import drive

In [None]:
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import logging
from tqdm import tqdm
import tensorflow as tf

In [None]:
# Configure logging once at INFO level. All helper functions in this notebook
# log through the root logger bound to `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

In [None]:
def bytes_to_image(file_path, width=256, height=256):
    """Read a file's raw bytes into a ``(height, width)`` uint8 array.

    The byte stream is cut to ``width * height`` values when it is longer
    than that and zero-padded at the end otherwise. Any exception is logged
    through ``logger`` and reported to the caller as ``None``.
    """
    try:
        with open(file_path, 'rb') as handle:
            raw = handle.read()

        pixels = np.frombuffer(raw, dtype=np.uint8)
        wanted = width * height
        shortfall = wanted - len(pixels)

        if shortfall < 0:
            # More data than pixels: keep only the leading bytes.
            pixels = pixels[:wanted]
        else:
            # Too little (or exactly enough) data: append zeros up to size.
            pixels = np.pad(pixels, (0, shortfall), 'constant')

        return pixels.reshape(height, width)
    except Exception as e:
        logger.error(f"Error processing {file_path}: {str(e)}")
        return None


In [None]:
def convert_bytes_to_image(file_path, image_size=(256, 256)):
    """Render a hex-dump text file as a grayscale image array.

    Each line is split on whitespace; the first token is skipped and every
    remaining two-character token is parsed as a hexadecimal byte, with
    ``??`` mapped to 0. The collected bytes are zero-padded up to the next
    perfect square, reshaped into a square image and resized with PIL.

    Args:
        file_path: Path of the text file to read.
        image_size: Size tuple handed to ``Image.resize``.

    Returns:
        The resized image as a NumPy array, or ``None`` when the file holds
        no byte tokens or any step raises (a warning is logged either way).
    """
    try:
        bytes_list = []
        # Iterate the file lazily instead of loading every line up front.
        with open(file_path, 'r') as file:
            for line in file:
                parts = line.strip().split()
                if len(parts) > 1:
                    bytes_list.extend(
                        0 if b == '??' else int(b, 16)
                        for b in parts[1:]
                        if len(b) == 2
                    )

        if not bytes_list:
            # Without this guard an empty payload turns into a 0x0 image and
            # the outcome depends on how PIL treats it; fail explicitly.
            logger.warning(f"Error processing file {file_path}: no byte values found")
            return None

        image_array = np.array(bytes_list, dtype=np.uint8)
        # Smallest square that can hold every byte; the tail is zero-filled.
        image_size_side = int(np.ceil(np.sqrt(len(image_array))))
        padded_array = np.pad(image_array, (0, image_size_side**2 - len(image_array)), 'constant')
        image = padded_array.reshape((image_size_side, image_size_side))
        image = Image.fromarray(image).resize(image_size)
        return np.array(image)
    except Exception as e:
        logger.warning(f"Error processing file {file_path}: {e}")
        return None

In [None]:
def load_data_from_csv(csv_path, data_dir):
    """Read the label CSV and attach the ``.bytes`` path of every sample.

    The ``Id`` column is turned into ``<data_dir>/<Id>.bytes``; the result
    keeps only the ``file_path`` and ``Class`` columns.
    """
    def to_bytes_path(sample_id):
        return os.path.join(data_dir, f"{sample_id}.bytes")

    table = pd.read_csv(csv_path)
    table = table.assign(file_path=table['Id'].apply(to_bytes_path))
    return table[['file_path', 'Class']]

In [None]:
def load_data_from_csv(csv_path, data_dir, id_column='file_name', label_column='classification'):
    """Load every sample listed in a CSV as an image/label pair.

    Column names are whitespace-stripped after reading. For each row the
    file ``<data_dir>/<id>.bytes`` is converted with
    ``convert_bytes_to_image``; rows whose file is missing (a warning is
    logged) or whose conversion returns ``None`` are skipped.

    Args:
        csv_path: Path of the CSV file.
        data_dir: Directory that holds the ``.bytes`` files.
        id_column: CSV column holding the file identifier.
        label_column: CSV column holding the class label.

    Returns:
        DataFrame with an ``image`` column (arrays) and a ``label`` column.

    Raises:
        KeyError: If ``id_column`` or ``label_column`` is absent; the message
            lists the columns that are actually available.
    """
    df = pd.read_csv(csv_path)
    df.columns = df.columns.str.strip()

    # Fail early with an actionable message instead of a bare KeyError raised
    # from inside the row loop.
    missing = [col for col in (id_column, label_column) if col not in df.columns]
    if missing:
        raise KeyError(
            f"Column(s) {missing} not found in {csv_path}; "
            f"available columns: {df.columns.tolist()}"
        )

    images = []
    labels = []

    # Zip the two columns directly: row-wise iteration can upcast values to a
    # common dtype, which would corrupt numeric identifiers in the file name.
    for file_id, label in zip(df[id_column], df[label_column]):
        file_path = os.path.join(data_dir, f"{file_id}.bytes")

        if os.path.exists(file_path):
            image = convert_bytes_to_image(file_path)
            if image is not None:
                images.append(image)
                labels.append(label)
        else:
            logger.warning(f"File not found: {file_path}")

    return pd.DataFrame({'image': images, 'label': labels})


In [None]:
def create_dataset(df, img_size=(256, 256), max_samples=None):
    """Build image and label arrays from a frame of file paths.

    Args:
        df: DataFrame with ``file_path`` and ``Class`` columns.
        img_size: Two values unpacked as the ``width`` and ``height``
            arguments of ``bytes_to_image``.
        max_samples: Optional cap on the number of randomly sampled rows;
            values larger than the frame use every row.

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays. Rows for which
        ``bytes_to_image`` returns ``None`` are left out.
    """
    if max_samples:
        # DataFrame.sample raises when asked for more rows than exist, so
        # cap the request at the frame length.
        df = df.sample(min(max_samples, len(df)))

    images = []
    labels = []

    for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing files"):
        img = bytes_to_image(row['file_path'], *img_size)
        if img is not None:
            images.append(img)
            labels.append(row['Class'])

    return np.array(images), np.array(labels)

In [None]:
def create_dataset(df, image_size, max_samples=None):
    """Turn the ``image``/``label`` columns of ``df`` into NumPy arrays.

    When ``max_samples`` is truthy only the first ``max_samples`` rows are
    kept. Images are stacked and reshaped to ``(-1, *image_size)``.
    """
    # A falsy max_samples means "no limit"; slicing with None keeps all rows.
    limit = max_samples if max_samples else None

    picked_images = df['image'].tolist()[:limit]
    picked_labels = df['label'].tolist()[:limit]

    stacked = np.array(picked_images).reshape(-1, *image_size)
    return stacked, np.array(picked_labels)

In [None]:
def build_model(input_shape, num_classes):
    """Assemble and compile a small two-stage CNN classifier.

    Two Conv2D + MaxPooling2D stages (32 then 64 filters) feed a 128-unit
    dense layer with 50% dropout and a softmax output of ``num_classes``
    units. The model is compiled with Adam, categorical cross-entropy and
    accuracy.
    """
    stack = []
    for filters in (32, 64):
        # Only the very first layer declares the input shape.
        extra = {'input_shape': input_shape} if not stack else {}
        stack.append(Conv2D(filters, (3, 3), activation='relu', **extra))
        stack.append(MaxPooling2D((2, 2)))

    stack.append(Flatten())
    stack.append(Dense(128, activation='relu'))
    stack.append(Dropout(0.5))
    stack.append(Dense(num_classes, activation='softmax'))

    model = Sequential(stack)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model


In [None]:
def build_model(input_shape, num_classes):
    """Assemble and compile a three-stage CNN classifier.

    Each stage (32, 64, 128 filters) stacks two same-padded 3x3 convolutions,
    each followed by batch normalisation, then 2x2 max pooling and 25%
    dropout. The head is a 512-unit dense layer with batch normalisation and
    50% dropout, followed by a softmax over ``num_classes``. The model is
    compiled with Adam(0.001), categorical cross-entropy, and the accuracy,
    AUC, precision and recall metrics.
    """
    model = Sequential()

    for stage, filters in enumerate((32, 64, 128)):
        # Only the very first convolution declares the input shape.
        first_layer_kwargs = {'input_shape': input_shape} if stage == 0 else {}
        model.add(Conv2D(filters, (3, 3), activation='relu', padding='same', **first_layer_kwargs))
        model.add(BatchNormalization())
        model.add(Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(BatchNormalization())
        model.add(MaxPooling2D((2, 2)))
        model.add(Dropout(0.25))

    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))

    optimizer = Adam(0.001)
    tracked_metrics = [
        'accuracy',
        'AUC',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
    ]
    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=tracked_metrics)
    return model



In [None]:
# Plot accuracy and loss
def plot_results(history):
    """Draw training vs. validation accuracy and loss side by side.

    ``history.history`` must provide the ``accuracy``, ``val_accuracy``,
    ``loss`` and ``val_loss`` series.
    """
    curves = history.history
    plt.figure(figsize=(12, 5))

    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))
    for position, (key, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(curves[key], label=f'Train {title}')
        plt.plot(curves[f'val_{key}'], label=f'Validation {title}')
        plt.title(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
def plot_results(history):
    """Draw a 2x2 grid of training curves.

    Panels: accuracy, loss and AUC (train vs. validation each) plus a joint
    panel with the training ``precision`` and ``recall`` series. All series
    are read from ``history.history``.
    """
    curves = history.history
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # (grid position, panel title, [(history key, legend label), ...])
    layout = [
        ((0, 0), 'Accuracy', [('accuracy', 'Train'), ('val_accuracy', 'Validation')]),
        ((0, 1), 'Loss', [('loss', 'Train'), ('val_loss', 'Validation')]),
        ((1, 0), 'AUC', [('auc', 'Train'), ('val_auc', 'Validation')]),
        ((1, 1), 'Precision & Recall', [('precision', 'Precision'), ('recall', 'Recall')]),
    ]

    for (row, col), title, series in layout:
        panel = axes[row, col]
        for key, legend_label in series:
            panel.plot(curves[key], label=legend_label)
        panel.set_title(title)
        panel.legend()

    plt.tight_layout()
    plt.show()

In [None]:
def evaluate_performance(model, X_test, y_test, classes):
    """Report classification quality on the test data.

    Prints a classification report, then shows a confusion-matrix heatmap
    and one ROC curve per class (one-vs-rest, with its AUC in the legend).

    Args:
        model: Object whose ``predict(X_test)`` returns per-class scores.
        X_test: Test inputs passed to ``model.predict``.
        y_test: Targets indexed as a 2-D array with one column per class.
        classes: Class labels, ordered like the columns of ``y_test``.
    """
    # classification_report measures len() of every target name, so
    # non-string labels (e.g. integer class ids) must be stringified first.
    class_names = [str(c) for c in classes]

    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred_classes, target_names=class_names))

    plt.figure(figsize=(10, 8))
    cm = confusion_matrix(y_true, y_pred_classes)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.show()

    plt.figure(figsize=(10, 8))
    for i, class_name in enumerate(class_names):
        # One-vs-rest: column i of the targets against column i of the scores.
        fpr, tpr, _ = roc_curve(y_test[:, i], y_pred[:, i])
        auc = roc_auc_score(y_test[:, i], y_pred[:, i])
        plt.plot(fpr, tpr, label=f'{class_name} (AUC = {auc:.2f})')

    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], 'k--')
    plt.title('ROC Curves')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend()
    plt.show()

In [None]:
def evaluate_performance(model, X_test, y_test, classes):
    """Print test loss/accuracy and a classification report, then plot the confusion matrix.

    Args:
        model: Compiled model exposing ``evaluate`` and ``predict``.
        X_test: Test inputs.
        y_test: Targets; reduced with ``argmax`` along axis 1.
        classes: Class names used for the report and the heatmap ticks.
    """
    print("\nEvaluating model performance...")
    # A model compiled with more than one metric returns
    # [loss, metric_1, metric_2, ...]; a strict two-value unpack would raise.
    # NOTE(review): assumes accuracy is the first compiled metric - confirm.
    test_loss, test_acc, *_ = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}")

    # Predictions
    y_pred_probs = model.predict(X_test)
    y_pred = np.argmax(y_pred_probs, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=classes))

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.tight_layout()
    plt.show()

In [None]:
from google.colab import files
import os

# Upload the CSV file
# (interactive Colab upload; the returned mapping is kept in `uploaded`)
# NOTE(review): the recorded output of this cell shows the CSV being saved
# under a "(1)"-suffixed name - confirm the path read later matches the file
# that was actually written.
uploaded = files.upload()

# Create directory for .bytes files
os.makedirs('/content/malware_bytes', exist_ok=True)

# Upload .bytes files
print("Upload your .bytes files to '/content/malware_bytes'")
uploaded_bytes = files.upload()
# Move every uploaded file from the working directory into the data folder.
for filename in uploaded_bytes:
    os.rename(filename, f'/content/malware_bytes/{filename}')


Saving Malware dataset.csv to Malware dataset (1).csv
Upload your .bytes files to '/content/malware_bytes'


In [None]:
def evaluate_performance(model, X_test, y_test, classes):
    """Print a classification report, plot the confusion matrix, then print test accuracy/loss.

    Args:
        model: Compiled model exposing ``predict`` and ``evaluate``.
        X_test: Test inputs.
        y_test: Targets; reduced with ``argmax`` along axis 1.
        classes: Class names used for the report and the heatmap ticks.
    """
    print("\nEvaluating model performance...")

    # Predict and convert predictions to label indices
    y_pred_probs = model.predict(X_test)
    y_pred = np.argmax(y_pred_probs, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Classification Report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=classes))

    # Confusion Matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

    # Print test accuracy and loss
    # A model compiled with more than one metric returns
    # [loss, metric_1, metric_2, ...]; a strict two-value unpack would raise.
    # NOTE(review): assumes accuracy is the first compiled metric - confirm.
    test_loss, test_acc, *_ = model.evaluate(X_test, y_test)
    print(f"\nTest Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}")

In [None]:
def evaluate_performance(model, X_test, y_test, classes):
    """Plot the confusion matrix of ``model`` on the test data.

    Predicted and true class indices are obtained with ``argmax`` along
    axis 1 of the model output and of ``y_test`` respectively.
    """
    print("\nEvaluating model performance...")

    # Class scores -> label indices, for predictions and targets alike.
    probabilities = model.predict(X_test)
    predicted_idx = np.argmax(probabilities, axis=1)
    actual_idx = np.argmax(y_test, axis=1)

    matrix = confusion_matrix(actual_idx, predicted_idx)

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
    )
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()


In [None]:
def main():
    """Run the whole pipeline: preview the CSV, build the dataset, train, evaluate, save.

    Relies on whichever ``load_data_from_csv``, ``create_dataset``,
    ``build_model``, ``plot_results`` and ``evaluate_performance``
    definitions are current in the kernel when it is called. Writes
    ``best_model_colab.h5`` (checkpoint) and ``malware_classifier_colab.h5``
    (final model) to the working directory.
    """
    # Configuration
    CSV_PATH = '/content/Malware dataset.csv'
    DATA_DIR = '/content/malware_bytes'
    IMG_SIZE = (256, 256)
    BATCH_SIZE = 32
    EPOCHS = 30
    TEST_SIZE = 0.2
    RANDOM_STATE = 42
    MAX_SAMPLES = None  # Set to a number (e.g., 100) for quick testing

    # Load CSV
    logger.info("Loading and previewing CSV data...")
    df = pd.read_csv(CSV_PATH)
    df.columns = df.columns.str.strip()
    print(df.head())
    print("\nClass distribution:")
    print(df['classification'].value_counts())

    # Prepare dataset
    # The preview frame above is discarded here; `df` is rebound to the
    # loader's result.
    df = load_data_from_csv(CSV_PATH, DATA_DIR)
    X, y = create_dataset(df, IMG_SIZE, MAX_SAMPLES)

    # Preprocess
    # Add a trailing channel axis and scale pixel values by 1/255.
    X = X.reshape(-1, *IMG_SIZE, 1) / 255.0
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)
    y_categorical = to_categorical(y_encoded)
    classes = le.classes_

    # Train/test split
    # Stratified on the integer-encoded labels; targets stay one-hot.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_categorical, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y_encoded)

    # Data augmentation
    # NOTE(review): rotations and flips are applied to images derived from
    # byte sequences - confirm these transforms are meaningful for this data.
    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='constant',
        cval=0)

    # Build and train model
    model = build_model((*IMG_SIZE, 1), len(classes))
    model.summary()

    callbacks = [
        EarlyStopping(patience=5, restore_best_weights=True),
        ModelCheckpoint('best_model_colab.h5', save_best_only=True),
        ReduceLROnPlateau(factor=0.2, patience=3)
    ]

    logger.info("Training model...")
    # NOTE(review): the test split is also passed as validation data, so the
    # callbacks above are driven by the same samples that are later used for
    # the final evaluation - consider a separate validation split.
    history = model.fit(
        datagen.flow(X_train, y_train, batch_size=BATCH_SIZE),
        epochs=EPOCHS,
        validation_data=(X_test, y_test),
        callbacks=callbacks,
        verbose=1)

    # Evaluate
    plot_results(history)
    evaluate_performance(model, X_test, y_test, classes)

    # Save model
    model.save('malware_classifier_colab.h5')
    logger.info("Model saved successfully as 'malware_classifier_colab.h5'")

# Run the pipeline when this code is executed as the main module.
if __name__ == "__main__":
    main()

                                                hash  millisecond  \
0  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            0   
1  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            1   
2  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            2   
3  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            3   
4  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            4   

  classification  state  usage_counter        prio  static_prio  normal_prio  \
0        malware      0              0  3069378560        14274            0   
1        malware      0              0  3069378560        14274            0   
2        malware      0              0  3069378560        14274            0   
3        malware      0              0  3069378560        14274            0   
4        malware      0              0  3069378560        14274            0   

   policy  vm_pgoff  ...  nivcsw  min_flt  maj_flt  fs_excl_counter  \
0       0         0  ...       0 

KeyError: 'file_name'

In [None]:
import os
import pandas as pd
import numpy as np
import logging
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

# Set up logging
# INFO-level root configuration; `logger` is the root logger that the
# functions below log through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Convert .bytes to grayscale image
def convert_bytes_to_image(file_path, image_size=(256, 256)):
    """Render a hex-dump text file as a grayscale image array.

    Each line is split on whitespace; the first token is skipped and every
    remaining two-character token is parsed as a hexadecimal byte, with
    ``??`` mapped to 0. The collected bytes are zero-padded up to the next
    perfect square, reshaped into a square image and resized with PIL.

    Args:
        file_path: Path of the text file to read.
        image_size: Size tuple handed to ``Image.resize``.

    Returns:
        The resized image as a NumPy array, or ``None`` when the file holds
        no byte tokens or any step raises (a warning is logged either way).
    """
    try:
        bytes_list = []
        # Iterate the file lazily instead of loading every line up front.
        with open(file_path, 'r') as file:
            for line in file:
                parts = line.strip().split()
                if len(parts) > 1:
                    bytes_list.extend(
                        0 if b == '??' else int(b, 16)
                        for b in parts[1:]
                        if len(b) == 2
                    )

        if not bytes_list:
            # Without this guard an empty payload turns into a 0x0 image and
            # the outcome depends on how PIL treats it; fail explicitly.
            logger.warning(f"Error processing file {file_path}: no byte values found")
            return None

        image_array = np.array(bytes_list, dtype=np.uint8)
        # Smallest square that can hold every byte; the tail is zero-filled.
        image_size_side = int(np.ceil(np.sqrt(len(image_array))))
        padded_array = np.pad(image_array, (0, image_size_side**2 - len(image_array)), 'constant')
        image = padded_array.reshape((image_size_side, image_size_side))
        image = Image.fromarray(image).resize(image_size)
        return np.array(image)
    except Exception as e:
        logger.warning(f"Error processing file {file_path}: {e}")
        return None

# Load data from CSV and bytes files
def load_data_from_csv(csv_path, data_dir, id_column='file_name', label_column='classification'):
    """Load every sample listed in a CSV as an image/label pair.

    Column names are whitespace-stripped after reading. For each row the
    file ``<data_dir>/<id>.bytes`` is converted with
    ``convert_bytes_to_image``; rows whose file is missing (a warning is
    logged) or whose conversion returns ``None`` are skipped.

    Args:
        csv_path: Path of the CSV file.
        data_dir: Directory that holds the ``.bytes`` files.
        id_column: CSV column holding the file identifier.
        label_column: CSV column holding the class label.

    Returns:
        DataFrame with an ``image`` column (arrays) and a ``label`` column.

    Raises:
        KeyError: If ``id_column`` or ``label_column`` is absent; the message
            lists the columns that are actually available.
    """
    df = pd.read_csv(csv_path)
    df.columns = df.columns.str.strip()

    # Fail early with an actionable message instead of a bare KeyError raised
    # from inside the row loop.
    missing = [col for col in (id_column, label_column) if col not in df.columns]
    if missing:
        raise KeyError(
            f"Column(s) {missing} not found in {csv_path}; "
            f"available columns: {df.columns.tolist()}"
        )

    images = []
    labels = []

    # Zip the two columns directly: row-wise iteration can upcast values to a
    # common dtype, which would corrupt numeric identifiers in the file name.
    for file_id, label in zip(df[id_column], df[label_column]):
        file_path = os.path.join(data_dir, f"{file_id}.bytes")

        if os.path.exists(file_path):
            image = convert_bytes_to_image(file_path)
            if image is not None:
                images.append(image)
                labels.append(label)
        else:
            logger.warning(f"File not found: {file_path}")

    return pd.DataFrame({'image': images, 'label': labels})

# Create numpy arrays from dataset
def create_dataset(df, image_size, max_samples=None):
    """Convert the ``image`` and ``label`` columns of ``df`` to NumPy arrays.

    A truthy ``max_samples`` keeps only the leading ``max_samples`` rows.
    The stacked images are reshaped to ``(-1, *image_size)``.
    """
    image_items, label_items = df['image'].tolist(), df['label'].tolist()

    if max_samples:
        image_items = image_items[:max_samples]
        label_items = label_items[:max_samples]

    image_block = np.array(image_items).reshape(-1, *image_size)
    return image_block, np.array(label_items)

# Build CNN model
def build_model(input_shape, num_classes):
    """Assemble and compile a small two-stage CNN classifier.

    Two Conv2D + MaxPooling2D stages (32 then 64 filters) feed a 128-unit
    dense layer with 50% dropout and a softmax output of ``num_classes``
    units. The model is compiled with Adam, categorical cross-entropy and
    accuracy.
    """
    model = Sequential()

    # Feature extractor
    model.add(Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))

    # Classifier head
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Plot accuracy and loss
def plot_results(history):
    """Draw training vs. validation accuracy and loss side by side.

    ``history.history`` must provide the ``accuracy``, ``val_accuracy``,
    ``loss`` and ``val_loss`` series.
    """
    curves = history.history
    plt.figure(figsize=(12, 5))

    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))
    for position, (key, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(curves[key], label=f'Train {title}')
        plt.plot(curves[f'val_{key}'], label=f'Validation {title}')
        plt.title(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Evaluate with classification matrix
def evaluate_performance(model, X_test, y_test, classes):
    """Print test accuracy/loss and a classification report, then plot the confusion matrix.

    ``model.evaluate`` is expected to yield exactly ``(loss, accuracy)``.
    Class indices come from ``argmax`` along axis 1 of the predictions and
    of ``y_test``.
    """
    print("\nEvaluating model performance...")
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}")

    # Class scores -> label indices, for predictions and targets alike.
    scores = model.predict(X_test)
    predicted_idx = np.argmax(scores, axis=1)
    actual_idx = np.argmax(y_test, axis=1)

    print("\nClassification Report:")
    report_text = classification_report(actual_idx, predicted_idx, target_names=classes)
    print(report_text)

    matrix = confusion_matrix(actual_idx, predicted_idx)
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
    )
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.tight_layout()
    plt.show()

# Main pipeline
def main():
    """Run the whole pipeline: preview the CSV, build the dataset, train, evaluate, save.

    Calls ``load_data_from_csv``, ``create_dataset``, ``build_model``,
    ``plot_results`` and ``evaluate_performance``. Writes
    ``best_model_colab.h5`` (checkpoint) and ``malware_classifier_colab.h5``
    (final model) to the working directory.
    """
    # Configuration
    CSV_PATH = '/content/Malware dataset.csv'
    DATA_DIR = '/content/malware_bytes'
    IMG_SIZE = (256, 256)
    BATCH_SIZE = 32
    EPOCHS = 30
    TEST_SIZE = 0.2
    RANDOM_STATE = 42
    MAX_SAMPLES = None  # Optional: set to limit sample count

    # Load and preview CSV
    logger.info("Loading and previewing CSV data...")
    df_preview = pd.read_csv(CSV_PATH)
    df_preview.columns = df_preview.columns.str.strip()
    print("CSV Columns:", df_preview.columns.tolist())
    print(df_preview.head())
    print("\nClass distribution:")
    print(df_preview['classification'].value_counts())

    # Load image data
    # The loader re-reads the CSV itself; the preview frame is not reused.
    df = load_data_from_csv(CSV_PATH, DATA_DIR)
    X, y = create_dataset(df, IMG_SIZE, MAX_SAMPLES)

    # Preprocessing
    # Add a trailing channel axis and scale pixel values by 1/255.
    X = X.reshape(-1, *IMG_SIZE, 1) / 255.0
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)
    y_categorical = to_categorical(y_encoded)
    classes = le.classes_

    # Train-test split
    # Stratified on the integer-encoded labels; targets stay one-hot.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_categorical, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y_encoded)

    # Data augmentation
    # NOTE(review): rotations and flips are applied to images derived from
    # byte sequences - confirm these transforms are meaningful for this data.
    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='constant',
        cval=0)

    # Build and train model
    model = build_model((*IMG_SIZE, 1), len(classes))
    model.summary()

    callbacks = [
        EarlyStopping(patience=5, restore_best_weights=True),
        ModelCheckpoint('best_model_colab.h5', save_best_only=True),
        ReduceLROnPlateau(factor=0.2, patience=3)
    ]

    logger.info("Training model...")
    # NOTE(review): the test split is also passed as validation data, so the
    # callbacks above are driven by the same samples that are later used for
    # the final evaluation - consider a separate validation split.
    history = model.fit(
        datagen.flow(X_train, y_train, batch_size=BATCH_SIZE),
        epochs=EPOCHS,
        validation_data=(X_test, y_test),
        callbacks=callbacks,
        verbose=1
    )

    # Plot and evaluate
    plot_results(history)
    evaluate_performance(model, X_test, y_test, classes)

    # Save model
    model.save('malware_classifier_colab.h5')
    logger.info("Model saved successfully as 'malware_classifier_colab.h5'")

import os
import pandas as pd
import numpy as np
import logging
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

# Set up logging
# INFO-level root configuration; `logger` is the root logger that the
# functions below log through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Convert .bytes to grayscale image
def convert_bytes_to_image(file_path, image_size=(256, 256)):
    """Render a hex-dump text file as a grayscale image array.

    Each line is split on whitespace; the first token is skipped and every
    remaining two-character token is parsed as a hexadecimal byte, with
    ``??`` mapped to 0. The collected bytes are zero-padded up to the next
    perfect square, reshaped into a square image and resized with PIL.

    Args:
        file_path: Path of the text file to read.
        image_size: Size tuple handed to ``Image.resize``.

    Returns:
        The resized image as a NumPy array, or ``None`` when the file holds
        no byte tokens or any step raises (a warning is logged either way).
    """
    try:
        bytes_list = []
        # Iterate the file lazily instead of loading every line up front.
        with open(file_path, 'r') as file:
            for line in file:
                parts = line.strip().split()
                if len(parts) > 1:
                    bytes_list.extend(
                        0 if b == '??' else int(b, 16)
                        for b in parts[1:]
                        if len(b) == 2
                    )

        if not bytes_list:
            # Without this guard an empty payload turns into a 0x0 image and
            # the outcome depends on how PIL treats it; fail explicitly.
            logger.warning(f"Error processing file {file_path}: no byte values found")
            return None

        image_array = np.array(bytes_list, dtype=np.uint8)
        # Smallest square that can hold every byte; the tail is zero-filled.
        image_size_side = int(np.ceil(np.sqrt(len(image_array))))
        padded_array = np.pad(image_array, (0, image_size_side**2 - len(image_array)), 'constant')
        image = padded_array.reshape((image_size_side, image_size_side))
        image = Image.fromarray(image).resize(image_size)
        return np.array(image)
    except Exception as e:
        logger.warning(f"Error processing file {file_path}: {e}")
        return None

# Load data from CSV and bytes files
def load_data_from_csv(csv_path, data_dir, id_column='file_name', label_column='classification'):
    """Load every sample listed in a CSV as an image/label pair.

    Column names are whitespace-stripped after reading. For each row the
    file ``<data_dir>/<id>.bytes`` is converted with
    ``convert_bytes_to_image``; rows whose file is missing (a warning is
    logged) or whose conversion returns ``None`` are skipped.

    Args:
        csv_path: Path of the CSV file.
        data_dir: Directory that holds the ``.bytes`` files.
        id_column: CSV column holding the file identifier.
        label_column: CSV column holding the class label.

    Returns:
        DataFrame with an ``image`` column (arrays) and a ``label`` column.

    Raises:
        KeyError: If ``id_column`` or ``label_column`` is absent; the message
            lists the columns that are actually available.
    """
    df = pd.read_csv(csv_path)
    df.columns = df.columns.str.strip()

    # Fail early with an actionable message instead of a bare KeyError raised
    # from inside the row loop.
    missing = [col for col in (id_column, label_column) if col not in df.columns]
    if missing:
        raise KeyError(
            f"Column(s) {missing} not found in {csv_path}; "
            f"available columns: {df.columns.tolist()}"
        )

    images = []
    labels = []

    # Zip the two columns directly: row-wise iteration can upcast values to a
    # common dtype, which would corrupt numeric identifiers in the file name.
    for file_id, label in zip(df[id_column], df[label_column]):
        file_path = os.path.join(data_dir, f"{file_id}.bytes")

        if os.path.exists(file_path):
            image = convert_bytes_to_image(file_path)
            if image is not None:
                images.append(image)
                labels.append(label)
        else:
            logger.warning(f"File not found: {file_path}")

    return pd.DataFrame({'image': images, 'label': labels})

# Create numpy arrays from dataset
def create_dataset(df, image_size, max_samples=None):
    """Extract image and label arrays from ``df``.

    Reads the ``image`` and ``label`` columns, optionally keeps only the
    first ``max_samples`` entries (when truthy), and returns the images
    reshaped to ``(-1, *image_size)`` together with the label array.
    """
    columns = {name: df[name].tolist() for name in ('image', 'label')}

    if max_samples:
        columns = {name: values[:max_samples] for name, values in columns.items()}

    image_array = np.array(columns['image']).reshape(-1, *image_size)
    label_array = np.array(columns['label'])
    return image_array, label_array

# Build CNN model
def build_model(input_shape, num_classes):
    """Assemble and compile a small two-stage CNN classifier.

    Two Conv2D + MaxPooling2D stages (32 then 64 filters) feed a 128-unit
    dense layer with 50% dropout and a softmax output of ``num_classes``
    units. The model is compiled with Adam, categorical cross-entropy and
    accuracy.
    """
    feature_layers = [
        Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
    ]
    head_layers = [
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]

    model = Sequential(feature_layers + head_layers)
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Plot accuracy and loss
def plot_results(history):
    """Show accuracy and loss curves (train and validation) in two panels.

    Series are read from ``history.history`` under the keys ``accuracy``,
    ``val_accuracy``, ``loss`` and ``val_loss``.
    """
    def draw_panel(position, key, title):
        # One subplot: the training series first, then its validation twin.
        plt.subplot(1, 2, position)
        plt.plot(history.history[key], label=f'Train {title}')
        plt.plot(history.history[f'val_{key}'], label=f'Validation {title}')
        plt.title(title)
        plt.legend()

    plt.figure(figsize=(12, 5))
    draw_panel(1, 'accuracy', 'Accuracy')
    draw_panel(2, 'loss', 'Loss')
    plt.tight_layout()
    plt.show()

# Evaluate with classification matrix
def evaluate_performance(model, X_test, y_test, classes):
    """Print test accuracy/loss and a classification report, then plot the confusion matrix.

    ``model.evaluate`` is expected to yield exactly ``(loss, accuracy)``.
    Class indices come from ``argmax`` along axis 1 of the predictions and
    of ``y_test``.
    """
    print("\nEvaluating model performance...")
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}")

    # Reduce the score matrix and the targets to class indices.
    y_hat = np.argmax(model.predict(X_test), axis=1)
    y_ref = np.argmax(y_test, axis=1)

    print("\nClassification Report:")
    print(classification_report(y_ref, y_hat, target_names=classes))

    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
    )
    counts = confusion_matrix(y_ref, y_hat)
    plt.figure(figsize=(10, 8))
    sns.heatmap(counts, **heatmap_options)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.tight_layout()
    plt.show()

# Main pipeline
def main():
    """Run the whole pipeline: preview the CSV, build the dataset, train, evaluate, save.

    Calls ``load_data_from_csv``, ``create_dataset``, ``build_model``,
    ``plot_results`` and ``evaluate_performance``. Writes
    ``best_model_colab.h5`` (checkpoint) and ``malware_classifier_colab.h5``
    (final model) to the working directory.
    """
    # Configuration
    CSV_PATH = '/content/Malware dataset.csv'
    DATA_DIR = '/content/malware_bytes'
    IMG_SIZE = (256, 256)
    BATCH_SIZE = 32
    EPOCHS = 30
    TEST_SIZE = 0.2
    RANDOM_STATE = 42
    MAX_SAMPLES = None  # Optional: set to limit sample count

    # Load and preview CSV
    logger.info("Loading and previewing CSV data...")
    df_preview = pd.read_csv(CSV_PATH)
    df_preview.columns = df_preview.columns.str.strip()
    print("CSV Columns:", df_preview.columns.tolist())
    print(df_preview.head())
    print("\nClass distribution:")
    print(df_preview['classification'].value_counts())

    # Load image data
    # The loader re-reads the CSV itself; the preview frame is not reused.
    df = load_data_from_csv(CSV_PATH, DATA_DIR)
    X, y = create_dataset(df, IMG_SIZE, MAX_SAMPLES)

    # Preprocessing
    # Add a trailing channel axis and scale pixel values by 1/255.
    X = X.reshape(-1, *IMG_SIZE, 1) / 255.0
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)
    y_categorical = to_categorical(y_encoded)
    classes = le.classes_

    # Train-test split
    # Stratified on the integer-encoded labels; targets stay one-hot.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_categorical, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y_encoded)

    # Data augmentation
    # NOTE(review): rotations and flips are applied to images derived from
    # byte sequences - confirm these transforms are meaningful for this data.
    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='constant',
        cval=0)

    # Build and train model
    model = build_model((*IMG_SIZE, 1), len(classes))
    model.summary()

    callbacks = [
        EarlyStopping(patience=5, restore_best_weights=True),
        ModelCheckpoint('best_model_colab.h5', save_best_only=True),
        ReduceLROnPlateau(factor=0.2, patience=3)
    ]

    logger.info("Training model...")
    # NOTE(review): the test split is also passed as validation data, so the
    # callbacks above are driven by the same samples that are later used for
    # the final evaluation - consider a separate validation split.
    history = model.fit(
        datagen.flow(X_train, y_train, batch_size=BATCH_SIZE),
        epochs=EPOCHS,
        validation_data=(X_test, y_test),
        callbacks=callbacks,
        verbose=1
    )

    # Plot and evaluate
    plot_results(history)
    evaluate_performance(model, X_test, y_test, classes)

    # Save model
    model.save('malware_classifier_colab.h5')
    logger.info("Model saved successfully as 'malware_classifier_colab.h5'")

# Run the pipeline when this code is executed as the main module.
if __name__ == "__main__":
    main()


CSV Columns: ['hash', 'millisecond', 'classification', 'state', 'usage_counter', 'prio', 'static_prio', 'normal_prio', 'policy', 'vm_pgoff', 'vm_truncate_count', 'task_size', 'cached_hole_size', 'free_area_cache', 'mm_users', 'map_count', 'hiwater_rss', 'total_vm', 'shared_vm', 'exec_vm', 'reserved_vm', 'nr_ptes', 'end_data', 'last_interval', 'nvcsw', 'nivcsw', 'min_flt', 'maj_flt', 'fs_excl_counter', 'lock', 'utime', 'stime', 'gtime', 'cgtime', 'signal_nvcsw']
                                                hash  millisecond  \
0  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            0   
1  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            1   
2  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            2   
3  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            3   
4  42fb5e2ec009a05ff5143227297074f1e9c6c3ebb9c914...            4   

  classification  state  usage_counter        prio  static_prio  normal_prio  \
0        malware      0              0 

KeyError: 'file_name'