# Baseline Network Protocol Fuzzer
This notebook is built on GNS3. For now, we implement a baseline fuzzer that uses random bit flips and naive oracles.

## Get the project ID and choose the ID of the node we are going to fuzz

In [3]:
import json

def get_project_and_node_id(file_path):
    """Read a GNS3 project file and let the user pick one of its nodes by name.

    Args:
        file_path: Path to the ``.gns3`` project file (JSON).

    Returns:
        A ``(project_id, node_id)`` tuple. ``(None, None)`` is returned when
        the file is missing, is not valid JSON, is not a JSON object, or has
        no ``project_id``. ``(project_id, None)`` is returned when no node has
        both ``name`` and ``node_id``, or when the entered name is unknown.
    """
    # Load the JSON data
    try:
        with open(file_path, 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None, None
    except json.JSONDecodeError:
        print(f"Error: Failed to decode JSON from {file_path}")
        return None, None

    # Valid JSON is not necessarily an object; a list or scalar has no fields to read.
    project_id = data.get('project_id') if isinstance(data, dict) else None
    if not project_id:
        print("Error: 'project_id' field not found in the JSON file.")
        return None, None

    # "topology": null or "nodes": null would otherwise raise on the lookups below.
    topology = data.get('topology')
    nodes = topology.get('nodes') if isinstance(topology, dict) else None
    if not isinstance(nodes, list) or not nodes:
        print("Error: No nodes found in the 'topology' field of the JSON file.")
        return project_id, None

    # Display node options to the user
    print(f"Project ID: {project_id}")
    print("Available Nodes:")
    # Only entries that are objects carrying both keys can be offered for selection.
    node_choices = {
        node['name']: node['node_id']
        for node in nodes
        if isinstance(node, dict) and 'node_id' in node and 'name' in node
    }

    if not node_choices:
        print("No valid nodes found with both 'node_id' and 'name'.")
        return project_id, None

    # Prompt user to choose a node by name
    print("\nChoose a node by its name from the list:")
    for name in node_choices:
        print(f"  - {name}")

    chosen_name = input("Enter the name of the node you want to select: ").strip()
    node_id = node_choices.get(chosen_name)

    if not node_id:
        print("Invalid choice. Node name not found.")
        return project_id, None

    return project_id, node_id

# Example usage: read the project file, ask for a node, and echo the selection
# only when both identifiers were obtained.
file_path = '/home/test/GNS3/projects/test-re/test-re.gns3'
selection = get_project_and_node_id(file_path)
project_id, node_id = selection
if all(selection):
    print(f"\nSelected Project ID: {project_id}")
    print(f"Selected Node ID: {node_id}")


Project ID: 293e3470-c90e-44ee-bad0-c6bdfc8dc919
Available Nodes:

Choose a node by its name from the list:
  - R1
  - PC1
  - PC2



Selected Project ID: 293e3470-c90e-44ee-bad0-c6bdfc8dc919
Selected Node ID: 30467298-38a3-439f-a44b-6c4765ae3680


## Register a compute node and start the project (at the current stage, the GUI can be used instead)

In [None]:
import requests
import json

# Server configuration
# SECURITY: credentials should not be committed with the notebook. The controller
# URL and the Authorization header can be supplied through the environment
# (GNS3_CONTROLLER_URL / GNS3_AUTHORIZATION); the literals below are kept only
# as backward-compatible defaults and should be rotated and removed.
import os

BASE_URL = os.environ.get("GNS3_CONTROLLER_URL", "http://localhost:3080")
AUTH_HEADER = {
    "Authorization": os.environ.get(
        "GNS3_AUTHORIZATION",
        "Basic YWRtaW46bldCTnI5TWk3RVRqeGFocUxmQXpNTFVwV3daN0ZJbGxicnViR01LVkVVMlpiZldoeG4wajFycGRKZ0JOdVF4Yw==",
    ),
    "User-Agent": "GNS3 QT Client v2.2.49",
    "Content-Type": "application/json",
}

# Compute registration details (request body sent by register_compute)
compute_data = {
    "host": "localhost",
    "port": 3080,
    "protocol": "http"
}

def register_compute(timeout=10):
    """Register ``compute_data`` as a new compute on the GNS3 controller.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, an unreachable server would block the cell indefinitely.

    Returns:
        The decoded JSON body when the server answers 201, otherwise ``None``
        (also when the request itself fails).
    """
    url = f"{BASE_URL}/v2/computes"
    try:
        response = requests.post(url, headers=AUTH_HEADER, json=compute_data, timeout=timeout)
    except requests.RequestException as exc:
        # Connection refused, DNS failure, timeout, ...: report it like any other failure.
        print(f"Failed to register compute. Request error: {exc}")
        return None

    if response.status_code != 201:
        print(f"Failed to register compute. Status code: {response.status_code}, Response: {response.text}")
        return None

    print("Compute registered successfully!")
    return response.json()

def get_computes(timeout=10):
    """Return the computes known to the GNS3 controller.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200; an empty list on any other status
        or when the request itself fails.
    """
    url = f"{BASE_URL}/v2/computes"
    try:
        response = requests.get(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to retrieve computes. Request error: {exc}")
        return []

    if response.status_code == 200:
        return response.json()

    print(f"Failed to retrieve computes. Status code: {response.status_code}")
    return []

def access_links(project_id, compute, timeout=10):
    """Fetch the links of a project through the server described by ``compute``.

    Args:
        project_id: ID of the GNS3 project.
        compute: Mapping with at least ``host`` and ``port``; an optional
            ``protocol`` entry selects the URL scheme (``http`` when absent).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``.
    """
    # Honour the compute's own scheme instead of always assuming plain HTTP.
    protocol = compute.get('protocol') or 'http'
    compute_host = compute['host']
    compute_port = compute['port']
    url = f"{protocol}://{compute_host}:{compute_port}/v2/projects/{project_id}/links"

    try:
        response = requests.get(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to access project links. Request error: {exc}")
        return None

    if response.status_code == 200:
        print("Successfully accessed project links.")
        return response.json()

    print(f"Failed to access project links. Status code: {response.status_code}")
    return None

# Main script
project_id = "293e3470-c90e-44ee-bad0-c6bdfc8dc919"

# Step 1: Register a new compute
# NOTE(review): this runs on every execution of the cell; the recorded output
# lists several computes named http://localhost:3080 and a 409 response.
register_compute()

# Step 2: Get available computes
computes = get_computes()
if not computes:
    print("No computes available.")
    # exit() is the interactive-shell helper; raising SystemExit stops the cell
    # (or script) directly.
    raise SystemExit(1)

# Step 3: List computes and let user select one
print("\nAvailable Computes:")
for i, compute in enumerate(computes, start=1):
    print(f"{i}. Compute ID: {compute['compute_id']}, Name: {compute['name']}")

try:
    choice = int(input("\nSelect a compute by number: ")) - 1
except ValueError:
    choice = -1  # non-numeric input is handled by the range check below

# An out-of-range number previously produced selected_compute = None, which then
# crashed inside access_links(); reject it here instead.
if not 0 <= choice < len(computes):
    print("Invalid choice.")
    raise SystemExit(1)
selected_compute = computes[choice]

# Step 4: Access the links of the selected project using the selected compute
links = access_links(project_id, selected_compute)
if links is not None:
    print("\nProject Links:")
    print(json.dumps(links, indent=2))


Failed to register compute. Status code: 409, Response: {
    "message": "Invalid auth for server b0e423e4-e770-4c94-b7a6-8932abc29d61",
    "status": 409
}

Available Computes:
1. Compute ID: local, Name: chenlu-ThinkPad-X1-Carbon-5th
2. Compute ID: fe430614-4bec-4eae-bd33-15da25c95b87, Name: http://localhost:3080
3. Compute ID: vm, Name: GNS3 VM (GNS3 VM)
4. Compute ID: c24a1d53-1b17-44dc-b468-abc11447275f, Name: http://localhost:3080
5. Compute ID: b0e423e4-e770-4c94-b7a6-8932abc29d61, Name: http://localhost:3080


In [1]:
import requests
import json

# Server configuration
# SECURITY: credentials should not be committed with the notebook. The controller
# URL and the Authorization header can be supplied through the environment
# (GNS3_CONTROLLER_URL / GNS3_AUTHORIZATION); the literals below are kept only
# as backward-compatible defaults and should be rotated and removed.
import os

BASE_URL = os.environ.get("GNS3_CONTROLLER_URL", "http://localhost:3080")
AUTH_HEADER = {
    "Authorization": os.environ.get(
        "GNS3_AUTHORIZATION",
        "Basic YWRtaW46bldCTnI5TWk3RVRqeGFocUxmQXpNTFVwV3daN0ZJbGxicnViR01LVkVVMlpiZldoeG4wajFycGRKZ0JOdVF4Yw==",
    ),
    "User-Agent": "GNS3 QT Client v2.2.49",
    "Content-Type": "application/json",
}

def get_computes(timeout=10):
    """Return the computes known to the GNS3 controller.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200; an empty list on any other status
        or when the request itself fails.
    """
    url = f"{BASE_URL}/v2/computes"
    try:
        response = requests.get(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to retrieve computes. Request error: {exc}")
        return []

    if response.status_code == 200:
        return response.json()

    print(f"Failed to retrieve computes. Status code: {response.status_code}")
    return []

def access_links(project_id, compute, timeout=10):
    """Fetch the links of a project through the server described by ``compute``.

    Args:
        project_id: ID of the GNS3 project.
        compute: Mapping with at least ``host`` and ``port``; an optional
            ``protocol`` entry selects the URL scheme (``http`` when absent).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``.
    """
    # Honour the compute's own scheme instead of always assuming plain HTTP.
    protocol = compute.get('protocol') or 'http'
    compute_host = compute['host']
    compute_port = compute['port']
    url = f"{protocol}://{compute_host}:{compute_port}/v2/projects/{project_id}/links"

    try:
        response = requests.get(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to access project links. Request error: {exc}")
        return None

    if response.status_code == 200:
        print("Successfully accessed project links.")
        return response.json()

    print(f"Failed to access project links. Status code: {response.status_code}")
    return None

# Main script
project_id = "293e3470-c90e-44ee-bad0-c6bdfc8dc919"

# Step 1: Get available computes
computes = get_computes()
if not computes:
    print("No computes available.")
    # exit() is the interactive-shell helper; raising SystemExit stops the cell
    # (or script) directly.
    raise SystemExit(1)

# Step 2: List computes and select one
print("\nAvailable Computes:")
for i, compute in enumerate(computes, start=1):
    print(f"{i}. Compute ID: {compute['compute_id']}, Name: {compute['name']}")

# Interactive selection is disabled; to restore it use:
#   choice = int(input("\nSelect a compute by number: ")) - 1
choice = 3  # zero-based index, i.e. the fourth compute listed above

# With fewer than four computes the old conditional produced
# selected_compute = None, which then crashed inside access_links().
if not 0 <= choice < len(computes):
    print("Invalid choice.")
    raise SystemExit(1)
selected_compute = computes[choice]

# Step 3: Access the links of the selected project using the selected compute
links = access_links(project_id, selected_compute)
if links is not None:
    print("\nProject Links:")
    print(json.dumps(links, indent=2))



Available Computes:
1. Compute ID: local, Name: chenlu-ThinkPad-X1-Carbon-5th
2. Compute ID: fe430614-4bec-4eae-bd33-15da25c95b87, Name: http://localhost:3080
3. Compute ID: vm, Name: GNS3 VM (GNS3 VM)
4. Compute ID: c24a1d53-1b17-44dc-b468-abc11447275f, Name: http://localhost:3080
5. Compute ID: b0e423e4-e770-4c94-b7a6-8932abc29d61, Name: http://localhost:3080
Successfully accessed project links.

Project Links:
[
  {
    "filters": {},
    "link_id": "1996e8b1-0b3c-4080-9289-ac65113fd74d",
    "link_style": {},
    "nodes": [
      {
        "adapter_number": 0,
        "label": {
          "rotation": 0,
          "style": "font-family: TypeWriter;font-size: 10.0;font-weight: bold;fill: #000000;fill-opacity: 1.0;",
          "text": "f0/0",
          "x": -6,
          "y": 28
        },
        "node_id": "30467298-38a3-439f-a44b-6c4765ae3680",
        "port_number": 0
      },
      {
        "adapter_number": 0,
        "label": {
          "rotation": 0,
          "style": "fon

## Get the configuration file and set the configuration ourselves

In [4]:
import requests

# Base configuration
# SECURITY: credentials should not be committed with the notebook. The compute
# URL and the Authorization header can be supplied through the environment
# (GNS3_COMPUTE_URL / GNS3_AUTHORIZATION); the literals below are kept only as
# backward-compatible defaults and should be rotated and removed.
import os

AUTH_HEADER = {
    "Authorization": os.environ.get(
        "GNS3_AUTHORIZATION",
        "Basic YWRtaW46bldCTnI5TWk3RVRqeGFocUxmQXpNTFVwV3daN0ZJbGxicnViR01LVkVVMlpiZldoeG4wajFycGRKZ0JOdVF4Yw==",
    ),
    "User-Agent": "GNS3 QT Client v2.2.49",
    "Content-Type": "application/json"
}
BASE_URL = os.environ.get("GNS3_COMPUTE_URL", "http://172.16.169.129:80")  # Base URL for the remote compute

# Selected Project and Node information
project_id = "293e3470-c90e-44ee-bad0-c6bdfc8dc919"
node_id = "30467298-38a3-439f-a44b-6c4765ae3680"

def start_vpcs_node(node_id, node_type="vpcs", timeout=10):
    """Start a node through the compute API of the module-level ``project_id``.

    Args:
        node_id: ID of the node to start.
        node_type: Emulator segment of the compute URL. Defaults to ``"vpcs"``
            (the original behaviour); pass e.g. ``"dynamips"`` for a Dynamips
            router, whose ID the VPCS endpoint does not know.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` when the server answers 204, otherwise ``False``.
    """
    url = f"{BASE_URL}/v2/compute/projects/{project_id}/{node_type}/nodes/{node_id}/start"
    try:
        response = requests.post(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to start node {node_id}. Request error: {exc}")
        return False

    if response.status_code == 204:
        print(f"Node {node_id} started successfully.")
        return True

    print(f"Failed to start node {node_id}. Status code: {response.status_code}, Response: {response.text}")
    return False

def get_config_file(node_id, config_path, timeout=10):
    """Retrieve a configuration file of a Dynamips node and print it.

    Args:
        node_id: ID of the node whose ``configs`` directory is read.
        config_path: File name inside that ``configs`` directory.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The file content as text on HTTP 200, otherwise ``None``. Earlier
        callers that ignore the return value are unaffected.
    """
    url = f"{BASE_URL}/v2/compute/projects/{project_id}/files/project-files/dynamips/{node_id}/configs/{config_path}"
    try:
        response = requests.get(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to retrieve configuration file. Request error: {exc}")
        return None

    if response.status_code == 200:
        print(f"Configuration file retrieved successfully:\n{response.text}")
        return response.text

    print(f"Failed to retrieve configuration file. Status code: {response.status_code}")
    return None

def upload_config_file(node_id, config_path, config_data, timeout=10):
    """Upload a configuration file into a Dynamips node's ``configs`` directory.

    Args:
        node_id: ID of the target node.
        config_path: File name inside the node's ``configs`` directory.
        config_data: File content, sent as the raw request body.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` when the server answers 201, otherwise ``False``.
    """
    url = f"{BASE_URL}/v2/compute/projects/{project_id}/files/project-files/dynamips/{node_id}/configs/{config_path}"
    # The body is raw file content, so override the JSON content type on a copy.
    headers = {**AUTH_HEADER, "Content-Type": "application/octet-stream"}
    try:
        response = requests.post(url, headers=headers, data=config_data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to upload configuration file. Request error: {exc}")
        return False

    if response.status_code == 201:
        print("Configuration file uploaded successfully.")
        return True

    print(f"Failed to upload configuration file. Status code: {response.status_code}, Response: {response.text}")
    return False

# Example usage: start the node, read its startup config, then overwrite it.
# NOTE(review): the recorded run shows a 404 for the start call below; the
# config URLs use the dynamips directory, so this node may not be a VPCS
# node -- confirm which start endpoint is intended.
start_vpcs_node(node_id=node_id)

# Retrieve the startup configuration of the selected node
config_path = "i1_startup-config.cfg"
get_config_file(node_id, config_path)

# Upload a replacement configuration (example content) to the same path
config_data = "interface Gig0/0\n ip address 192.168.1.1 255.255.255.0\n"  # Example config data
upload_config_file(node_id, config_path, config_data)


Failed to start node 30467298-38a3-439f-a44b-6c4765ae3680. Status code: 404, Response: {
    "message": "Node ID 30467298-38a3-439f-a44b-6c4765ae3680 doesn't exist",
    "status": 404
}
Configuration file retrieved successfully:
!
!
!
!
!
!
!
!
!
!
!
!
!
!
!
!
service timestamps debug datetime msec
service timestamps log datetime msec
no service password-encryption
!
hostname R1
!
ip cef
no ip domain-lookup
no ip icmp rate-limit unreachable
ip tcp synwait 5
no cdp log mismatch duplex
!
line con 0
 exec-timeout 0 0
 logging synchronous
 privilege level 15
 no login
line aux 0
 exec-timeout 0 0
 logging synchronous
 privilege level 15
 no login
!
!
end

Configuration file uploaded successfully.


## Start the capture of the traffic

In [5]:
import requests

# Configuration
# SECURITY: credentials should not be committed with the notebook. The controller
# URL and the Authorization header can be supplied through the environment
# (GNS3_CONTROLLER_URL / GNS3_AUTHORIZATION); the literals below are kept only
# as backward-compatible defaults and should be rotated and removed.
import os

BASE_URL = os.environ.get("GNS3_CONTROLLER_URL", "http://127.0.0.1:3080")  # Localhost address, adjust if needed
AUTH_HEADER = {
    "Authorization": os.environ.get(
        "GNS3_AUTHORIZATION",
        "Basic YWRtaW46bldCTnI5TWk3RVRqeGFocUxmQXpNTFVwV3daN0ZJbGxicnViR01LVkVVMlpiZldoeG4wajFycGRKZ0JOdVF4Yw==",
    ),
    "User-Agent": "GNS3 QT Client v2.2.49",
    "Content-Type": "application/json"
}

# Project and Node IDs
project_id = "293e3470-c90e-44ee-bad0-c6bdfc8dc919"
node_id = "30467298-38a3-439f-a44b-6c4765ae3680"

def start_node(project_id, node_id, timeout=10):
    """Start the specified node in the given project via the controller API.

    Args:
        project_id: ID of the GNS3 project.
        node_id: ID of the node to start.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` when the server answers 200, otherwise ``False``.
    """
    url = f"{BASE_URL}/v2/projects/{project_id}/nodes/{node_id}/start"
    try:
        response = requests.post(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to start node {node_id}. Request error: {exc}")
        return False

    if response.status_code == 200:
        print(f"Node {node_id} started successfully.")
        print("Response:", response.json())
        return True

    print(f"Failed to start node {node_id}. Status code: {response.status_code}, Response: {response.text}")
    return False

# Start the selected node of the selected project
start_node(project_id=project_id, node_id=node_id)


Node 30467298-38a3-439f-a44b-6c4765ae3680 started successfully.
Response: {'command_line': None, 'compute_id': 'vm', 'console': 5000, 'console_auto_start': False, 'console_host': '172.16.169.129', 'console_type': 'telnet', 'custom_adapters': [], 'first_port_name': None, 'height': 45, 'label': {'rotation': 0, 'style': 'font-family: TypeWriter;font-size: 10.0;font-weight: bold;fill: #000000;fill-opacity: 1.0;', 'text': 'R1', 'x': 21, 'y': -25}, 'locked': False, 'name': 'R1', 'node_directory': '/opt/gns3/projects/293e3470-c90e-44ee-bad0-c6bdfc8dc919/project-files/dynamips/30467298-38a3-439f-a44b-6c4765ae3680', 'node_id': '30467298-38a3-439f-a44b-6c4765ae3680', 'node_type': 'dynamips', 'port_name_format': 'Ethernet{0}', 'port_segment_size': 0, 'ports': [{'adapter_number': 0, 'data_link_types': {'Ethernet': 'DLT_EN10MB'}, 'link_type': 'ethernet', 'name': 'FastEthernet0/0', 'port_number': 0, 'short_name': 'f0/0'}, {'adapter_number': 0, 'data_link_types': {'Ethernet': 'DLT_EN10MB'}, 'link_typ

In [9]:
import requests

# Base configuration
# SECURITY: credentials should not be committed with the notebook. The controller
# URL and the Authorization header can be supplied through the environment
# (GNS3_CONTROLLER_URL / GNS3_AUTHORIZATION); the literals below are kept only
# as backward-compatible defaults and should be rotated and removed.
import os

BASE_URL = os.environ.get("GNS3_CONTROLLER_URL", "http://127.0.0.1:3080")  # Localhost address, adjust if needed
AUTH_HEADER = {
    "Authorization": os.environ.get(
        "GNS3_AUTHORIZATION",
        "Basic YWRtaW46bldCTnI5TWk3RVRqeGFocUxmQXpNTFVwV3daN0ZJbGxicnViR01LVkVVMlpiZldoeG4wajFycGRKZ0JOdVF4Yw==",
    ),
    "User-Agent": "GNS3 QT Client v2.2.49",
    "Content-Type": "application/json"
}

# Project and link information
project_id = "293e3470-c90e-44ee-bad0-c6bdfc8dc919"
link_id = "1996e8b1-0b3c-4080-9289-ac65113fd74d"  # Link ID for which to start capture

def start_capture(project_id, link_id,
                  capture_file_name="R1_FastEthernet00_to_PC1_Ethernet0.pcap",
                  data_link_type="DLT_EN10MB",
                  timeout=10):
    """Start packet capture on a specified link in the given project.

    Args:
        project_id: ID of the GNS3 project.
        link_id: ID of the link to capture on.
        capture_file_name: Name of the pcap file to write; defaults to the
            file name that was previously hard-coded.
        data_link_type: Data link type sent to the server (Ethernet by default).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body when the server answers 201, otherwise ``None``.
    """
    url = f"{BASE_URL}/v2/projects/{project_id}/links/{link_id}/start_capture"
    data = {
        "capture_file_name": capture_file_name,
        "data_link_type": data_link_type,
    }
    try:
        response = requests.post(url, headers=AUTH_HEADER, json=data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to start capture. Request error: {exc}")
        return None

    if response.status_code == 201:
        capture_info = response.json()
        print("Capture started successfully.")
        print("Response data:", capture_info)
        return capture_info

    print(f"Failed to start capture. Status code: {response.status_code}, Response: {response.text}")
    return None

# Begin capturing on the selected link
start_capture(project_id=project_id, link_id=link_id)


Capture started successfully.
Response data: {'capture_compute_id': 'vm', 'capture_file_name': 'R1_FastEthernet00_to_PC1_Ethernet0.pcap', 'capture_file_path': '/home/test/GNS3/projects/test-re/project-files/captures/R1_FastEthernet00_to_PC1_Ethernet0.pcap', 'capturing': True, 'filters': {}, 'link_id': '1996e8b1-0b3c-4080-9289-ac65113fd74d', 'link_style': {}, 'link_type': 'ethernet', 'nodes': [{'adapter_number': 0, 'label': {'rotation': 0, 'style': 'font-family: TypeWriter;font-size: 10.0;font-weight: bold;fill: #000000;fill-opacity: 1.0;', 'text': 'f0/0', 'x': -6, 'y': 28}, 'node_id': '30467298-38a3-439f-a44b-6c4765ae3680', 'port_number': 0}, {'adapter_number': 0, 'label': {'rotation': 0, 'style': 'font-family: TypeWriter;font-size: 10.0;font-weight: bold;fill: #000000;fill-opacity: 1.0;', 'text': 'e0', 'x': 72, 'y': 23}, 'node_id': 'bdbe7b33-1fb6-4c1c-9da6-fca119cb5bef', 'port_number': 0}], 'project_id': '293e3470-c90e-44ee-bad0-c6bdfc8dc919', 'suspend': False}


## Generating input with baseline random bit flips
Prerequisite (Ubuntu/Debian): `sudo apt-get install graphviz graphviz-dev`

In [4]:
pip install fuzzingbook

[33mDEPRECATION: Loading egg at /home/test/Routing-Protocol-Fuzzing/GNS3-Ubuntu-Setup/venv4gns3/lib/python3.12/site-packages/pygraphviz-2.0b0.dev0-py3.12-linux-x86_64.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330[0m[33m
Note: you may need to restart the kernel to use updated packages.


In [1]:
import random
from typing import Tuple, List, Callable, Set, Any
from fuzzingbook.Fuzzer import Fuzzer


In [12]:
class base():
    """Scratch parent class used to try out method overriding."""

    # The methods take no instance argument, so they are declared static; this
    # keeps class-level calls working and makes instance-level calls work too.
    @staticmethod
    def tbo():
        """Print the constant 9."""
        x = 9
        print(x)

    @staticmethod
    def complex(x, y):
        """Print ``x + y``."""
        print(x+y)

class son(base):
    """Scratch subclass overriding both methods of ``base``."""

    @staticmethod
    def tbo():
        """Override that does nothing."""
        pass

    @staticmethod
    def complex(x, y):
        """Delegate to ``base.complex`` with both arguments; returns its result (``None``)."""
        # Zero-argument super() needs an instance or class as the first
        # argument, and the earlier call also dropped ``x``; call the parent
        # implementation explicitly instead.
        return base.complex(x, y)
    
import fuzzingbook

In [None]:
!pip show fuzzingbook

[33mDEPRECATION: Loading egg at /home/test/Routing-Protocol-Fuzzing/GNS3-Ubuntu-Setup/venv4gns3/lib/python3.12/site-packages/pygraphviz-2.0b0.dev0-py3.12-linux-x86_64.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330[0m[33m
[0mName: fuzzingbook
Version: 1.2.1
Summary: Code for 'The Fuzzing Book' (https://www.fuzzingbook.org/)
Home-page: https://www.fuzzingbook.org/
Author: Andreas Zeller et al.
Author-email: zeller@cispa.de
License: 
Location: /home/test/Routing-Protocol-Fuzzing/GNS3-Ubuntu-Setup/venv4gns3/lib/python3.12/site-packages
Requires: beautifulsoup4, cargo, diff-match-patch, easyplotly, enforce, graphviz, ipython, isla-solver, isla-solver, lxml, Markdown, markdown, matplotlib, multiprocess, nbconvert, nbformat, networkx, notedown, numpy, pandas, pydriller, Pygments, pygraphviz, pyparsing, python-magic, requests, scikit-learn, sc

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Code is modified/simplified based on the Mutation-Based Fuzzer in the FuzzingBook
# "Mutation-Based Fuzzing" - a chapter of "The Fuzzing Book"
# Web site: https://www.fuzzingbook.org/html/MutationFuzzer.html

import random
from typing import Tuple, List, Callable, Set, Any

from fuzzingbook.Fuzzer import Fuzzer

# List of mutation operators
def delete_random_character(s: str) -> str:
    """Return ``s`` without one character chosen uniformly at random.

    The empty string is returned unchanged.
    """
    if s != "":
        victim = random.randint(0, len(s) - 1)
        # Keep everything before and everything after the victim index.
        head, tail = s[:victim], s[victim + 1:]
        return head + tail
    return s

def insert_random_character(s: str) -> str:
    """Return ``s`` with one random character (code points 32..126) inserted.

    The insertion point is drawn from every position, including both ends.
    """
    where = random.randint(0, len(s))
    code_point = random.randrange(32, 127)
    pieces = (s[:where], chr(code_point), s[where:])
    return "".join(pieces)

def flip_random_character(s):
    """Return ``s`` with one of the seven low bits of a random character flipped.

    The empty string is returned unchanged.
    """
    if s != "":
        index = random.randint(0, len(s) - 1)
        original = s[index]
        mask = 2 ** random.randint(0, 6)  # one of 1, 2, 4, ..., 64
        flipped = chr(ord(original) ^ mask)
        return s[:index] + flipped + s[index + 1:]
    return s

class MyMutationFuzzer(Fuzzer):
    """Mutational fuzzer: replays every seed once, then emits mutated population members."""

    def __init__(self, seed: List[str],
                 min_mutations: int = 2,
                 max_mutations: int = 10) -> None:
        """Constructor.
        `seed` - a list of (input) strings to mutate.
        `min_mutations` - the minimum number of mutations to apply.
        `max_mutations` - the maximum number of mutations to apply.
        """
        self.seed = seed
        self.min_mutations = min_mutations
        self.max_mutations = max_mutations
        self.reset()

    def reset(self) -> None:
        """Set population to initial seed.
        To be overloaded in subclasses."""
        # Copy the list: add_seed() appends to the population, and sharing the
        # list object would also grow self.seed (and the caller's list), which
        # re-enters the seeding branch of fuzz() and makes reset() unable to
        # restore the initial population.
        self.population = list(self.seed)
        self.seed_index = 0

    def mutate(self, inp: str) -> str:
        """Return `inp` with one randomly chosen mutation applied."""
        # Only the bit flip is enabled for the baseline; the delete/insert
        # mutators defined above are intentionally left out.
        mutators = [
            flip_random_character,
        ]
        mutator = random.choice(mutators)
        return mutator(inp)

    def create_candidate(self) -> str:
        """Create a new candidate by mutating a random population member
        between `min_mutations` and `max_mutations` times."""
        candidate = random.choice(self.population)
        trials = random.randint(self.min_mutations, self.max_mutations)
        for _ in range(trials):
            candidate = self.mutate(candidate)
        return candidate

    def add_seed(self, seed: str) -> None:
        """Add `seed` to the population used as the basis for mutations."""
        self.population.append(seed)
        print("new seed has been added to the corpus")

    def fuzz(self) -> str:
        """Return the next input: each seed once, in order, then mutated candidates."""
        if self.seed_index < len(self.seed):
            # Still seeding
            self.inp = self.seed[self.seed_index]
            self.seed_index += 1
        else:
            # Mutating
            self.inp = self.create_candidate()
        return self.inp

In [None]:
# Show the constructor docstrings of the fuzzingbook base classes, followed by
# the repr of Fuzzer.reset.
from fuzzingbook.Fuzzer import RandomFuzzer

for constructor in (Fuzzer.__init__, RandomFuzzer.__init__):
    print(constructor.__doc__)
print(Fuzzer.reset)

Constructor
Produce strings of `min_length` to `max_length` characters
           in the range [`char_start`, `char_start` + `char_range`)


In [8]:
import glob
import os
import random
from typing import List
from fuzzingbook.Fuzzer import Fuzzer

# Define the directory containing .cfg files
directory = "/home/test/Routing-Protocol-Fuzzing/Fuzz-Material/cfgSeedCorpus"


def load_cfg_seeds(cfg_dir):
    """Return the text of every ``*.cfg`` file directly inside ``cfg_dir``.

    Files are read in sorted path order. ``glob`` itself returns paths in an
    arbitrary order, and the position of a seed in the list is significant
    (the fuzzer below indexes the first element), so sorting keeps runs
    reproducible. A missing or empty directory yields an empty list.
    """
    contents = []
    for filepath in sorted(glob.glob(os.path.join(cfg_dir, "*.cfg"))):
        with open(filepath, 'r') as file:
            contents.append(file.read())
    return contents


# List holding the contents of each .cfg file
cfg_contents = load_cfg_seeds(directory)

# Confirm the number of files read
print("Total .cfg files read:", len(cfg_contents))

# Define flip_random_character as required by the fuzzer
def flip_random_character(s):
    """Return ``s`` with one of the seven low bits of a random character flipped.

    The empty string is returned unchanged.
    """
    if s != "":
        index = random.randint(0, len(s) - 1)
        original = s[index]
        mask = 2 ** random.randint(0, 6)  # one of 1, 2, 4, ..., 64
        flipped = chr(ord(original) ^ mask)
        return s[:index] + flipped + s[index + 1:]
    return s

class MyBaselineFuzzer(Fuzzer):
    """Baseline fuzzer that bit-flips the first seed a random number of times."""

    def __init__(self, seed: List[str], min_mutations: int = 2, max_mutations: int = 10) -> None:
        """Store the seed list and the inclusive mutation-count bounds, then reset."""
        self.seed, self.min_mutations, self.max_mutations = seed, min_mutations, max_mutations
        self.reset()

    def reset(self) -> None:
        """Share the seed list as population and mark every seed as already emitted."""
        # seed_index starts past the last seed, so fuzz() goes straight to mutation.
        self.seed_index = len(self.seed)
        self.population = self.seed

    def mutate(self, inp: str) -> str:
        """Apply one randomly chosen mutator to ``inp`` (only the bit flip is registered)."""
        return random.choice([flip_random_character])(inp)

    def create_candidate(self) -> str:
        """Mutate the first population member between min and max times."""
        # The first element is always used; random selection is disabled:
        # mutated = random.choice(self.population)
        mutated = self.population[0]
        remaining = random.randint(self.min_mutations, self.max_mutations)
        while remaining > 0:
            mutated = self.mutate(mutated)
            remaining -= 1
        return mutated

    def fuzz(self) -> str:
        """Return the next input and remember it in ``self.inp``."""
        still_seeding = self.seed_index < len(self.seed)
        if not still_seeding:
            self.inp = self.create_candidate()
        else:
            self.inp = self.seed[self.seed_index]
            self.seed_index += 1
        return self.inp

# Build the fuzzer from the .cfg corpus read above
fuzzer = MyBaselineFuzzer(seed=cfg_contents)

# Generate three fuzzed outputs and print each one
for i in (0, 1, 2):
    print(f"Fuzzed output {i+1}: {fuzzer.fuzz()}")


Total .cfg files read: 11
Fuzzed output 1: !
!
!
service timestamps debug datetime msec
service timestamps log datetime msec
no service password-encryption
!
hostname R10
!
i` cef
n/ ip domain-lookup
no ip icmp rate-limit unreachable
ip tcp synwait 5
no cdp log mismatch duplex
!
line con 0
 exec-timeout 0 0
 logging synchronous
 privilege level 15
 no login
line au8 0
 exec-tiMeout 0 0
 logging synchronous
 privilege level 95
 no login
!
!
end

Fuzzed output 2: !
!
!
serviae timestamps debug datetime m{ec
service timestamps log datetime msec
no {ervice password-encryption
!
hostname R11
!
ip cef
no ip domain-lookup
no ip icmp$rate-limit unreachable
mp tcp synwait 5
no cdp log mismatch duplex
!
line con 0
 exec-timeout 0$0
 logging synchronous
 privilege level 15
 no login
line aux 0
 exec-timeout 0 0
 logging synchronous
 privilege leval 15
 no login
!
!
end

Fuzzed output 3: !
!
!
service timestamps debug datetime msec
service timestamps log datetime msec
no service$password-encryptio

In [1]:
import requests
import random
from typing import List
from fuzzingbook.Fuzzer import Fuzzer

# Base configuration
# SECURITY: credentials should not be committed with the notebook. The compute
# URL and the Authorization header can be supplied through the environment
# (GNS3_COMPUTE_URL / GNS3_AUTHORIZATION); the literals below are kept only as
# backward-compatible defaults and should be rotated and removed.
import os

AUTH_HEADER = {
    "Authorization": os.environ.get(
        "GNS3_AUTHORIZATION",
        "Basic YWRtaW46bldCTnI5TWk3RVRqeGFocUxmQXpNTFVwV3daN0ZJbGxicnViR01LVkVVMlpiZldoeG4wajFycGRKZ0JOdVF4Yw==",
    ),
    "User-Agent": "GNS3 QT Client v2.2.49",
    "Content-Type": "application/json"
}
BASE_URL = os.environ.get("GNS3_COMPUTE_URL", "http://172.16.169.129:80")  # Base URL for the remote compute

# Selected Project and Node information
project_id = "293e3470-c90e-44ee-bad0-c6bdfc8dc919"
node_id = "30467298-38a3-439f-a44b-6c4765ae3680"

def get_config_file(node_id, config_path, timeout=10):
    """Retrieve a configuration file of a Dynamips node from the compute server.

    Args:
        node_id: ID of the node whose ``configs`` directory is read.
        config_path: File name inside that ``configs`` directory.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The file content as text on HTTP 200, otherwise ``None`` (also when
        the request itself fails).
    """
    url = f"{BASE_URL}/v2/compute/projects/{project_id}/files/project-files/dynamips/{node_id}/configs/{config_path}"
    try:
        response = requests.get(url, headers=AUTH_HEADER, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to retrieve configuration file. Request error: {exc}")
        return None

    if response.status_code == 200:
        print("Configuration file retrieved successfully.")
        return response.text

    print(f"Failed to retrieve configuration file. Status code: {response.status_code}")
    return None

def upload_config_file(node_id, config_path, config_data, timeout=10):
    """Upload a configuration file into a Dynamips node's ``configs`` directory.

    Args:
        node_id: ID of the target node.
        config_path: File name inside the node's ``configs`` directory.
        config_data: File content, sent as the raw request body.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` when the server answers 201, otherwise ``False``.
    """
    url = f"{BASE_URL}/v2/compute/projects/{project_id}/files/project-files/dynamips/{node_id}/configs/{config_path}"
    # The body is raw file content, so override the JSON content type on a copy.
    headers = {**AUTH_HEADER, "Content-Type": "application/octet-stream"}
    try:
        response = requests.post(url, headers=headers, data=config_data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to upload configuration file. Request error: {exc}")
        return False

    if response.status_code == 201:
        print("Configuration file uploaded successfully.")
        return True

    print(f"Failed to upload configuration file. Status code: {response.status_code}, Response: {response.text}")
    return False

# Define mutation function
def flip_random_character(s: str) -> str:
    """Return ``s`` with one of the seven low bits of a random character flipped.

    The empty string is returned unchanged.
    """
    if s != "":
        index = random.randint(0, len(s) - 1)
        original = s[index]
        mask = 2 ** random.randint(0, 6)  # one of 1, 2, 4, ..., 64
        flipped = chr(ord(original) ^ mask)
        return s[:index] + flipped + s[index + 1:]
    return s

class MyBaselineFuzzer(Fuzzer):
    """Mutational fuzzer that replays each seed once, then emits bit-flipped variants."""

    def __init__(self, seed: List[str], min_mutations: int = 2, max_mutations: int = 10) -> None:
        """Remember the seeds and the inclusive mutation-count bounds, then reset."""
        self.seed = seed
        self.min_mutations, self.max_mutations = min_mutations, max_mutations
        self.reset()

    def reset(self) -> None:
        """Restart seeding at the first seed; the population is the seed list itself."""
        self.seed_index = 0
        self.population = self.seed

    def mutate(self, inp: str) -> str:
        """Apply one randomly chosen mutator to ``inp`` (only the bit flip is registered)."""
        chosen = random.choice([flip_random_character])
        return chosen(inp)

    def create_candidate(self) -> str:
        """Pick a random population member and mutate it between min and max times."""
        result = random.choice(self.population)
        remaining = random.randint(self.min_mutations, self.max_mutations)
        while remaining > 0:
            result = self.mutate(result)
            remaining -= 1
        return result

    def fuzz(self) -> str:
        """Return the next input (seeds first, then candidates) and store it in ``self.inp``."""
        if self.seed_index >= len(self.seed):
            self.inp = self.create_candidate()
            return self.inp
        self.inp = self.seed[self.seed_index]
        self.seed_index += 1
        return self.inp

# Main execution: fetch the node's startup config and use it as the only seed.
config_path = "i1_startup-config.cfg"
original_config = get_config_file(node_id=node_id, config_path=config_path)

if original_config:
    # Initialize fuzzer with the original configuration as the seed
    fuzzer = MyBaselineFuzzer(seed=[original_config])

    # Produce 15 inputs and upload each one to the same config path.
    for attempt in range(1, 16):
        mutated_config = fuzzer.fuzz()
        print(f"\nMutated configuration {attempt}:\n{mutated_config}\n")
        upload_config_file(node_id, config_path, mutated_config)


Configuration file retrieved successfully.

Mutated configuration 1:
!
!
!
!
!
!
!
!
!
!
!
!
!
!
!
!
service timestamps debug datetime msec
service timestamps log datetime msec
no service password-encryption
!
hostname R1
!
ip cef
no ip domain-lookup
no ip icmp rate-limit unreachable
ip tcp synwait 5
no cdp log mismatch duplex
!
line con 0
 exec-timeout 0 0
 logging synchronous
 privilege level 15
 no login
line aux 0
 exec-timeout 0 0
 logging synchronous
 privilege level 15
 no login
!
!
end


Configuration file uploaded successfully.

Mutated configuration 2:
!
!
!
!
!
!
!
!
!
!
!
!
!
!
!
!
service timestamps debug datetime msec
service timestamps log datetime msec
no service password-encryption
!
hostname R
!
ip cef
no ip domain-lookup
no ip icmp rate-limit unreachable
ip tcp synuait 4
no cdp log mismatch duplex
!
line con 0
 exec-timeout 0 0
 logging synchronous
 privilege leved 15
 no lofin
line aux 0
 exec-timeout 0 0
 logging 3ynchronous
 privilege level 15
 no login
!
!
end



In [3]:
import time
import re

# Capture file that the monitor below follows
file_path = "/home/test/GNS3/projects/multi-router-project/project-files/captures/R2_FastEthernet00_to_R1_FastEthernet00-1107.pcap"

# Case-insensitive bytes pattern to look for (adjust as needed); it must be a
# bytes pattern because the capture is read in binary mode.
error_pattern = re.compile(b"error", flags=re.IGNORECASE)

def tail_f(filename, pattern=None, poll_interval=0.5, max_idle_polls=None, from_start=False):
    """Simulate 'tail -f' on a binary file and report lines matching a pattern.

    Args:
        filename: Path of the file to follow; opened in binary mode.
        pattern: Compiled bytes pattern to search for in each line. Defaults to
            the module-level ``error_pattern``.
        poll_interval: Seconds to sleep when no new data is available.
        max_idle_polls: Stop after this many consecutive empty reads. ``None``
            (the default) keeps the original behaviour of following the file
            until interrupted.
        from_start: Scan the existing content first instead of starting at the
            current end of the file.

    Returns:
        The number of matching lines seen. Only reachable when
        ``max_idle_polls`` is set; otherwise the loop runs until interrupted.
    """
    if pattern is None:
        pattern = error_pattern

    matches = 0
    idle_polls = 0
    with open(filename, "rb") as f:  # Open file in binary mode
        if not from_start:
            # Move to the end of the file (whence=2) so only new bytes are read
            f.seek(0, 2)

        while True:
            line = f.readline()
            if not line:
                idle_polls += 1
                if max_idle_polls is not None and idle_polls >= max_idle_polls:
                    return matches
                # Sleep briefly and wait for new data
                time.sleep(poll_interval)
                continue

            idle_polls = 0
            # Check if the line contains the error pattern
            if pattern.search(line):
                matches += 1
                print("Error found:", line.strip())

# Announce the target, then follow the capture file (runs until interrupted)
print(f"Monitoring {file_path} for errors...")
tail_f(filename=file_path)


Monitoring /home/test/GNS3/projects/multi-router-project/project-files/captures/R2_FastEthernet00_to_R1_FastEthernet00-1107.pcap for errors...


KeyboardInterrupt: 