In [None]:
import logging
import sys

# Configure logger
logger = logging.getLogger("neuzzpp")
logger.setLevel(logging.INFO)
console_logger = logging.StreamHandler(sys.stdout)
log_formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
console_logger.setFormatter(log_formatter)
logger.addHandler(console_logger)

In [None]:
import pathlib

# Analysis parameters
n_tests = 100
trial = 0  # Index for the trial run (0-29)
rng_seeds = [
    7739,
    6545,
    4388,
    4330,
    8585,
    859,
    6973,
    2014,
    941,
    5264,
    9756,
    7357,
    7611,
    7174,
    7860,
    5132,
    1281,
    8397,
    4503,
    5003,
    3707,
    1825,
    9267,
    7815,
    6438,
    4024,
    8227,
    5454,
    4434,
    4504,
]  # Same seeds as the 30 runs
random_seed = rng_seeds[trial]

# ML training parameters
validation_split = 0.1
n_epochs = 100
batch_size = 32
early_stopping = 10
add_unseen_neighbors = False
percentile_len = 80
hidden_neurons = 4096
lr = 1e-4

# Define paths - adapt to your values
root_folder = pathlib.Path("/shared")
target_name = "harfbuzz-1.3.2"
target = root_folder / "binaries" / (target_name + ".aflpp")
fuzzer = "NEUZZPP"
tests_path = (
    root_folder
    / "results"
    / "baselines"
    / target_name
    / fuzzer
    / f"trial-{trial}"
    / "default"
    / "queue"
)
tests_list = [test for test in tests_path.glob("id*")]
model_path = tests_path.parent / "models"
grads_path = tests_path.parent / "grads"

# Create missing folders
model_path.mkdir(parents=True, exist_ok=True)
grads_path.mkdir(parents=True, exist_ok=True)

# Model loading

In [None]:
from neuzzpp.data_loaders import CoverageSeedHandler, seed_data_generator

data_loader = CoverageSeedHandler(
    seeds_path=tests_path,
    target=[str(target)],
    percentile_len=percentile_len,
    validation_split=validation_split,
)

Try to load trained model for this fuzzing campaign, otherwise train it on the full corpus (but keep 10% of the data for validation).

In [None]:
import numpy as np
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.metrics import confusion_matrix

from neuzzpp.models import MLP
from neuzzpp.utils import LRTensorBoard, print_confusion_matrix


def train_model(
    seed_handler,
    random_seed,
    batch_size,
    val_split,
    seeds,
    epochs,
    early_stopping,
    lr,
    n_hidden_neurons,
    fast: bool = False,
):
    # Create data generators
    seed_handler.load_seeds_from_folder()
    seed_handler.split_dataset(random_seed=random_seed)
    training_generator = seed_handler.get_generator(
        batch_size=batch_size, subset="training"
    )
    if val_split > 0.0:
        validation_generator = seed_handler.get_generator(
            batch_size=batch_size, subset="validation"
        )
        monitor_metric = "val_prc"
    else:
        validation_generator = None
        monitor_metric = "prc"

    # Compute class frequencies and weights
    class_weights, initial_bias = seed_handler.get_class_weights()

    # Create training callbacks
    seeds_path = pathlib.Path(seeds)
    model_path = seeds_path.parent / "models"
    callbacks = []
    if not fast:
        model_save = ModelCheckpoint(
            str(model_path / "model.h5"),
            verbose=0,
            save_best_only=True,
            monitor=monitor_metric,
            mode="max",
        )
        callbacks.append(model_save)
    if early_stopping is not None:
        es = EarlyStopping(monitor=monitor_metric, patience=early_stopping, mode="max")
        callbacks.append(es)

    # Create model
    if not fast:
        tb_callback = LRTensorBoard(
            log_dir=str(model_path / "tensorboard"), write_graph=False
        )
        callbacks.append(tb_callback)
    model = MLP(
        input_dim=seed_handler.max_file_size,
        output_dim=seed_handler.max_bitmap_size,
        lr=lr,
        ff_dim=n_hidden_neurons,
        output_bias=initial_bias,
        fast=fast,
    )

    # Fit model
    model.model.fit(
        training_generator,
        steps_per_epoch=np.ceil(seed_handler.training_size / batch_size),
        validation_data=validation_generator,
        validation_steps=None
        if validation_generator is None
        else np.ceil(seed_handler.val_size / batch_size),
        epochs=epochs,
        callbacks=callbacks,
        # class_weight=class_weights,
        verbose=2,
    )

    # Compute confusion matrix on validation data
    if val_split > 0.0 and not fast:
        class_threshold = 0.5  # Classification threshold, for now hard-coded
        val_gen = seed_data_generator(
            *seed_handler.val_set, batch_size, seed_handler.max_file_size
        )
        preds_val = model.model.predict(
            val_gen, steps=np.ceil(seed_handler.val_size / batch_size)
        )
        cm = confusion_matrix(
            seed_handler.val_set[1].flatten(),
            preds_val.flatten() > class_threshold,
            normalize="all",
        )
        print(f"Confusion matrix @{class_threshold}\n")
        print_confusion_matrix(cm, labels=["0", "1"])

    return model.model

In [None]:
import tensorflow as tf

from neuzzpp.models import create_logits_model

try:
    model = tf.keras.models.load_model(model_path / "model.h5")
    data_loader.load_seeds_from_folder()
except IOError:
    logger.warning(
        f"No trained model found at {model_path / 'model.h5'}. Training new model."
    )
    model = train_model(
        data_loader,
        random_seed=random_seed,
        batch_size=batch_size,
        val_split=validation_split,
        seeds=tests_path,
        epochs=n_epochs,
        early_stopping=early_stopping,
        lr=lr,
        n_hidden_neurons=hidden_neurons,
    )
grad_model = create_logits_model(model)

# Gradient precomputation
Randomly select `n_tests` test cases from the corpus for analysis.

In [None]:
if n_tests is not None:
    rng = np.random.default_rng(seed=random_seed)
    tests_indices = rng.choice(
        data_loader.reduced_bitmap.shape[0], n_tests, replace=False
    )
    tests_list = [data_loader.seed_list[i] for i in tests_indices]
    bitmap = data_loader.reduced_bitmap[tests_indices]

For the selected test cases, compute and serialize the gradient value for targeting each edge for mutation.

In [None]:
from tqdm import tqdm

from neuzzpp.data_loaders import get_seed_len, load_normalized_seeds
from neuzzpp.mutations import compute_gradient


def generate_all_grads(seed_list, bitmap, model, mutation_strategy: str = "sign"):
    input_dim = model.input.shape.as_list()[-1]
    grads_folder = seed_list[0].parent.parent / "grads"

    # Compute and store gradient info to file
    for seed_name in tqdm(seed_list):
        # Read seed
        seed = load_normalized_seeds([seed_name], max_len=input_dim)
        len_seed = get_seed_len(seed_name)
        n_keep_vals = min(len_seed, input_dim)

        grad_name = grads_folder / seed_name.name
        if not grad_name.exists():
            with open(grad_name, "a") as f:
                for edge in range(bitmap.shape[-1]):
                    # Compute gradient direction for seed
                    sorting_index, gradient = compute_gradient(
                        model, edge, seed, n_keep_vals, mutation_strategy
                    )

                    # Write gradient info to file
                    sorting_index = [str(el) for el in sorting_index]
                    gradient = [str(int(el)) for el in gradient]
                    f.write(",".join(sorting_index) + "|" + ",".join(gradient) + "\n")
        else:
            logger.warning(f"Found existing gradient file for {grad_name}. Skipping.")

In [None]:
# Compute and store gradients
generate_all_grads(tests_list, bitmap, grad_model)

# Mutation of targeted addresses
Use the previously computed gradients to compute all mutations targetting each test case and possible edge. The mutations are generated using the same strategy as Neuzz, PreFuzz and Neuzz++.

In [None]:
mut_path = grads_path.parent / "mutations"
mut_path.mkdir(parents=True, exist_ok=True)

In [None]:
from neuzzpp.mutations import compute_mutations_success_all

compute_mutations_success_all(model, tests_list, mut_path)

# Analysis of targeted addresses
First, load the selected test cases and get coverage predictions for them from the trained model.

In [None]:
from neuzzpp.data_loaders import load_normalized_seeds
from neuzzpp.models import predict_coverage

tests = load_normalized_seeds(tests_list, max_len=model.input.shape.as_list()[-1])
tests_preds = predict_coverage(model, tests)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_theme()

plt.figure(figsize=(10, 7))
plot = sns.heatmap(
    tests_preds,
    vmin=0,
    vmax=1,
    cmap=plt.cm.get_cmap("flare_r", 2),
    #     yticklabels=[seed_file.stem for seed_file in tests_list],
)

# Customize labels
plot.set_title("Coverage of code edge by test case")
plot.set_xlabel("Edge ID")
plot.set_ylabel("Test ID")
plt.yticks(np.arange(0, n_tests, 5), np.arange(0, n_tests, 5), rotation=0)

# Segment colorbar in legend
colorbar = plot.collections[0].colorbar
colorbar.set_ticks([0.25, 0.75])
colorbar.set_ticklabels(["Not covered", "Covered"])
pass

Load coverage information for each seed and target edge (precomputed in previous section):

In [None]:
# Load coverage counts for mutations
summaries = {cov_file.stem: np.load(cov_file) for cov_file in mut_path.glob("**/*.npy")}
len(summaries)

In [None]:
# Binarize coverage of mutations to "reached" (True) or "not reached" (False)
summaries_binary = {
    test_case: coverage != 0 for test_case, coverage in summaries.items()
}

In [None]:
mutations_coverage = np.stack(
    [np.diag(summaries_binary[test.name]) for test in tests_list],
    axis=0,
)
mutations_coverage.shape

In [None]:
# Include coverage from original test cases (without mutations) as separate value
mutations_coverage = np.where(tests_preds == True, 0.5, mutations_coverage)

Plot heatmap of mutations success per seed and targeted edge:

In [None]:
plt.figure(figsize=(10, 7))
plot = sns.heatmap(
    mutations_coverage,
    vmin=0,
    vmax=1,
    cmap=plt.cm.get_cmap("flare_r", 3),
)

# Customize labels
plot.set_title("Coverage of code edge by test case")
plot.set_xlabel("Targeted edge ID")
plot.set_ylabel("Test ID")
plt.yticks(np.arange(0, n_tests, 5), np.arange(0, n_tests, 5), rotation=0)

# Segment colorbar in legend
colorbar = plot.collections[0].colorbar
colorbar.set_ticks([0.18, 0.5, 0.82])
colorbar.set_ticklabels(["Not covered", "Original coverage", "Mutations coverage"])
pass

In [None]:
# Store computed coverage for future plotting
with open(tests_path.parent / "coverage_plot.npy", "wb") as save_file:
    np.save(save_file, mutations_coverage)