# Laborübung zum Fuzz-Testing
Im Rahmen der Studienarbeit "Fuzzing-Werkzeuge zum Security-Test" an der DHBW Stuttgart sollte eine Laborübung zum Thema Fuzz-Testing erstellt werden. Die Laborübung soll die Studierenden in die Grundlagen von Fuzz-Testing einführen und sie dazu anleiten, selbstständig eine Website auf Sicherheitslücken zu testen durch die Anwendung von Grammatik-basiertem Fuzzing.

Folgende Installationsschritte sind notwendig, um dieses Notebook zu nutzen:  
`pip install ipython selenium multiprocess`


In [1]:
# Benötigte Imports
import os
import sqlite3
import urllib.parse
import sys
import traceback
import time
import requests
import copy
import subprocess
import re
import random
import string

from optparse import Option
from urllib.parse import urljoin #, urlsplit
from typing import Any, Optional, NoReturn, Tuple, List, Union, Dict, Callable
from IPython.display import display, IFrame
from http.server import HTTPServer, BaseHTTPRequestHandler, HTTPStatus
from multiprocess import Queue, Process

## Setup Webserver

### Methode zum Rendern der HTML Seiten

In [2]:
# HTML() behaves like IPython.core.display.HTML(); but if png is True or the environment
# variable RENDER_HTML is set, it converts the HTML into a PNG image.
# This is useful for producing derived formats without HTML support (LaTeX/PDF, Word, ...)
# TODO klären, ob wir dann einfach nur IPython.core.display.HTML(); nutzen sollen

firefox = None

def HTML(data: Optional[str] = None, 
         url: Optional[str] = None, 
         filename: Optional[str] = None, 
         png: bool = False,
         headless: bool = True,
         zoom: float = 2.0) -> Any:

    if not png and 'RENDER_HTML' not in os.environ:
        # Standard behavior
        import IPython.core.display
        return IPython.core.display.HTML(data=data, url=url, filename=filename)

    # Import only as needed; avoids unnecessary dependencies
    from selenium import webdriver
    from selenium.webdriver.firefox.options import Options
    from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
    from IPython.core.display import Image
    import tempfile

    # Get a webdriver
    global firefox
    if firefox is None:
        options = webdriver.FirefoxOptions()
        options.headless = headless  # type: ignore
        options.set_preference("layout.css.devPixelsPerPx", repr(zoom))
        firefox = webdriver.Firefox(options=options)

    # Create a URL argument
    if data is not None:
        has_html = data.find('<html')
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.html') as fp:
            if has_html:
                fp.write(data.encode('utf8'))
            else:
                fp.write(('<html>' + data + '</html>').encode('utf8'))
            fp.flush()
            return HTML(filename=fp.name, png=True)

    if filename is not None:
        return HTML(url='file://' + filename, png=True)

    assert url is not None

    # Render URL as PNG
    firefox.get(url)
    return Image(firefox.get_screenshot_as_png())

### Setup der HTML-Skripts für die Seiten

In [3]:
# Items, die Webstore anbietet
DHBW_MERCH = {
    "tshirtmennavy": "DHBW T-Shirt Navy Men",
    "tshirtwomennavy": "DHBW T-Shirt Navy Women",
    "tshirtmenwhite": "DHBW T-Shirt White Men",
    "tshirtwomenwhite": "DHBW T-Shirt White Women",
    "hoodienavy": "DHBW Hoodie Navy",
    "hoodiegrey": "DHBW Hoodie Grey",
}

with open("html_files/order_form.html", "r", encoding="utf-8") as f:
    html_order_form = f.read()

with open("html_files/terms_and_cond.html", "r", encoding="utf-8") as f:
    html_terms_and_cond = f.read()

with open("html_files/order_received.html", "r", encoding="utf-8") as f:
    html_order_received = f.read()

In [4]:
HTML(html_order_form)

0,1
Name:,E-Mail:
Ort:,Postleitzahl:


In [5]:
HTML(html_terms_and_cond)

In [6]:
HTML(html_order_received.format(item_name="One FuzzingBook Rotary Hammer",
                                name="Jane Doe",
                                email="doe@example.com",
                                city="Seattle",
                                zip="98104"))

### Setup Datenbank

In [7]:
ORDERS_DB = "orders.db"

In [8]:
def init_db():
    if os.path.exists(ORDERS_DB):
        os.remove(ORDERS_DB)

    db_connection = sqlite3.connect(ORDERS_DB)
    db_connection.execute("DROP TABLE IF EXISTS orders")
    db_connection.execute("CREATE TABLE orders "
                          "(item text, name text, email text, "
                          "city text, zip text)")
    db_connection.commit()

    return db_connection

In [9]:
#TODO wird nie geschlossen

db = init_db()

#### Interaktion mit DB
Eintrag hinzufügen:  
```
db.execute("INSERT INTO orders " +
           "VALUES ('lockset', 'Walter White', "
           "'white@jpwynne.edu', 'Albuquerque', '87101')")
db.commit() 
```  
Eintrag löschen:  
```
db.execute("DELETE FROM orders WHERE name = 'Walter White'")
db.commit()
```


### Handle HTTP requests

In [10]:
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """A simple HTTP server"""
    pass

    def do_GET(self):
        try:
            # print("GET " + self.path)
            if self.path == "/":
                self.send_order_form()
            elif self.path.startswith("/order"):
                self.handle_order()
            elif self.path.startswith("/terms"):
                self.send_terms_and_conditions()
            else:
                self.not_found()
        except Exception:
            self.internal_server_error()

class SimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
    def send_order_form(self):
        self.send_response(HTTPStatus.OK, "Place your order")
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(html_order_form.encode("utf8"))

    def send_terms_and_conditions(self):
        self.send_response(HTTPStatus.OK, "Terms and Conditions")
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(html_terms_and_cond.encode("utf8"))

    def get_field_values(self):
        # Note: this fails to decode non-ASCII characters properly
        query_string = urllib.parse.urlparse(self.path).query

        # fields is { 'item': ['tshirt'], 'name': ['Jane Doe'], ...}
        fields = urllib.parse.parse_qs(query_string, keep_blank_values=True)

        values = {}
        for key in fields:
            values[key] = fields[key][0]

        return values
    
    def handle_order(self):
        values = self.get_field_values()
        self.store_order(values)
        self.send_order_received(values)

    def store_order(self, values):
        db = sqlite3.connect(ORDERS_DB)
        # The following should be one line
        sql_command = "INSERT INTO orders VALUES ('{item}', '{name}', '{email}', '{city}', '{zip}')".format(**values)
        self.log_message("%s", sql_command)
        db.executescript(sql_command)
        db.commit()

    def send_order_received(self, values):
        # Should use html.escape()
        values["item_name"] = DHBW_MERCH[values["item"]]
        confirmation = html_order_received.format(**values).encode("utf8")

        self.send_response(HTTPStatus.OK, "Order received")
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(confirmation)

Andere Requests sind möglich, müssen aber implementiert werden.  

### Error Handling

In [11]:
with open("html_files/not_found.html", "r", encoding="utf-8") as f:
    html_not_found = f.read()

with open("html_files/internal_server_error.html", "r", encoding="utf-8") as f:
    html_internal_server_error = f.read()

In [12]:
HTML(html_not_found)

In [13]:
HTML(html_internal_server_error)

In [14]:
class SimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
    def not_found(self):
        self.send_response(HTTPStatus.NOT_FOUND, "Not found")

        self.send_header("Content-type", "text/html")
        self.end_headers()

        message = html_not_found
        self.wfile.write(message.encode("utf8"))

    def internal_server_error(self):
        self.send_response(HTTPStatus.INTERNAL_SERVER_ERROR, "Internal Error")

        self.send_header("Content-type", "text/html")
        self.end_headers()

        exc = traceback.format_exc()
        self.log_message("%s", exc.strip())

        message = html_internal_server_error.format(error_message=exc)
        self.wfile.write(message.encode("utf8"))

### Server Messages
Server-Nachrichten sollen als Logs gesammelt werden. Zum Vereinfachen werden sie abgewandelt angezeigt und in einer Queue gespeichert.

#### Nachrichten stylen

In [15]:
# Check for rich output
def rich_output() -> bool:
    try:
        get_ipython()  # type: ignore
        rich = True
    except NameError:
        rich = False

    return rich

# Escaping unicode characters into ASCII for user-facing strings
def unicode_escape(s: str, error: str = 'backslashreplace') -> str:
    def ascii_chr(byte: int) -> str:
        if 0 <= byte <= 127:
            return chr(byte)
        return r"\x%02x" % byte

    bytes = s.encode('utf-8', error)
    return "".join(map(ascii_chr, bytes))

# Same, but escaping unicode only if output is not a terminal
def terminal_escape(s: str) -> str:
    if rich_output():
        return s
    return unicode_escape(s)

#### Queue erstellen

In [16]:
HTTPD_MESSAGE_QUEUE = Queue()

def display_httpd_message(message: str) -> None:
    if rich_output():
        display(
            HTML(
                '<pre style="background: NavajoWhite;">' +
                message +
                "</pre>"))
    else:
        print(terminal_escape(message))

def print_httpd_messages():
    while not HTTPD_MESSAGE_QUEUE.empty():
        message = HTTPD_MESSAGE_QUEUE.get()
        display_httpd_message(message)

def clear_httpd_messages() -> None:
    while not HTTPD_MESSAGE_QUEUE.empty():
        HTTPD_MESSAGE_QUEUE.get()

In [17]:
class SimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
    def log_message(self, format: str, *args) -> None:
        message = ("%s - - [%s] %s\n" %
                   (self.address_string(),
                    self.log_date_time_string(),
                    format % args))
        HTTPD_MESSAGE_QUEUE.put(message)

### Running the Server

In [18]:
def webbrowser(url: str, mute: bool = False) -> str:
    """Download and return the http/https resource given by the URL"""

    try:
        r = requests.get(url)
        contents = r.text
    finally:
        if not mute:
            print_httpd_messages()
        else:
            clear_httpd_messages()

    return contents

In [19]:
def run_httpd_forever(handler_class: type) -> NoReturn:
    host = "0.0.0.0"  # IP
    localhost = "localhost"
    for port in range(8800, 9000):
        httpd_address = (host, port)

        try:
            httpd = HTTPServer(httpd_address, handler_class)
            break
        except OSError:
            continue

    httpd_url = "http://" + localhost + ":" + repr(port)
    HTTPD_MESSAGE_QUEUE.put(httpd_url)
    httpd.serve_forever()

In [20]:
def start_httpd(handler_class: type = SimpleHTTPRequestHandler) \
        -> Tuple[Process, str]:
    clear_httpd_messages()

    httpd_process = Process(target=run_httpd_forever, args=(handler_class,))
    httpd_process.start()

    httpd_url = HTTPD_MESSAGE_QUEUE.get()
    return httpd_process, httpd_url

httpd_process, httpd_url = start_httpd()
print(httpd_url)

http://localhost:8800


## Crawling des Webservers
Dadurch werden alle Input-Felder der Website ermittelt und nutzbar für das Fuzzing gemacht.

## Beispiel-Interaktion mit dem Webserver
Diese Sektion bietet einen Einblick in den Umgang mit dem Webserver von einem Python-Programm aus.

In [21]:
# get Browser access
IFrame(httpd_url, '100%', 230)

# retrieve messages produced by the HTTP server
print_httpd_messages()

# interact with database
print(db.execute("SELECT * FROM orders").fetchall())

db.execute("DELETE FROM orders")
db.commit()

# retieve the home page
contents = webbrowser(httpd_url)
HTML(contents)

# place an order
urljoin(httpd_url, "/order?foo=bar")
contents = webbrowser(urljoin(httpd_url, "/order?item=tshirtwomennavy&name=Jane+Doe&email=doe%40example.com&city=Seattle&zip=98104"))

HTML(contents)
print(db.execute("SELECT * FROM orders").fetchall())

# test error messages
HTML(webbrowser(urljoin(httpd_url, "/some/other/path")))

[]


[('tshirtwomennavy', 'Jane Doe', 'doe@example.com', 'Seattle', '98104')]


## Fuzzing
Die folgenden Funktionen werden für das Erstellen der Grammatik verwendet.

In [22]:
# cgi_encode() to encode strings for URLs
def cgi_encode(s: str, do_not_encode: str = "") -> str:
    ret = ""
    for c in s:
        if (c in string.ascii_letters or c in string.digits
                or c in "$-_.+!*'()," or c in do_not_encode):
            ret += c
        elif c == ' ':
            ret += '+'
        else:
            ret += "%%%02x" % ord(c)
    return ret

Expansion = Union[str, Tuple[str, Option]]

def crange(character_start: str, character_end: str) -> List[Expansion]:
    return [chr(i)
            for i in range(ord(character_start), ord(character_end) + 1)]

### mit erwarteten Inputs
In dieser Sektion wird das Fuzzing mit erwarteten Inputs durchgeführt.

In [23]:
# create grammar
RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

Grammar = Dict[str, List[Expansion]]

DerivationTree = Tuple[str, Optional[List[Any]]]

Outcome = str

def is_nonterminal(s):
    return RE_NONTERMINAL.match(s)

def all_terminals(tree: DerivationTree) -> str:
    (symbol, children) = tree
    if children is None:
        # This is a nonterminal symbol not expanded yet
        return symbol

    if len(children) == 0:
        # This is a terminal symbol
        return symbol

    # This is an expanded symbol:
    # Concatenate all terminal symbols from all children
    return ''.join([all_terminals(c) for c in children])

class Runner:
    """Base class for testing inputs."""

    # Test outcomes
    PASS = "PASS"
    FAIL = "FAIL"
    UNRESOLVED = "UNRESOLVED"

    def __init__(self) -> None:
        """Initialize"""
        pass

    def run(self, inp: str) -> Any:
        """Run the runner with the given input"""
        return (inp, Runner.UNRESOLVED)

class PrintRunner(Runner):
    """Simple runner, printing the input."""

    def run(self, inp) -> Any:
        """Print the given input"""
        print(inp)
        return (inp, Runner.UNRESOLVED)

class Fuzzer:
    """Base class for fuzzers."""

    def __init__(self) -> None:
        """Constructor"""
        pass

    def fuzz(self) -> str:
        """Return fuzz input"""
        return ""

    def run(self, runner: Runner = Runner()) \
            -> Tuple[subprocess.CompletedProcess, Outcome]:
        """Run `runner` with fuzz input"""
        return runner.run(self.fuzz())

    def runs(self, runner: Runner = PrintRunner(), trials: int = 10) \
            -> List[Tuple[subprocess.CompletedProcess, Outcome]]:
        """Run `runner` with fuzz input, `trials` times"""
        return [self.run(runner) for i in range(trials)]

START_SYMBOL = "<start>"

In [24]:
def exp_string(expansion: Expansion) -> str:
    """Return the string to be expanded"""
    if isinstance(expansion, str):
        return expansion
    return expansion[0]

def expansion_to_children(expansion: Expansion) -> List[DerivationTree]:
    # print("Converting " + repr(expansion))
    # strings contains all substrings -- both terminals and nonterminals such
    # that ''.join(strings) == expansion

    expansion = exp_string(expansion)
    assert isinstance(expansion, str)

    if expansion == "":  # Special case: epsilon expansion
        return [("", [])]

    strings = re.split(RE_NONTERMINAL, expansion)
    return [(s, None) if is_nonterminal(s) else (s, [])
            for s in strings if len(s) > 0]


class GrammarFuzzer:
    """Produce strings from grammars efficiently, using derivation trees."""

    def __init__(self,
                 grammar,
                 start_symbol='S',
                 min_nonterminals=0,
                 max_nonterminals=10,
                 disp=False,
                 log=False) -> None:
        """Produce strings from `grammar`, starting with `start_symbol`.
        If `min_nonterminals` or `max_nonterminals` is given, use them as limits 
        for the number of nonterminals produced.  
        If `disp` is set, display the intermediate derivation trees.
        If `log` is set, show intermediate steps as text on standard output."""
        self.grammar = grammar
        self.start_symbol = start_symbol
        self.min_nonterminals = min_nonterminals
        self.max_nonterminals = max_nonterminals
        self.disp = disp
        self.log = log

    def expand_node_max_cost(self, node):
        if self.log:
            print("Expanding", all_terminals(node), "at maximum cost")
        return self.expand_node_by_cost(node, max)

    def possible_expansions(self, node):
        (symbol, children) = node
        if children is None:
            return 1
        return sum(self.possible_expansions(c) for c in children)

    def any_possible_expansions(self, node):
        (symbol, children) = node
        if children is None:
            return True
        return any(self.any_possible_expansions(c) for c in children)

    def expand_node_randomly(self, node):
        """Choose a random expansion for `node` and return it"""
        (symbol, children) = node
        assert children is None
        if self.log:
            print("Expanding", all_terminals(node), "randomly")
        expansions = self.grammar[symbol]
        children_alternatives = [
            self.expansion_to_children(expansion) for expansion in expansions
        ]
        index = self.choose_node_expansion(node, children_alternatives)
        chosen_children = children_alternatives[index]
        chosen_children = self.process_chosen_children(chosen_children,
                                                       expansions[index])
        return (symbol, chosen_children)

    def choose_tree_expansion(self, tree, children):
        """Return index of subtree in `children` to be selected for expansion.
           Defaults to random."""
        return random.randrange(0, len(children))

    def expand_tree_once(self, tree):
        """Choose an unexpanded symbol in tree; expand it.
           Can be overloaded in subclasses."""
        (symbol, children) = tree
        if children is None:
            return self.expand_node(tree)
        expandable_children = [
            c for c in children if self.any_possible_expansions(c)]
        index_map = [i for (i, c) in enumerate(children)
                     if c in expandable_children]
        child_to_be_expanded = self.choose_tree_expansion(tree, expandable_children)
        children[index_map[child_to_be_expanded]] = \
            self.expand_tree_once(expandable_children[child_to_be_expanded])
        return tree

    def log_tree(self, tree):
        """Output a tree if self.log is set; if self.display is also set, show the tree structure"""
        if self.log:
            print("Tree:", all_terminals(tree))
            if self.disp:
                display(display_tree(tree))

    def expand_tree_with_strategy(self, tree, expand_node_method, limit=None):
        """Expand tree using `expand_node_method` as node expansion function
        until the number of possible expansions reaches `limit`."""
        self.expand_node = expand_node_method
        while ((limit is None or self.possible_expansions(tree) < limit)
               and self.any_possible_expansions(tree)):
            tree = self.expand_tree_once(tree)
            self.log_tree(tree)
        return tree

    def expand_tree(self, tree):
        """Expand `tree` in a three-phase strategy until all expansions are complete."""
        self.log_tree(tree)
        tree = self.expand_tree_with_strategy(
            tree, self.expand_node_max_cost, self.min_nonterminals)
        tree = self.expand_tree_with_strategy(
            tree, self.expand_node_randomly, self.max_nonterminals)
        tree = self.expand_tree_with_strategy(
            tree, self.expand_node_min_cost)
        assert self.possible_expansions(tree) == 0
        return tree

    def init_tree(self):
        return (self.start_symbol, None)

    def fuzz_tree(self):
        """Produce a derivation tree from the grammar."""
        tree = self.init_tree()
        tree = self.expand_tree(tree)
        if self.log:
            print(repr(all_terminals(tree)))
        if self.disp:
            display(display_tree(tree))
        return tree

    def fuzz(self):
        """Produce a string from the grammar."""
        self.derivation_tree = self.fuzz_tree()
        return all_terminals(self.derivation_tree)

    def choose_node_expansion(self, node, children_alternatives):
        """Return index of expansion in `children_alternatives` to be selected.
           'children_alternatives`: a list of possible children for `node`.
           Defaults to random. To be overloaded in subclasses."""
        return random.randrange(0, len(children_alternatives))

    def process_chosen_children(self, chosen_children, expansion):
        """Process children after selection. By default, does nothing."""
        return chosen_children

    def expand_node_min_cost(self, node):
        if self.log:
            print("Expanding", all_terminals(node), "at minimum cost")
        return self.expand_node_by_cost(node, min)


In [25]:
class GrammarFuzzer(Fuzzer):
    """Produce strings from grammars efficiently, using derivation trees."""

    def __init__(self,
                 grammar: Grammar,
                 start_symbol: str = START_SYMBOL,
                 min_nonterminals: int = 0,
                 max_nonterminals: int = 10,
                 disp: bool = False,
                 log: Union[bool, int] = False) -> None:
        """Produce strings from `grammar`, starting with `start_symbol`.
        If `min_nonterminals` or `max_nonterminals` is given, use them as limits 
        for the number of nonterminals produced.  
        If `disp` is set, display the intermediate derivation trees.
        If `log` is set, show intermediate steps as text on standard output."""

        self.grammar = grammar
        self.start_symbol = start_symbol
        self.min_nonterminals = min_nonterminals
        self.max_nonterminals = max_nonterminals
        self.disp = disp
        self.log = log
        #self.check_grammar()  # Invokes is_valid_grammar()

    def expand_node_max_cost(self, node: DerivationTree) -> DerivationTree:
        if self.log:
            print("Expanding", all_terminals(node), "at maximum cost")

        return self.expand_node_by_cost(node, max)

    def possible_expansions(self, node: DerivationTree) -> int:
        (symbol, children) = node
        if children is None:
            return 1

        return sum(self.possible_expansions(c) for c in children)

    def any_possible_expansions(self, node: DerivationTree) -> bool:
        (symbol, children) = node
        if children is None:
            return True

        return any(self.any_possible_expansions(c) for c in children)

    def expansion_to_children(self, expansion: Expansion) -> List[DerivationTree]:
        return expansion_to_children(expansion)


    def expand_node_randomly(self, node: DerivationTree) -> DerivationTree:
        """Choose a random expansion for `node` and return it"""
        (symbol, children) = node
        assert children is None

        if self.log:
            print("Expanding", all_terminals(node), "randomly")

        # Fetch the possible expansions from grammar...
        expansions = self.grammar[symbol]
        children_alternatives: List[List[DerivationTree]] = [
            self.expansion_to_children(expansion) for expansion in expansions
        ]

        # ... and select a random expansion
        index = self.choose_node_expansion(node, children_alternatives)
        chosen_children = children_alternatives[index]

        # Process children (for subclasses)
        chosen_children = self.process_chosen_children(chosen_children,
                                                       expansions[index])

        # Return with new children
        return (symbol, chosen_children)

    def choose_tree_expansion(self,
                              tree: DerivationTree,
                              children: List[DerivationTree]) -> int:
        """Return index of subtree in `children` to be selected for expansion.
           Defaults to random."""
        return random.randrange(0, len(children))

    def expand_tree_once(self, tree: DerivationTree) -> DerivationTree:
        """Choose an unexpanded symbol in tree; expand it.
           Can be overloaded in subclasses."""
        (symbol, children) = tree
        if children is None:
            # Expand this node
            return self.expand_node(tree)

        # Find all children with possible expansions
        expandable_children = [
            c for c in children if self.any_possible_expansions(c)]

        # `index_map` translates an index in `expandable_children`
        # back into the original index in `children`
        index_map = [i for (i, c) in enumerate(children)
                     if c in expandable_children]

        # Select a random child
        child_to_be_expanded = \
            self.choose_tree_expansion(tree, expandable_children)

        # Expand in place
        children[index_map[child_to_be_expanded]] = \
            self.expand_tree_once(expandable_children[child_to_be_expanded])

        return tree

    def log_tree(self, tree: DerivationTree) -> None:
        """Output a tree if self.log is set; if self.display is also set, show the tree structure"""
        if self.log:
            print("Tree:", all_terminals(tree))
            if self.disp:
                display(display_tree(tree))
            # print(self.possible_expansions(tree), "possible expansion(s) left")

    def expand_tree_with_strategy(self, tree: DerivationTree,
                                  expand_node_method: Callable,
                                  limit: Optional[int] = None):
        """Expand tree using `expand_node_method` as node expansion function
        until the number of possible expansions reaches `limit`."""
        self.expand_node = expand_node_method
        while ((limit is None
                or self.possible_expansions(tree) < limit)
               and self.any_possible_expansions(tree)):
            tree = self.expand_tree_once(tree)
            self.log_tree(tree)
        return tree

    def expand_tree(self, tree: DerivationTree) -> DerivationTree:
        """Expand `tree` in a three-phase strategy until all expansions are complete."""
        self.log_tree(tree)
        tree = self.expand_tree_with_strategy(
            tree, self.expand_node_max_cost, self.min_nonterminals)
        tree = self.expand_tree_with_strategy(
            tree, self.expand_node_randomly, self.max_nonterminals)
        tree = self.expand_tree_with_strategy(
            tree, self.expand_node_min_cost)

        assert self.possible_expansions(tree) == 0

        return tree

    def init_tree(self) -> DerivationTree:
        return (self.start_symbol, None)

    def fuzz_tree(self) -> DerivationTree:
        """Produce a derivation tree from the grammar."""
        tree = self.init_tree()
        # print(tree)

        # Expand all nonterminals
        tree = self.expand_tree(tree)
        if self.log:
            print(repr(all_terminals(tree)))
        if self.disp:
            display(display_tree(tree))
        return tree

    def fuzz(self) -> str:
        """Produce a string from the grammar."""
        self.derivation_tree = self.fuzz_tree()
        return all_terminals(self.derivation_tree)

    def choose_node_expansion(self, node: DerivationTree,
                              children_alternatives: List[List[DerivationTree]]) -> int:
        """Return index of expansion in `children_alternatives` to be selected.
           'children_alternatives`: a list of possible children for `node`.
           Defaults to random. To be overloaded in subclasses."""
        return random.randrange(0, len(children_alternatives))

    def process_chosen_children(self,
                                chosen_children: List[DerivationTree],
                                expansion: Expansion) -> List[DerivationTree]:
        """Process children after selection.  By default, does nothing."""
        return chosen_children

    def expand_node_min_cost(self, node: DerivationTree) -> DerivationTree:
        if self.log:
            print("Expanding", all_terminals(node), "at minimum cost")

        return self.expand_node_by_cost(node, min)

In [26]:
ORDER_GRAMMAR: Grammar = {
    "<start>": ["<order>"],
    "<order>": ["/order?item=<item>&name=<name>&email=<email>&city=<city>&zip=<zip>"],
    "<item>": ["tshirtmennavy", "tshirtwomennavy", "tshirtmenwhite", "tshirtwomenwhite", "hoodienavy", "hoodiegrey"],
    "<name>": [cgi_encode("Jane Doe"), cgi_encode("John Smith")],
    "<email>": [cgi_encode("j.doe@example.com"), cgi_encode("j_smith@example.com")],
    "<city>": ["Seattle", cgi_encode("New York")],
    "<zip>": ["<digit>" * 5],
    "<digit>": crange('0', '9')
}

In [27]:
order_fuzzer = GrammarFuzzer(ORDER_GRAMMAR)
[order_fuzzer.fuzz() for i in range(5)]

['/order?item=tshirtmennavy&name=John+Smith&email=j.doe%40example.com&city=New+York&zip=86389',
 '/order?item=tshirtwomenwhite&name=Jane+Doe&email=j.doe%40example.com&city=New+York&zip=83870',
 '/order?item=tshirtwomennavy&name=Jane+Doe&email=j.doe%40example.com&city=New+York&zip=78033',
 '/order?item=hoodienavy&name=John+Smith&email=j_smith%40example.com&city=New+York&zip=94275',
 '/order?item=hoodienavy&name=John+Smith&email=j.doe%40example.com&city=New+York&zip=90645']

In [28]:
# use fuzz inputs
HTML(webbrowser(urljoin(httpd_url, order_fuzzer.fuzz())))

In [29]:
# check
print(db.execute("SELECT * FROM orders").fetchall())

[('tshirtwomennavy', 'Jane Doe', 'doe@example.com', 'Seattle', '98104'), ('tshirtmennavy', 'John Smith', 'j_smith@example.com', 'Seattle', '56779')]


## Fuzzing mit unerwarteten Inputs
In dieser Sektion wird das Fuzzing mit unerwarteten Inputs durchgeführt.

In [30]:
# create seed
seed = order_fuzzer.fuzz()
seed

'/order?item=tshirtwomenwhite&name=Jane+Doe&email=j_smith%40example.com&city=Seattle&zip=19235'

In [31]:
# mutate seed    

def delete_random_character(s: str) -> str:
    """Returns s with a random character deleted"""
    if s == "":
        return s

    pos = random.randint(0, len(s) - 1)
    # print("Deleting", repr(s[pos]), "at", pos)
    return s[:pos] + s[pos + 1:]

def insert_random_character(s: str) -> str:
    """Returns s with a random character inserted"""
    pos = random.randint(0, len(s))
    random_character = chr(random.randrange(32, 127))
    # print("Inserting", repr(random_character), "at", pos)
    return s[:pos] + random_character + s[pos:]

def flip_random_character(s):
    """Returns s with a random bit flipped in a random position"""
    if s == "":
        return s

    pos = random.randint(0, len(s) - 1)
    c = s[pos]
    bit = 1 << random.randint(0, 6)
    new_c = chr(ord(c) ^ bit)
    # print("Flipping", bit, "in", repr(c) + ", giving", repr(new_c))
    return s[:pos] + new_c + s[pos + 1:]

def mutate(s: str) -> str:
    """Return s with a random mutation applied"""
    mutators = [
        delete_random_character,
        insert_random_character,
        flip_random_character
    ]
    mutator = random.choice(mutators)
    # print(mutator)
    return mutator(s)

In [None]:
class MutationFuzzer(Fuzzer):
    """Base class for mutational fuzzing"""

    def __init__(self, seed: List[str],
                 min_mutations: int = 2,
                 max_mutations: int = 10) -> None:
        """Constructor.
        `seed` - a list of (input) strings to mutate.
        `min_mutations` - the minimum number of mutations to apply.
        `max_mutations` - the maximum number of mutations to apply.
        """
        self.seed = seed
        self.min_mutations = min_mutations
        self.max_mutations = max_mutations
        self.reset()

    def reset(self) -> None:
        """Set population to initial seed.
        To be overloaded in subclasses."""
        self.population = self.seed
        self.seed_index = 0

    def create_candidate(self) -> str:
        """Create a new candidate by mutating a population member"""
        candidate = random.choice(self.population)
        trials = random.randint(self.min_mutations, self.max_mutations)
        for i in range(trials):
            candidate = self.mutate(candidate)
        return candidate

    def fuzz(self) -> str:
        if self.seed_index < len(self.seed):
            # Still seeding
            self.inp = self.seed[self.seed_index]
            self.seed_index += 1
        else:
            # Mutating
            self.inp = self.create_candidate()
        return self.inp

    def mutate(self, inp: str) -> str:
        return mutate(inp)

In [32]:
mutate_order_fuzzer = MutationFuzzer([seed], min_mutations=1, max_mutations=1)
[mutate_order_fuzzer.fuzz() for i in range(5)]

NameError: name 'MutationFuzzer' is not defined

In [None]:
# fuzz till error
while True:
    path = mutate_order_fuzzer.fuzz()
    url = urljoin(httpd_url, path)
    r = requests.get(url)
    if r.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
        break

In [None]:
failing_path = path
failing_path

In [None]:
# minimize with delta debugging

class WebRunner(Runner):
    """Runner for a Web server"""

    def __init__(self, base_url: Optional[str] = None):
        self.base_url = base_url

    def run(self, url: str) -> Tuple[str, str]:
        if self.base_url is not None:
            url = urljoin(self.base_url, url)

        r = requests.get(url)
        if r.status_code == HTTPStatus.OK:
            return url, Runner.PASS
        elif r.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
            return url, Runner.FAIL
        else:
            return url, Runner.UNRESOLVED
        
web_runner = WebRunner(httpd_url)
web_runner.run(failing_path)

In [None]:
class Reducer:
    """Base class for reducers."""

    def __init__(self, runner: Runner, log_test: bool = False) -> None:
        """Attach reducer to the given `runner`"""
        self.runner = runner
        self.log_test = log_test
        self.reset()

    def reset(self) -> None:
        """Reset the test counter to zero. To be extended in subclasses."""
        self.tests = 0

    def test(self, inp: str) -> Outcome:
        """Test with input `inp`. Return outcome.
        To be extended in subclasses."""

        result, outcome = self.runner.run(inp)
        self.tests += 1
        if self.log_test:
            print("Test #%d" % self.tests, repr(inp), repr(len(inp)), outcome)
        return outcome

    def reduce(self, inp: str) -> str:
        """Reduce input `inp`. Return reduced input.
        To be defined in subclasses."""

        self.reset()
        # Default: Don't reduce
        return inp

class CachingReducer(Reducer):
    """A reducer that also caches test outcomes"""

    def reset(self):
        super().reset()
        self.cache = {}

    def test(self, inp):
        if inp in self.cache:
            return self.cache[inp]

        outcome = super().test(inp)
        self.cache[inp] = outcome
        return outcome

class DeltaDebuggingReducer(CachingReducer):
    """Reduce inputs using delta debugging."""

    def reduce(self, inp: str) -> str:
        """Reduce input `inp` using delta debugging. Return reduced input."""

        self.reset()
        assert self.test(inp) != Runner.PASS

        n = 2     # Initial granularity
        while len(inp) >= 2:
            start = 0.0
            subset_length = len(inp) / n
            some_complement_is_failing = False

            while start < len(inp):
                complement = inp[:int(start)] + \
                    inp[int(start + subset_length):]

                if self.test(complement) == Runner.FAIL:
                    inp = complement
                    n = max(n - 1, 2)
                    some_complement_is_failing = True
                    break

                start += subset_length

            if not some_complement_is_failing:
                if n == len(inp):
                    break
                n = min(n * 2, len(inp))

        return inp

In [None]:
minimized_path = DeltaDebuggingReducer(web_runner).reduce(failing_path)
minimized_path

## Web-Attacken auf den Webserver
In dieser Sektion wird eine Web-Attacke auf den Webserver durchgeführt.

### SQL-Injection

In [None]:
def extend_grammar(grammar: Grammar, extension: Grammar = {}) -> Grammar:
    """Create a copy of `grammar`, updated with `extension`."""
    new_grammar = copy.deepcopy(grammar)
    new_grammar.update(extension)
    return new_grammar

In [None]:
values: Dict[str, str] = {
    "item": "tshirtmenwhite",
    "name": "Jane Doe",
    "email": "j.doe@example.com",
    "city": "Seattle",
    "zip": "98104"
}

sql_command = ("INSERT INTO orders " +
               "VALUES ('{item}', '{name}', '{email}', '{city}', '{zip}')".format(**values))

values["name"] = "Jane', 'x', 'x', 'x'); DELETE FROM orders; -- "

sql_command = ("INSERT INTO orders " +
               "VALUES ('{item}', '{name}', '{email}', '{city}', '{zip}')".format(**values))

ORDER_GRAMMAR_WITH_SQL_INJECTION = extend_grammar(ORDER_GRAMMAR, {
    "<name>": [cgi_encode("Jane', 'x', 'x', 'x'); DELETE FROM orders; --")],
})

sql_injection_fuzzer = GrammarFuzzer(ORDER_GRAMMAR_WITH_SQL_INJECTION)
order_with_injected_sql = sql_injection_fuzzer.fuzz()
order_with_injected_sql

print(db.execute("SELECT * FROM orders").fetchall())

contents = webbrowser(urljoin(httpd_url, order_with_injected_sql))

print(db.execute("SELECT * FROM orders").fetchall())

### HTML Injection

In [None]:
ORDER_GRAMMAR_WITH_HTML_INJECTION: Grammar = extend_grammar(ORDER_GRAMMAR, {
    "<name>": [cgi_encode('''
    Jane Doe<p>
    <strong><a href="www.lots.of.malware">Click here for cute cat pictures!</a></strong>
    </p>
    ''')],
})

html_injection_fuzzer = GrammarFuzzer(ORDER_GRAMMAR_WITH_HTML_INJECTION)
order_with_injected_html = html_injection_fuzzer.fuzz()
order_with_injected_html

HTML(webbrowser(urljoin(httpd_url, order_with_injected_html)))


guck exercises auf https://www.fuzzingbook.org/html/WebFuzzer.html an