In [4]:
## json grammar from antlr in python 
## CITE: https://github.com/antlr/grammars-v4/blob/master/json/JSON.g4, Fuzzing book 

JSON_GRAMMAR = {
    "<json>":
        ["<value>"],

    "<obj>":
        ["<'{' pair (',' pair)* '}'>",  "<'{' '}'>"],

    "<pair>":
        ["<STRING ':' value>"],

    "<arr>":
        ["<'[' value (',' value)* ']'>", "<'[' ']'>"],
    
    "<value>":
        ["<STRING>", "<NUMBER>", "<obj>", "<arr>", "<'true'>", "<'false'>", "<'null'>"], 

    "<STRING>":
        ["<'"' (ESC | SAFECODEPOINT)* '"'>"],

    "<fragment ESC>":
        ["<"+"'\\' (["+'"\\/bfnrt] | UNICODE)'+">"],
   
    "<fragment UNICODE>":
        ["<'u' HEX HEX HEX HEX>"],

    "<fragment HEX>":
        ["<0-9a-fA-F>"],
   
    "<fragment SAFECODEPOINT>":
        ["<"+"~ ["+'"\\\u0000-\u001F]'+">"],

    "<NUMBER>":
        ["<'-'? INT ('.' [0-9] +)? EXP?>"],
  
    "<fragment INT>":
        ["<'0' | [1-9] [0-9]*>"],
   
    # no leading zeros

    "<fragment EXP>":
      ["<[Ee] [+\-]? INT>"],
   
    # \- since - means "range" inside [...]

    "<WS>":
      ["<[ \t\n\r] + -> skip>"]


} 

In [5]:
# test print 
print(JSON_GRAMMAR)

{'<json>': ['<value>'], '<obj>': ["<'{' pair (',' pair)* '}'>", "<'{' '}'>"], '<pair>': ["<STRING ':' value>"], '<arr>': ["<'[' value (',' value)* ']'>", "<'[' ']'>"], '<value>': ['<STRING>', '<NUMBER>', '<obj>', '<arr>', "<'true'>", "<'false'>", "<'null'>"], '<STRING>': ["<' (ESC | SAFECODEPOINT)* '>"], '<fragment ESC>': ['<\'\\\' (["\\/bfnrt] | UNICODE)>'], '<fragment UNICODE>': ["<'u' HEX HEX HEX HEX>"], '<fragment HEX>': ['<0-9a-fA-F>'], '<fragment SAFECODEPOINT>': ['<~ ["\\\x00-\x1f]>'], '<NUMBER>': ["<'-'? INT ('.' [0-9] +)? EXP?>"], '<fragment INT>': ["<'0' | [1-9] [0-9]*>"], '<fragment EXP>': ['<[Ee] [+\\-]? INT>'], '<WS>': ['<[ \t\n\r] + -> skip>']}


In [24]:
import ast
import inspect

class Mutant:
    def __init__(self, pm, location, log=False):
        self.pm = pm
        self.i = location
        self.name = "%s_%s" % (self.pm.name, self.i)
        self._src = None
        self.tests = []
        self.detected = False
        self.log = log
  
    def __enter__(self):
        if self.log:
            print('->\t%s' % self.name)
        c = compile(self.src(), '<mutant>', 'exec')
        eval(c, globals())

    def generate_mutant(self, location):
        mutant_ast = self.pm.mutator_object(
            location).visit(ast.parse(self.pm.src))  # copy
        return ast.unparse(mutant_ast)

    def src(self):
        if self._src is None:
            self._src = self.generate_mutant(self.i)
        return self._src
    
    def diff(self):
        return '\n'.join(difflib.unified_diff(self.pm.src.split('\n'),
                                              self.src().split('\n'),
                                              fromfile='original',
                                              tofile='mutant',
                                              n=3))

    def __exit__(self, exc_type, exc_value, traceback):
        if self.log:
            print('<-\t%s' % self.name)
        if exc_type is not None:
            self.detected = True
            if self.log:
                print("Detected %s" % self.name, exc_type, exc_value)
        globals()[self.pm.name] = self.pm.fn
        if self.log:
            print()
        return True
class PMIterator:
    def __init__(self, pm):
        self.pm = pm
        self.idx = 0
  
    def __next__(self):
        i = self.idx
        if i >= self.pm.nmutations:
            self.pm.finish()
            raise StopIteration()
        self.idx += 1
        mutant = Mutant(self.pm, self.idx, log=self.pm.log)
        self.pm.register(mutant)
        return mutant

class MuFunctionAnalyzer:
    def __init__(self, fn, log=False):
        self.fn = fn
        self.name = fn.__name__
        src = inspect.getsource(fn)
        self.ast = ast.parse(src)
        self.src = ast.unparse(self.ast)  # normalize
        self.mutator = self.mutator_object()
        self.nmutations = self.get_mutation_count()
        self.un_detected = set()
        self.mutants = []
        self.log = log

    def mutator_object(self, locations=None):
        return StmtDeletionMutator(locations)

    def register(self, m):
        self.mutants.append(m)

    def get_mutation_count(self):
        self.mutator.visit(self.ast)
        return self.mutator.count

    def __iter__(self):
        return PMIterator(self)

    def finish(self):
        self.un_detected = {mutant for mutant in self.mutants if not mutant.detected}

    def score(self):
        return (self.nmutations - len(self.un_detected)) / self.nmutations
  
  


class Mutator(ast.NodeTransformer):
    def __init__(self, mutate_location=-1):
        self.count = 0
        self.mutate_location = mutate_location

    def mutable_visit(self, node):
        self.count += 1  # statements start at line no 1
        if self.count == self.mutate_location:
            return self.mutation_visit(node)
        return self.generic_visit(node)

class StmtDeletionMutator(Mutator):
    def visit_Return(self, node): return self.mutable_visit(node)
    def visit_Delete(self, node): return self.mutable_visit(node)

    def visit_Assign(self, node): return self.mutable_visit(node)
    def visit_AnnAssign(self, node): return self.mutable_visit(node)
    def visit_AugAssign(self, node): return self.mutable_visit(node)

    def visit_Raise(self, node): return self.mutable_visit(node)
    def visit_Assert(self, node): return self.mutable_visit(node)

    def visit_Global(self, node): return self.mutable_visit(node)
    def visit_Nonlocal(self, node): return self.mutable_visit(node)

    def visit_Expr(self, node): return self.mutable_visit(node)

    def visit_Pass(self, node): return self.mutable_visit(node)
    def visit_Break(self, node): return self.mutable_visit(node)
    def visit_Continue(self, node): return self.mutable_visit(node)

    def mutation_visit(self, node): return ast.Return(None)#return ast.Pass() 


In [7]:
import difflib

In [8]:
from inspect import getmembers, isfunction

#import json 
#print(getmembers(json, isfunction))
import json_parser
fns = getmembers(json_parser, isfunction)
print(fns)

[('all_parsers', <function all_parsers at 0x0000025111AD4D30>), ('array_parser', <function array_parser at 0x000002510F74BEE0>), ('boolean_parser', <function boolean_parser at 0x0000025111AD48B0>), ('colon_parser', <function colon_parser at 0x0000025111AD49D0>), ('comma_parser', <function comma_parser at 0x0000025111AD4A60>), ('main', <function main at 0x0000025111AD4E50>), ('null_parser', <function null_parser at 0x0000025111AD4AF0>), ('number_parser', <function number_parser at 0x0000025111AD4B80>), ('object_parser', <function object_parser at 0x0000025111AD4C10>), ('string_parser', <function string_parser at 0x0000025111AD4CA0>), ('value_parser', <function all_parsers.<locals>.<lambda> at 0x0000025111AD4DC0>)]


In [9]:
LAMBDA = lambda:0
for fn in fns:
    print(fn[0])
    if isinstance(fn[1], type(LAMBDA)) and fn[1].__name__ == LAMBDA.__name__: #check if fn is lambda fn, dont consider that
        continue
    for mutant in MuFunctionAnalyzer(fn[1]): ## fn[0] is the function name. We need to pass the function, so take fn[1]
        shape_src = mutant.pm.src
    for line in difflib.unified_diff(mutant.pm.src.split('\n'),
                                     mutant.src().split('\n'),
                                     fromfile=mutant.pm.name,
                                     tofile=mutant.name, n=3):
        print(line)

all_parsers
--- all_parsers

+++ all_parsers_1

@@ -1,2 +1,2 @@

 def all_parsers(*args):
-    return lambda data: reduce(lambda f, g: f if f(data) else g, args)(data)
+    pass
array_parser
--- array_parser

+++ array_parser_11

@@ -14,4 +14,4 @@

         res = comma_parser(data)
         if res is None:
             return None
-        data = res[1].strip()
+        pass
boolean_parser
--- boolean_parser

+++ boolean_parser_2

@@ -2,4 +2,4 @@

     if data[0:4] == 'true':
         return [True, data[4:].strip()]
     elif data[0:5] == 'false':
-        return [False, data[5:].strip()]
+        pass
colon_parser
--- colon_parser

+++ colon_parser_1

@@ -1,3 +1,3 @@

 def colon_parser(data):
     if data[0] == ':':
-        return [data[0], data[1:].lstrip()]
+        pass
comma_parser
--- comma_parser

+++ comma_parser_1

@@ -1,3 +1,3 @@

 def comma_parser(data):
     if data and data[0] == ',':
-        return [data[0], data[1:].strip()]
+        pass
main
--- main

+++ main_5

@@ 

In [10]:
LAMBDA = lambda:0
for fn in fns:
    print(fn[0])
    if isinstance(fn[1], type(LAMBDA)) and fn[1].__name__ == LAMBDA.__name__: #check if fn is lambda fn, dont consider that
        continue
    for mutant in MuFunctionAnalyzer(fn[1]): ## fn[0] is the function name. We need to pass the function, so take fn[1]
        shape_src = mutant.pm.src
        print(mutant.src())

all_parsers
def all_parsers(*args):
    pass
array_parser
def array_parser(data):
    if data[0] != '[':
        pass
    parse_list = []
    data = data[1:].strip()
    while len(data):
        res = value_parser(data)
        if res is None:
            return None
        parse_list.append(res[0])
        data = res[1].strip()
        if data[0] == ']':
            return [parse_list, data[1:].strip()]
        res = comma_parser(data)
        if res is None:
            return None
        data = res[1].strip()
def array_parser(data):
    if data[0] != '[':
        return None
    pass
    data = data[1:].strip()
    while len(data):
        res = value_parser(data)
        if res is None:
            return None
        parse_list.append(res[0])
        data = res[1].strip()
        if data[0] == ']':
            return [parse_list, data[1:].strip()]
        res = comma_parser(data)
        if res is None:
            return None
        data = res[1].strip()
def array_parser(data)

In [14]:
from fuzzingbook import Grammars
from fuzzingbook.Grammars import *

In [9]:
# from fuzzingbook import Fuzzer
# from fuzzingbook.Fuzzer import Fuzzer
# class GrammarFuzzer(Fuzzer):
#     """Produce strings from grammars efficiently, using derivation trees."""

#     def __init__(self,
#                  grammar: Grammar,
#                  start_symbol: str = START_SYMBOL,
#                  min_nonterminals: int = 0,
#                  max_nonterminals: int = 10,
#                  disp: bool = False,
#                  log: Union[bool, int] = False) -> None:
#         """Produce strings from `grammar`, starting with `start_symbol`.
#         If `min_nonterminals` or `max_nonterminals` is given, use them as limits 
#         for the number of nonterminals produced.  
#         If `disp` is set, display the intermediate derivation trees.
#         If `log` is set, show intermediate steps as text on standard output."""

#         self.grammar = grammar
#         self.start_symbol = start_symbol
#         self.min_nonterminals = min_nonterminals
#         self.max_nonterminals = max_nonterminals
#         self.disp = disp
#         self.log = log
#         self.check_grammar()  # Invokes is_valid_grammar()

In [15]:
from fuzzingbook import GrammarFuzzer
from fuzzingbook.GrammarFuzzer import GrammarFuzzer
f = GrammarFuzzer(JSON_GRAMMAR)

In [16]:
cnt = 0
#for i in range(10):
    #print(f.fuzz())
    LAMBDA = lambda:0
    for fn in fns:
        print(fn[0])
        if isinstance(fn[1], type(LAMBDA)) and fn[1].__name__ == LAMBDA.__name__: #check if fn is lambda fn, dont consider that
            continue
        for mutant in MuFunctionAnalyzer(fn[1]): ## fn[0] is the function name. We need to pass the function, so take fn[1]
            cnt+=1
#             all_parsers(null_parser, number_parser, boolean_parser,
#                            string_parser, object_parser, array_parser)

#             res = value_parser(f.fuzz().strip())
#             try:
#                 pprint.pprint(res[0])
#             except TypeError:
#                 print(None)
            file = open("json_parser"+str(cnt),'w')
            difflib.unified_diff(mutant.src(), mutant.pm.src, fromfile=json_parser, tofile=file)
#             for line in difflib.unified_diff(mutant.pm.src.split('\n'),
#                                              mutant.src().split('\n'),
#                                              fromfile=mutant.pm.name,
#                                              tofile=mutant.name, n=3):
#                 print(line)

IndentationError: unexpected indent (2419511157.py, line 4)

In [29]:
import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch()

In [17]:
LAMBDA = lambda:0
cnt = 0
f = open("json_parser.py",'r')
f = f.read()
for fn in fns:
    print(fn[0])
    if isinstance(fn[1], type(LAMBDA)) and fn[1].__name__ == LAMBDA.__name__: #check if fn is lambda fn, dont consider that
        continue
    for mutant in MuFunctionAnalyzer(fn[1]): ## fn[0] is the function name. We need to pass the function, so take fn[1]
        cnt+=1
        print(mutant.pm.src)
        print('\n#########')
        print(f.split(mutant.pm.src)[0])
        print('\n######')
        new_text = open("json_parser"+str(cnt),'w')
        new_text.write(f.split(mutant.pm.src)[0]+mutant.src()+f.split(mutant.pm.src)[1])
        new_text.close()

#         diff = dmp.diff_main(mutant.src(), mutant.pm.src)
#         dmp.diff_cleanupSemantic(diff)
#         print(diff)
#         patches = dmp.patch_make(f,mutant.src())
#         diff = dmp.patch_toText(patches)
#         new_text,_=dmp.patch_apply(patches,f)
#         print(new_text)
    

all_parsers
def all_parsers(*args):
    return lambda data: reduce(lambda f, g: f if f(data) else g, args)(data)

#########
from functools import reduce
import re
import pprint


def array_parser(data):
    if data[0] != "[":
        return None
    parse_list = []
    data = data[1:].strip()
    while len(data):
        res = value_parser(data)
        if res is None:
            return None
        parse_list.append(res[0])
        data = res[1].strip()
        if data[0] == "]":
            return [parse_list, data[1:].strip()]
        res = comma_parser(data)
        if res is None:
            return None
        data = res[1].strip()


def boolean_parser(data):
    if data[0:4] == "true":
        return [True, data[4:].strip()]
    elif data[0:5] == "false":
        return [False, data[5:].strip()]


def colon_parser(data):
    if data[0] == ":":
        return [data[0], data[1:].lstrip()]


def comma_parser(data):
    if data and data[0] == ",":
        return [data[0], data[1:]

IndexError: list index out of range

In [19]:
import subprocess
import json
from collections import defaultdict
LAMBDA = lambda:0
cnt = 0
f = open("json_parser.py",'r')
f = f.read()
d_res = defaultdict(list)
from fuzzingbook import Grammars
from fuzzingbook.Grammars import *
from fuzzingbook import GrammarFuzzer
from fuzzingbook.GrammarFuzzer import GrammarFuzzer
inp = GrammarFuzzer(JSON_GRAMMAR)
killer_inputs = set()
for fn in fns:
    print(fn[0])
    if isinstance(fn[1], type(LAMBDA)) and fn[1].__name__ == LAMBDA.__name__: #check if fn is lambda fn, dont consider that
        continue
    for mutant in MuFunctionAnalyzer(fn[1]): ## fn[0] is the function name. We need to pass the function, so take fn[1]
        cnt+=1
        
        new_f = f+"\n"+mutant.src()
        #print(new_f)
        mutated_f = open('json_parser_mutated.py','w')
        mutated_f.write(new_f)
        mutated_f.close()
        for i in range(10):
            fuzzinp = inp.fuzz()
            json_inp = json.dumps(fuzzinp)
            json_inp_file = open("json_inp.json","w")
            json_inp_file.write(json_inp)
            json_inp_file.close()
            out = subprocess.Popen([sys.executable,"json_parser_test.py"],stdout=subprocess.PIPE)
            #out = subprocess.Popen(["json_parser_test.py", '"'+inp.fuzz()+'"'],shell=True,stdout=subprocess.PIPE)
            fuzzout, errors = out.communicate()
            d_res[new_f].append((fuzzinp,fuzzout.decode("utf-8"),errors))
            if errors:
                killer_inputs.add(fuzzinp)
                print("Mutant killed")
            #print(d_res[new_f])
print(killer_inputs)
    
        

all_parsers
array_parser
boolean_parser
colon_parser
comma_parser
main
null_parser
number_parser
object_parser
string_parser
value_parser
set()


In [4]:
### this code stores inputs in a set, takes a function and generates mutants 
#(we split on value parser = ... statement so that the code is appended right above it and so that it calls the changed function) 
# - for every mutant, we take each of the stored inputs and raise errors if the output is not equal to that of the original parser
# or if res_mutated[0] doesnt print (is None/nothing to print). 
# Note that our mutant is created using ast.Return(None) not ast.Pass() which means instead of deleting statement and replacing it with Pass, we replace it with Return None
# We store the inputs that killed the mutants
import subprocess
import json
from collections import defaultdict
LAMBDA = lambda:0
cnt = 0
f = open("json_parser.py",'r')
f = f.read()
d_res = defaultdict(list)
from fuzzingbook import Grammars
from fuzzingbook.Grammars import *
from fuzzingbook import GrammarFuzzer
from fuzzingbook.GrammarFuzzer import GrammarFuzzer
inp = GrammarFuzzer(JSON_GRAMMAR)
print(JSON_GRAMMAR)
killer_inputs = set()
fuzzinp = []
for i in range(10):
    fuzzinp.append(inp.fuzz())
print(fuzzinp)
for fn in fns:
    if isinstance(fn[1], type(LAMBDA)) and fn[1].__name__ == LAMBDA.__name__: #check if fn is lambda fn, dont consider that
        continue
    print(fn[0])
    for mutant in MuFunctionAnalyzer(fn[1]): ## fn[0] is the function name. We need to pass the function, so take fn[1]
        cnt+=1
         
        new_f = f.split("value_parser = ")[0]+"\n"+mutant.src()+"\n" + "value_parser = " + f.split("value_parser = ")[1]
#         new_f = f+"\n"+mutant.src()
        #print(new_f)
        mutated_f = open('json_parser_mutated.py','w')
        mutated_f.write(new_f)
        mutated_f.close()
        for fi in fuzzinp:
            json_inp = json.dumps(fi)
            json_inp_file = open("json_inp.json","w")
            json_inp_file.write(json_inp)
            json_inp_file.close()
            out = subprocess.Popen([sys.executable,"json_parser_test.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
            #out = subprocess.Popen(["json_parser_test.py", '"'+inp.fuzz()+'"'],shell=True,stdout=subprocess.PIPE)
            fuzzout, errors = out.communicate()
            d_res[new_f].append((fi,fuzzout.decode("utf-8"),errors))
            if errors:
                killer_inputs.add(fi)
                print("Mutant killed")
            #print(d_res[new_f])
print(killer_inputs)
    
        

{'<start>': ['<json>'], '<json>': ['<element>'], '<element>': ['<ws><value><ws>'], '<value>': ['<object>', '<array>', '<string>', '<number>', 'true', 'false', 'null', "'; DROP TABLE STUDENTS"], '<object>': ['{<ws>}', '{<members>}'], '<members>': ['<member><symbol-2>'], '<member>': ['<ws><string><ws>:<element>'], '<array>': ['[<ws>]', '[<elements>]'], '<elements>': ['<element><symbol-1-1>'], '<string>': ['"<characters>"'], '<characters>': ['<character-1>'], '<character>': ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '!', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>', '?', '@', '[', ']', '^', '_', '`', '{', '|', '}', '~', ' '], '<number>': ['<int><frac><exp>'], '<int>': ['<digit>', '<

NameError: name 'fns' is not defined