In [193]:
import semgrep.semgrep_main
from typing import List
from io import StringIO
from semgrep.output import OutputHandler
from semgrep.output import OutputSettings
from semgrep.constants import OutputFormat
import json

In [194]:
def semgrep_pattern(pattern: str, targets: List[str], lang: str = "python"):
    """Run one semgrep pattern over ``targets`` and return the decoded JSON output.

    Args:
        pattern: semgrep pattern source text.
        targets: paths to scan; every entry is passed through ``str()`` first.
        lang: language handed to semgrep together with the pattern. Defaults
            to "python", which was previously hard-coded.

    Returns:
        Whatever ``json.loads`` decodes from the text the output handler
        wrote into the in-memory buffer.
    """
    # Capture semgrep's output in memory instead of letting it reach stdout.
    io_capture = StringIO()
    output_handler = OutputHandler(
        OutputSettings(
            output_format=OutputFormat.JSON,
            output_destination=None,
            error_on_findings=False,
            strict=False,
        ),
        stdout=io_capture,
    )
    semgrep.semgrep_main.main(
        output_handler=output_handler,
        target=[str(t) for t in targets],
        pattern=pattern,
        config="",
        lang=lang,
    )
    # NOTE(review): presumably close() is what writes the JSON report into
    # io_capture -- confirm against the installed semgrep version.
    output_handler.close()
    return json.loads(io_capture.getvalue())

In [195]:
# Route table copied from clusterfuzz server_flask.py.
# Each entry is a (route, "module.ClassName") pair. The right-hand side was
# converted to a string by hand so it can be split on "." into a module name
# and a class name.
handlers = [
# The root route is left out: its handler is picked by a conditional on
# _is_oss_fuzz, so it cannot be written as a single "module.ClassName" string.
#    ('/', home.Handler if _is_oss_fuzz else testcase_list.Handler),
    ('/bots', "bots.Handler"),
    ('/bots/dead', "bots.DeadBotsHandler"),
    ('/configuration', "configuration.Handler"),
    ('/add-external-user-permission', "configuration.AddExternalUserPermission"),
    ('/delete-external-user-permission', "configuration.DeleteExternalUserPermission"),
    ('/crash-stats/load', "crash_stats.JsonHandler"),
    ('/crash-stats', "crash_stats.Handler"),
    ('/corpora', "corpora.Handler"),
    ('/corpora/create', "corpora.CreateHandler"),
    ('/corpora/delete', "corpora.DeleteHandler"),
    ('/docs', "help_redirector.DocumentationHandler"),
    ('/download', "download.Handler"),
    ('/download/<resource>', "download.Handler"),
    ('/fuzzers', "fuzzers.Handler"),
    ('/fuzzers/create', "fuzzers.CreateHandler"),
    ('/fuzzers/delete', "fuzzers.DeleteHandler"),
    ('/fuzzers/edit', "fuzzers.EditHandler"),
    ('/fuzzers/log/<fuzzer_name>', "fuzzers.LogHandler"),
    ('/issue', "issue_redirector.Handler"),
    ('/issue/<testcase_id>', "issue_redirector.Handler"),
    ('/jobs', "jobs.Handler"),
    ('/jobs/load', "jobs.JsonHandler"),
    ('/jobs/delete-job', "jobs.DeleteJobHandler"),
    ('/login', "login.Handler"),
    ('/logout', "login.LogoutHandler"),
    ('/report-bug', "help_redirector.ReportBugHandler"),
    ('/session-login', "login.SessionLoginHandler"),
    ('/testcase', "show_testcase.DeprecatedHandler"),
    ('/testcase-detail', "show_testcase.Handler"),
    ('/testcase-detail/<int:testcase_id>', "show_testcase.Handler"),
    ('/testcase-detail/crash-stats', "crash_stats_on_testcase.Handler"),
    ('/testcase-detail/create-issue', "create_issue.Handler"),
    ('/testcase-detail/delete', "delete.Handler"),
    ('/testcase-detail/download-testcase', "download_testcase.Handler"),
    ('/testcase-detail/find-similar-issues', "find_similar_issues.Handler"),
    ('/testcase-detail/mark-fixed', "mark_fixed.Handler"),
    ('/testcase-detail/mark-security', "mark_security.Handler"),
    ('/testcase-detail/mark-unconfirmed', "mark_unconfirmed.Handler"),
    ('/testcase-detail/redo', "redo.Handler"),
    ('/testcase-detail/refresh', "show_testcase.RefreshHandler"),
    ('/testcase-detail/remove-duplicate', "remove_duplicate.Handler"),
    ('/testcase-detail/remove-issue', "remove_issue.Handler"),
    ('/testcase-detail/remove-group', "remove_group.Handler"),
    ('/testcase-detail/testcase-variants', "testcase_variants.Handler"),
    ('/testcase-detail/update-from-trunk', "update_from_trunk.Handler"),
    ('/testcase-detail/update-issue', "update_issue.Handler"),
    ('/testcases', "testcase_list.Handler"),
    ('/testcases/load', "testcase_list.JsonHandler"),
    ('/upload-testcase', "upload_testcase.Handler"),
    ('/upload-testcase/get-url-oauth', "upload_testcase.UploadUrlHandlerOAuth"),
    ('/upload-testcase/prepare', "upload_testcase.PrepareUploadHandler"),
    ('/upload-testcase/load', "upload_testcase.JsonHandler"),
    ('/upload-testcase/upload', "upload_testcase.UploadHandler"),
    ('/upload-testcase/upload-oauth', "upload_testcase.UploadHandlerOAuth"),
    ('/update-job', "jobs.UpdateJob"),
    ('/update-job-template', "jobs.UpdateJobTemplate"),
]

In [None]:
# TODO: Turn this into a semgrep rule that extracts import names -> directories + filenames
# Result is class name to file name dictionary
# Modules that server_flask.py imports from handlers.testcase_detail under an
# alias, mapped to their real path relative to the handlers directory:
#   from handlers.testcase_detail import (crash_stats as crash_stats_on_testcase)
#   from handlers.testcase_detail import (show as show_testcase)
_TESTCASE_DETAIL_ALIASES = {
    "show_testcase": "testcase_detail/show",
    "crash_stats_on_testcase": "testcase_detail/crash_stats",
}

# Modules that live in the testcase_detail package under their own name.
_TESTCASE_DETAIL_MODULES = frozenset([
    "create_issue",
    "delete",
    "download_testcase",
    "find_similar_issues",
    "mark_fixed",
    "mark_security",
    "mark_unconfirmed",
    "redo",
    "remove_duplicate",
    "remove_group",
    "remove_issue",
    "testcase_variants",
    "update_from_trunk",
    "update_issue",
])


def mangle_import_name(module_name):
    """Translate an imported module name into a path relative to the handlers directory.

    Args:
        module_name: the module part of a "module.ClassName" handler string.

    Returns:
        The relative path without the ".py" suffix. Aliased testcase_detail
        imports resolve to their real file, other testcase_detail modules get
        the "testcase_detail/" prefix, and any other name is returned unchanged.
    """
    if module_name in _TESTCASE_DETAIL_ALIASES:
        # crash_stats_on_testcase used to resolve to the top-level
        # "crash_stats" module, which is a different file from the aliased
        # testcase_detail import.
        return _TESTCASE_DETAIL_ALIASES[module_name]
    if module_name in _TESTCASE_DETAIL_MODULES:
        return "testcase_detail/" + module_name
    return module_name


In [241]:
from collections import defaultdict

# TODO: Something with duplicates
def construct_pattern(method, class_name):
    """Build the semgrep pattern text for ``method`` defined inside ``class_name``.

    The pattern is a tab-indented class skeleton whose method body is the
    ``...`` ellipsis; the returned text ends with a newline.
    """
    skeleton_lines = [
        "class {}:".format(class_name),
        "\tdef {}(self):".format(method),
        "\t\t...",
        "",  # yields the trailing newline once joined
    ]
    return "\n".join(skeleton_lines)

def extract_annotations(result):
    """Return the decorator lines contained in one semgrep result.

    Args:
        result: one entry of the semgrep JSON ``results`` list; only
            ``result['extra']['lines']`` is read.

    Returns:
        The whitespace-stripped lines that start with "@", in their original
        order.
    """
    # Check the prefix of the stripped line instead of searching for "@"
    # anywhere in it: a substring test also collects string literals, comments
    # and matrix-multiply expressions that merely contain the character.
    stripped_lines = (line.strip() for line in result['extra']['lines'].split("\n"))
    return [line for line in stripped_lines if line.startswith("@")]

# TODO: Separate out get, post, etc.
# Make sure that they are stored separately, do not combine multiple methods per Handler class
# Route -> decorator lines extracted from the semgrep match, one map per HTTP method.
get_to_annotations = defaultdict(list)
post_to_annotations = defaultdict(list)

# Machine-specific location of the clusterfuzz handler modules.
HANDLERS_DIR = "/home/daniel/ppa/semgrep_repl/clusterfuzz/src/appengine/handlers"

# No metavariable for annotations?
for handler in handlers:
    print(handler)
    module_name, class_name = handler[1].split(".")
    file_name = "{}/{}.py".format(HANDLERS_DIR, mangle_import_name(module_name))

    pattern = construct_pattern("get", class_name)
    # Always assume 0 or 1 result.
    results = semgrep_pattern(pattern, [file_name])['results']
    assert(len(results) <= 1)
    # `results` is a list, so test its truthiness; the earlier `results == 1`
    # compared a list with an int, was always False, and left
    # get_to_annotations empty.
    if results:
        get_to_annotations[handler[0]] = extract_annotations(results[0])

    pattern = construct_pattern("post", class_name)
    print(pattern)
    results = semgrep_pattern(pattern, [file_name])['results']
    assert(len(results) <= 1)
    if results:
        post_to_annotations[handler[0]] = extract_annotations(results[0])


('/bots', 'bots.Handler')
class Handler:
	def post(self):
		...

('/bots/dead', 'bots.DeadBotsHandler')
class DeadBotsHandler:
	def post(self):
		...

('/configuration', 'configuration.Handler')
class Handler:
	def post(self):
		...

('/add-external-user-permission', 'configuration.AddExternalUserPermission')
class AddExternalUserPermission:
	def post(self):
		...

('/delete-external-user-permission', 'configuration.DeleteExternalUserPermission')
class DeleteExternalUserPermission:
	def post(self):
		...

('/crash-stats/load', 'crash_stats.JsonHandler')
class JsonHandler:
	def post(self):
		...

('/crash-stats', 'crash_stats.Handler')
class Handler:
	def post(self):
		...

('/corpora', 'corpora.Handler')
class Handler:
	def post(self):
		...

('/corpora/create', 'corpora.CreateHandler')
class CreateHandler:
	def post(self):
		...

('/corpora/delete', 'corpora.DeleteHandler')
class DeleteHandler:
	def post(self):
		...

('/docs', 'help_redirector.DocumentationHandler')
class Documentati

In [242]:
# Collect every distinct decorator line seen on any GET or POST handler.
all_annotations = set()
for annotations_by_route in (get_to_annotations, post_to_annotations):
    for route_annotations in annotations_by_route.values():
        all_annotations.update(route_annotations)
print(all_annotations)

{'@handler_flask.check_user_access(need_privileged_access=True)', '@handler_flask.check_admin_access_if_oss_fuzz', '@handler_flask.check_admin_access', '@staticmethod', '@handler_flask.post(handler_flask.FORM, handler_flask.JSON)', '@handler_flask.check_user_access(need_privileged_access=False)', '@handler_flask.oauth', '@handler_flask.post(handler_flask.JSON, handler_flask.JSON)', '@handler_flask.get(handler_flask.HTML)', '@handler_flask.post(handler_flask.FORM, handler_flask.HTML)', '@handler_flask.unsupported_on_local_server', '@handler_flask.require_csrf_token', '@handler_flask.get(handler_flask.JSON)'}


In [247]:
# Maps the exact text of a decorator line (whitespace-stripped) to a short
# label for the access level it appears to require. Decorator lines that are
# not keys here are skipped by annotation_list_to_auth.
# NOTE(review): the labels are this notebook's reading of the decorator
# names -- confirm against handler_flask before relying on them.
auth_map = {
    "@handler_flask.check_user_access(need_privileged_access=True)": "privileged user",
    "@handler_flask.check_admin_access_if_oss_fuzz": "oss-fuzz-admin",
    "@handler_flask.check_admin_access": "always-admin",
    "@handler_flask.check_user_access(need_privileged_access=False)": "unprivileged user",
    "@handler_flask.oauth": "logged in",
}

In [248]:
def annotation_list_to_auth(annotations):
    """Translate decorator lines into their ``auth_map`` labels.

    Lines that are not keys of ``auth_map`` are dropped; label order follows
    the input order. When no line maps to a label the result is ``["unauth"]``.
    """
    labels = [auth_map[line] for line in annotations if line in auth_map]
    if not labels:
        return ["unauth"]
    return labels

# Print one line per resolved route: HTTP verb, route, and the auth labels.
route_reports = (
    ("GET {}", get_to_annotations),
    ("POST {}", post_to_annotations),
)
for heading_template, annotations_by_route in route_reports:
    for route, route_annotations in annotations_by_route.items():
        print(heading_template.format(route), annotation_list_to_auth(route_annotations))

GET /bots ['oss-fuzz-admin', 'unprivileged user']
GET /bots/dead ['unauth']
GET /configuration ['always-admin', 'always-admin']
GET /crash-stats ['unauth']
GET /corpora ['oss-fuzz-admin', 'unprivileged user']
GET /docs ['unauth']
GET /fuzzers ['oss-fuzz-admin', 'unprivileged user']
GET /jobs ['privileged user']
GET /login ['unauth']
GET /logout ['unauth']
GET /report-bug ['unauth']
GET /testcase ['unauth']
GET /testcase-detail/crash-stats ['unauth']
GET /testcase-detail/download-testcase ['logged in']
GET /testcases ['unauth']
GET /upload-testcase ['unauth']
POST /configuration ['always-admin', 'always-admin']
POST /add-external-user-permission ['always-admin']
POST /delete-external-user-permission ['always-admin']
POST /crash-stats/load ['unauth']
POST /corpora/create ['privileged user']
POST /corpora/delete ['privileged user']
POST /fuzzers/create ['privileged user']
POST /fuzzers/delete ['privileged user']
POST /fuzzers/edit ['privileged user']
POST /jobs/load ['privileged user']
PO

In [218]:
# Make sure we resolved every route.
# Gather all failures first so the assertion message names every route that
# has neither a GET nor a POST entry, instead of stopping silently at the first.
unresolved_routes = [
    h[0] for h in handlers
    if h[0] not in get_to_annotations and h[0] not in post_to_annotations
]
assert not unresolved_routes, "no get/post handler found for routes: {}".format(unresolved_routes)