In [None]:
from dataclasses import dataclass
import pandas as pd
import httpx
import re
import ast
import astor
from db_util import TestCase

In [None]:
def parse_class(node: ast.ClassDef):
    """Build a ``TestCase`` record from one class definition of the test-case module.

    The statements of the class body are handled as follows:

    * Methods named ``test``, ``valid`` and ``invalid``: the method body is
      regenerated as source with ``astor``, dedented by four characters and
      stored in ``code``, ``valid`` and ``invalid`` respectively. Any other
      method name is printed and otherwise ignored.
    * Assignments to ``title``/``description``, ``category``/``source``/
      ``confidence`` and ``type``/``activity`` are copied onto the record;
      assignments to any other name are ignored.
    * Bare expression statements (e.g. ``http1.router.register``) are ignored.
    * Every other statement kind is reported on stdout and skipped.

    Args:
        node: The class definition to convert.

    Returns:
        The populated ``TestCase``; its ``function_name`` is the class name.
    """
    t = TestCase(function_name=node.name)
    for field in node.body:
        if isinstance(field, ast.FunctionDef):
            # Drop the ``def`` line and the trailing empty line produced by astor,
            # then strip one indentation level (four characters) from each body line.
            code_block = "\n".join([line[4:] for line in astor.to_source(field).split("\n")[1:-1]])
            if field.name == "test":
                t.code = code_block
            elif field.name == "valid":
                t.valid = code_block
            elif field.name == "invalid":
                t.invalid = code_block
            else:
                print(field.name)
        elif isinstance(field, ast.Assign):
            # Only plain ``name = value`` assignments carry metadata; targets that
            # are not simple names (tuples, attributes, ...) have no ``id``.
            column = getattr(field.targets[0], "id", None)
            if column in ["title", "description"]:
                # Drops the first five characters and everything from the last
                # newline on — presumably the leading newline + indent and the
                # closing line of a triple-quoted string; verify against testcases.py.
                value = field.value.value[5:].rsplit("\n", 1)[0]
                t[column] = value
            elif column in ["category", "source", "confidence"]:
                value = field.value.value
                t[column] = value
            elif column in ["type", "activity"]:
                # Keep the expression as source text (e.g. ``Level.REQUIREMENT``),
                # without astor's trailing newline.
                value = astor.to_source(field.value)[:-1]
                t[column] = value
        elif isinstance(field, ast.Expr):
            # http1.router.register
            pass
        else:
            # Not every statement node has a ``name`` (e.g. annotated assignments),
            # so fall back to the node type instead of raising AttributeError.
            print(getattr(field, "name", type(field).__name__))
    return t

In [None]:
# Collect one parsed record per test-case class found at the top level of
# ../testcases.py.
tests = []

with open("../testcases.py", "r") as f:
    module = ast.parse(f.read())
    for node in ast.iter_child_nodes(module):
        # Anything that is not a class definition is irrelevant here.
        if type(node) != ast.ClassDef:
            continue
        # These classes are skipped by name (presumably support types, not test cases).
        if node.name in ["Level", "Violation", "Activity", "Logger"]:
            continue
        tests.append(parse_class(node))

In [None]:
# One row per parsed test case, restricted to (and ordered by) the columns below.
df = pd.DataFrame(tests)[
    [
        "function_name",
        "title",
        "description",
        "type",
        "category",
        "source",
        "measure_violations",
        "probe_feature",
        "impact",
        "violation_check_implemented",
        "probing_implemented",
        "activity",
        "code",
        "confidence",
        "idx",
        "valid",
        "invalid",
    ]
]

In [None]:
# Source URL without its fragment: everything before the first "#".
df["short_source"] = df["source"].map(lambda url: url.partition("#")[0])

In [None]:
@dataclass
class SpecInfo:
    """Metadata describing one specification document.

    Every field defaults to the string ``"unknown"``. The fields are annotated
    so that ``@dataclass`` actually registers them: this gives keyword
    construction, a meaningful ``repr`` and field-wise equality, while
    ``SpecInfo()`` followed by attribute assignment keeps working.
    """

    spec_type: str = "unknown"    # exported as "Organization"
    spec_status: str = "unknown"  # exported as "Status"
    title: str = "unknown"        # exported as "Title"
    spec_date: str = "unknown"    # exported as "Date"
    url: str = "unknown"          # exported as "URL"
    num: str = "unknown"          # exported as "Name"

    def __getitem__(self, item):
        """Return the attribute called ``item`` (``info["title"]`` == ``info.title``)."""
        return getattr(self, item)

    def to_dict(self) -> dict[str, str]:
        """Return the fields as a dict keyed by the column names used in the tables."""
        return {"Organization": self.spec_type, "Status": self.spec_status, "Title": self.title,
                "Date": self.spec_date, "URL": self.url, "Name": self.num}


# Module-level cache of SpecInfo objects; starts empty.
res = {}


In [None]:
# Specifications that are cited at a fixed snapshot instead of being looked up
# online. Each entry is
#   (substring searched for in the full source URL,
#    SpecInfo attribute values,
#    pinned base URL, or None to keep the source URL unchanged).
# The entries are tried in order and the first match wins.
_PINNED_SPECS = (
    (
        "html.spec",
        {"spec_date": "March 2023", "spec_type": "WHATWG", "spec_status": "Living Standard",
         "title": "HTML", "num": "HTML"},
        "https://html.spec.whatwg.org/commit-snapshots/578def68a9735a1e36610a6789245ddfc13d24e0/",
    ),
    (
        "fetch.spec",
        {"spec_date": "March 2023", "spec_type": "WHATWG", "spec_status": "Living Standard",
         "title": "Fetch", "num": "Fetch"},
        "https://fetch.spec.whatwg.org/commit-snapshots/8f109835dcff90d19caed4b551a0da32d9d0f57e/",
    ),
    (
        "webappsec-csp",
        {"spec_date": "February 2023", "spec_type": "W3C", "spec_status": "Working Draft",
         "title": "Content Security Policy Level 3", "num": "CSP"},
        "https://www.w3.org/TR/2023/WD-CSP3-20230220/",
    ),
    (
        "permissions",
        {"spec_date": "February 2023", "spec_type": "W3C", "spec_status": "Working Draft",
         "title": "Permissions Policy", "num": "PP"},
        "https://www.w3.org/TR/2023/WD-permissions-policy-1-20230322/",
    ),
    (
        # Which version to use? Candidate Recommendation from 2015 or Editor's Draft from 2022?
        "upgrade-insecure",
        {"spec_date": "October 2022", "spec_type": "W3C", "spec_status": "Editor's Draft",
         "title": "Upgrade Insecure Requests", "num": "UIR"},
        None,
    ),
)


def _fetch_rfc_info(num, url_short):
    """Query the RFC Editor search page for ``num`` and return a filled SpecInfo."""
    import io

    resp = httpx.get(f"https://www.rfc-editor.org/search/rfc_search_detail.php?rfc={num}")
    # Fail with a clear HTTP error instead of trying to parse an error page.
    resp.raise_for_status()
    # Wrap the payload in a file-like object: handing literal HTML to read_html
    # is deprecated. The third table on the page holds the search result.
    table = pd.read_html(io.BytesIO(resp.content))[2]
    info = SpecInfo()
    info.spec_type = "RFC"
    info.spec_status = table["Status"][0]
    info.spec_date = table["Date"][0]
    info.title = table["Title"][0]
    info.url = url_short
    info.num = num
    return info


def _pinned_spec_info(url_long):
    """Return the SpecInfo for a non-RFC source URL (pinned snapshot or unknown)."""
    info = SpecInfo()
    for needle, attributes, base_url in _PINNED_SPECS:
        if needle not in url_long:
            continue
        for attribute, value in attributes.items():
            setattr(info, attribute, value)
        if base_url is None:
            info.url = url_long
        else:
            # Re-attach the fragment (text after the last "#") to the snapshot URL.
            # Without a fragment the bare snapshot URL is used rather than
            # appending the whole source URL after "#".
            _, separator, fragment = url_long.rpartition("#")
            info.url = f"{base_url}#{fragment}" if separator else base_url
        return info
    # Unrecognised source: keep the URL as title and link, leave the rest "unknown".
    info.title = url_long
    info.url = url_long
    return info


def get_spec(row):
    """Describe the specification a test case refers to.

    If ``row["short_source"]`` contains ``"rfc"``, its last path component is
    used as the RFC identifier and the metadata is fetched from
    rfc-editor.org (network access); results are cached in the module-level
    ``res`` dict so each RFC is requested once. Otherwise ``row["source"]`` is
    matched against the pinned WHATWG/W3C specifications; unmatched sources
    get a record whose title and URL are the source URL itself.

    Args:
        row: Mapping-like row providing ``short_source``, ``source``,
            ``function_name``, ``type``, ``category`` and ``activity``.

    Returns:
        dict with the keys ``Organization``, ``Status``, ``Title``, ``Date``,
        ``URL``, ``Name`` plus ``function_name``, ``type``, ``category`` and
        ``activity`` copied from ``row``.

    Raises:
        httpx.HTTPStatusError: if the RFC Editor request returns an error status.
    """
    url_short = row["short_source"]
    url_long = row["source"]
    if "rfc" in url_short:
        num = url_short.split("/")[-1]
        if num not in res:
            res[num] = _fetch_rfc_info(num, url_short)
        r = res[num]
    else:
        r = _pinned_spec_info(url_long)
    r_dict = r.to_dict()
    r_dict["function_name"] = row["function_name"]
    r_dict["type"] = row['type']
    r_dict["category"] = row["category"]
    r_dict["activity"] = row["activity"]
    return r_dict


In [None]:
def get_stats(df):
    """Count tests by activity and rule type for one group of test cases.

    Only rows whose ``activity`` is one of ``Activity.DIRECT``,
    ``Activity.DIRECT_BASE``, ``Activity.PROXY`` or ``Activity.RETRO`` are
    "considered"; the rule-type and broken counts are taken over those rows.

    Args:
        df: DataFrame with the columns ``activity``, ``type`` and ``broken``.

    Returns:
        pd.Series with the entries ``Considered tests``, ``Broken tests``,
        ``Probe tests``, ``Direct tests``, ``Retro tests``, ``Requirements``,
        ``Recommendations`` and ``ABNFs``.
    """
    probe_activities = ["Activity.PROXY"]
    direct_activities = ["Activity.DIRECT", "Activity.DIRECT_BASE"]
    retro_activities = ["Activity.RETRO"]

    activity = df["activity"]
    considered_tests = df.loc[activity.isin(direct_activities + probe_activities + retro_activities)]
    rule_type = considered_tests["type"]

    data = {
        "Considered tests": len(considered_tests),
        "Broken tests": considered_tests["broken"].sum(),
        "Probe tests": int(activity.isin(probe_activities).sum()),
        "Direct tests": int(activity.isin(direct_activities).sum()),
        "Retro tests": int(activity.isin(retro_activities).sum()),
        "Requirements": int((rule_type == "Level.REQUIREMENT").sum()),
        "Recommendations": int((rule_type == "Level.RECOMMENDATION").sum()),
        "ABNFs": int((rule_type == "Level.ABNF").sum()),
    }

    return pd.Series(data)

# Top5k + 5k long tail
# Full test names (prefix + rule level + test name) listed as broken for this data set.
broken_wild_05_04 = [
    "probetest_Recommendation_cookie_IMF_fixdate",
    "probetest_Requirement_code_405_allow",
    "probetest_Recommendation_STS_header_after_upgrade_insecure_requests",
    "retrotest_Recommendation_head_get_headers",
    "retrotest_Recommendation_accept_patch_presence",
    "probetest_ABNF_expires_grammar",
    "probetest_Recommendation_redirect_after_upgrade_insecure_requests",
    "probetest_Recommendation_duplicate_csp",
    "probetest_ABNF_access_control_allow_credentials_grammar",
    "retrotest_Requirement_content_length_same_head_get",
    "probetest_Requirement_duplicate_fields",
    "probetest_Requirement_only_one_sts_header_allowed",
    "probetest_Recommendation_duplicate_cookies",
    "probetest_ABNF_accept_patch_grammar",
    "probetest_Requirement_sts_header_http",
    "probetest_Requirement_post_invalid_response_codes",
    "probetest_Requirement_date_header_required",
    "retrotest_Requirement_code_304_headers",
    "probetest_Requirement_code_407_proxy_authenticate",
    "probetest_ABNF_permissions_policy_grammar",
    "probetest_ABNF_xfo_grammar",
    "probetest_ABNF_access_control_allow_origin_grammar",
    "probetest_Recommendation_content_type_header_required",
    "probetest_ABNF_coop_grammar",
    "probetest_ABNF_content_type_grammar",
    "retrotest_Requirement_code_206_headers",
    "probetest_ABNF_server_grammar",
    "probetest_ABNF_sts_grammar",
    "probetest_ABNF_cache_control_grammar",
    "probetest_ABNF_vary_grammar",
    "probetest_ABNF_age_grammar",
    "probetest_Recommendation_code_416_content_range",
    "probetest_Recommendation_duplicate_cookie_attribute",
    "probetest_Requirement_code_401_www_authenticate",
    "probetest_ABNF_content_language_grammar",
    "probetest_Recommendation_code_302_location",
    "probetest_ABNF_last_modified_grammar",
    "probetest_ABNF_cookie_grammar",
    "retrotest_Requirement_content_length_same_304_200",
    "probetest_ABNF_content_length_grammar",
    "probetest_Requirement_content_length_1XX_204",
    "probetest_Recommendation_code_415_unsupported_media_type",
    "probetest_ABNF_allow_grammar",
    "probetest_ABNF_date_grammar",
    "probetest_ABNF_etag_grammar",
    "probetest_ABNF_csp_grammar",
    "probetest_Requirement_field_value_start_or_end_with_whitespace",
    "probetest_ABNF_connection_grammar",
    "probetest_ABNF_corp_grammar",
    "probetest_ABNF_xcto_grammar",
    "probetest_Recommendation_code_303_location",
    "probetest_Recommendation_duplicate_csp_ro",
    "probetest_Recommendation_code_307_location",
    "probetest_Recommendation_response_directive_no_cache",
    "probetest_ABNF_access_control_max_age_grammar",
    "probetest_ABNF_access_control_allow_methods_grammar",
    "probetest_Requirement_code_304_no_content",
    "probetest_ABNF_location_header_grammar",
    "probetest_Recommendation_code_301_location",
    "probetest_Requirement_code_206_content_range",
    "probetest_Recommendation_server_header_long",
    "probetest_Requirement_sts_directives_only_allowed_once",
    "probetest_ABNF_range_grammar",
    "probetest_Requirement_transfer_encoding_http11",
    "probetest_ABNF_coep_grammar",
    "probetest_ABNF_access_control_allow_headers_grammar",
    "probetest_Requirement_send_upgrade_101",
    "probetest_Requirement_no_transfer_encoding_1xx_204",
]
# Second list of broken tests (presumably from a local run; the name is the only hint).
broken_local_05_09 = [
    "directtest_Recommendation_allow_crlf_start",
    "directtest_Requirement_code_400_after_bad_host_request",
    "directtest_Recommendation_code_501_unknown_methods",
    "directtest_Recommendation_code_405_blocked_methods",
    "directtest_Requirement_reject_fields_contaning_cr_lf_nul",
    "probetest_Recommendation_redirect_after_upgrade_insecure_requests",
    "probetest_Recommendation_STS_header_after_upgrade_insecure_requests",
    "probetest_Requirement_code_405_allow",
    "directtest_Requirement_code_400_if_msg_with_whitespace_between_header_field_and_colon",
    "directtest_Requirement_reject_msgs_with_whitespace_between_startline_and_first_header_field",
    "probetest_Requirement_date_header_required",
    "retrotest_Requirement_content_length_same_head_get",
    "probetest_Requirement_post_invalid_response_codes",
    "retrotest_Recommendation_head_get_headers",
    "probetest_ABNF_server_grammar",
]

def get_broken(name):
    """Tell whether ``name`` appears in either list of broken tests.

    The entries of ``broken_local_05_09`` and ``broken_wild_05_04`` are reduced
    to what follows their second underscore (dropping e.g.
    ``probetest_Requirement_``) before the comparison.

    Returns:
        True if ``name`` matches one of the reduced names, otherwise False.
    """
    stripped_names = [
        full_name.split("_", maxsplit=2)[2]
        for full_name in broken_local_05_09 + broken_wild_05_04
    ]
    return name in stripped_names


In [None]:
# Get overview
def get_overview(t):
    """Render and export the per-specification and overall test overviews.

    Side effects:
        * adds the columns ``broken`` and ``URL_short`` to ``t`` in place;
        * writes ``output/sources_overview.tex`` and ``output/test_overview.tex``
          (the ``output`` directory is created if it is missing);
        * displays both tables in the notebook.

    Args:
        t: DataFrame with at least the columns ``function_name``, ``URL``,
            ``Name``, ``Title``, ``Status``, ``Organization``, ``Date``,
            ``activity`` and ``type``.

    Returns:
        None.
    """
    import os

    # Both exports below write into this directory.
    os.makedirs("output", exist_ok=True)

    t["broken"] = t["function_name"].apply(get_broken)
    t["URL_short"] = t["URL"].apply(lambda x: x.split("#")[0])
    tab = t.groupby(by=["Name", "Title", "Status", "Organization", "Date", "URL_short"]).apply(get_stats).reset_index()
    tab = tab.rename(columns={"URL_short": "URL"})
    # Keep the table that still has the URL column; `tab` drops it below.
    spec_urls = tab

    # Detailed overview
    def add_url(name):
        """Wrap a specification name in a LaTeX \\href pointing at its URL."""
        url = spec_urls.loc[spec_urls["Name"] == name]["URL"].values[0]
        return r"\href{" + url + r"}" + r"{" + name + "}"

    tab = tab.sort_values(by=["Organization", "Name", "Date"])
    tab = tab.rename(columns={"Considered tests": "#Rules", "Broken tests": "#Broken Rules"})
    tab = tab[["Name", "Title", "Status", "Organization", "Date", "#Rules", "#Broken Rules"]]
    with pd.option_context("max_colwidth", 1000):
        s = tab.style
        s.hide(axis="index")
        s.format({"Name": add_url})
        # to_latex returns None when a target path is given, so the result is
        # not printed.
        s.to_latex("output/sources_overview.tex", hrules=True)
        display(tab)

    # High level overview: per-specification stats summed over all specifications.
    test_overview = t.groupby("Name").apply(get_stats).apply(sum).to_frame()
    display(test_overview)
    test_overview.to_latex("output/test_overview.tex")

In [None]:
def get_test_type(s):
    """Translate an activity label into the test-type name used in the tables.

    ``Activity.PROXY`` -> ``Probe``; ``Activity.DIRECT`` and
    ``Activity.DIRECT_BASE`` -> ``Direct``; ``Activity.RETRO`` -> the LaTeX
    macro ``\\Multi{}``; everything else -> ``Excluded``.
    """
    activities_by_type = (
        ("Probe", ("Activity.PROXY",)),
        ("Direct", ("Activity.DIRECT", "Activity.DIRECT_BASE")),
        (r"\Multi{}", ("Activity.RETRO",)),
    )
    for test_type, activities in activities_by_type:
        if s in activities:
            return test_type
    return "Excluded"
    
def get_rule_type(s):
    """Translate a level label into the rule-type name used in the tables.

    ``Level.REQUIREMENT`` -> ``Requirement``, ``Level.RECOMMENDATION`` ->
    ``Recommendation``, ``Level.ABNF`` -> ``ABNF``; anything else -> ``Other``.
    """
    known_levels = (
        ("Level.REQUIREMENT", "Requirement"),
        ("Level.RECOMMENDATION", "Recommendation"),
        ("Level.ABNF", "ABNF"),
    )
    for level, rule_type in known_levels:
        if s == level:
            return rule_type
    return "Other"
    
def get_stats2(df):
    """Cross-tabulate rule types against test types.

    Adds the columns ``Test Type`` and ``Rule Type`` to ``df`` in place
    (derived from ``activity`` and ``type``), then counts the ``category``
    entries per combination, including the ``All`` margins.

    Returns:
        Integer DataFrame with rule types as rows and test types as columns;
        combinations without any test are 0.
    """
    df["Test Type"] = df["activity"].apply(get_test_type)
    df["Rule Type"] = df["type"].apply(get_rule_type)

    counts = pd.pivot_table(
        df,
        index="Test Type",
        columns="Rule Type",
        values="category",
        aggfunc="count",
        margins=True,
    )
    # Missing combinations come back as NaN; show them as 0 and flip the axes.
    counts = counts.fillna(0).astype(int)
    return counts.T

In [None]:
# Resolve the specification metadata for every test case; the dict returned by
# get_spec is expanded into one column per key.
t = df.apply(get_spec, axis=1, result_type="expand")
# Writes the overview tables; note that this also adds columns to `t` in place.
get_overview(t)

In [None]:
# Rule type x test type count matrix (get_stats2 also adds two columns to `t`).
m = get_stats2(t)
display(m)
# escape=False leaves the cell/label text unescaped in the LaTeX output.
m.to_latex("output/test_matrix.tex", escape=False)

In [None]:
# DB name: paper name
# Keys are test names without their first two underscore-separated components
# (the same form produced for `broken_all`); values are the labels used in the paper.
test_to_paper = {
    'STS_header_after_upgrade_insecure_requests': 'STS after UIR',
    'redirect_after_upgrade_insecure_requests': 'Redirect after HTTP-UIR',
    'code_405_allow': 'Allow header present for 405',
    'head_get_headers': 'HEAD and GET same headers',
    'accept_patch_presence': 'Accept-Patch if PATCH supported',
    'cookie_IMF_fixdate': 'Cookies use IMF-fixdate',
    'content_length_same_head_get': 'Content-Length for HEAD=GET',
    'post_invalid_response_codes': 'Forbidden status-codes for POST',
    'code_304_headers': 'Same headers for 304 and 200',
    'date_header_required': 'Date header required',
    # NOTE(review): 'Experies' looks like a typo for 'Expires'; left unchanged
    # because the label may be referenced elsewhere — confirm before fixing.
    'expires_grammar': 'Experies ABNF',
    'sts_header_http': 'STS not allowed for HTTP',
    'duplicate_cookies': 'Duplicate cookie names',
    'duplicate_fields': 'Duplicate headers',
    'content_type_header_required': 'Content-Type header required',
    'xfo_grammar': 'XFO ABNF',
    'code_206_headers': 'Mandatory headers for 206',
    'etag_grammar': 'Etag ABNF',
    'cookie_grammar': 'Set-Cookie ABNF',
    'access_control_allow_origin_grammar': 'ACAO ABNF',
    'only_one_sts_header_allowed': 'Duplicate STS',
    'duplicate_csp': 'Duplicate CSP',
    'accept_patch_grammar': 'Accept-Patch ABNF',
    'code_401_www_authenticate': 'WWW-Authenticate required for 401',
    'content_length_1XX_204': 'Forbidden Content-Length for 1XX and 204',
    'last_modified_grammar': 'Last-Modified ABNF',
    'field_value_start_or_end_with_whitespace': 'Forbidden surrounding whitespace for fields',
    'content_type_grammar': 'Content-Type ABNF',
    'code_304_no_content': 'Forbidden content for 304',
    'server_grammar': 'Server ABNF',
    'date_grammar': 'Date ABNF',
    'access_control_allow_credentials_grammar': 'ACAC ABNF',
    'vary_grammar': 'Vary ABNF',
    'content_length_same_304_200': 'Content-Length for 304=200',
    'csp_grammar': 'CSP ABNF',
    'duplicate_cookie_attribute': 'Cookies with duplicate attributes',
    'age_grammar': 'Age ABNF',
    'cache_control_grammar': 'Cache-Control ABNF',
    'content_language_grammar': 'Content-Language ABNF',
    'access_control_allow_methods_grammar': 'ACAM ABNF',
    'permissions_policy_grammar': 'PermissionsPolicy ABNF',
    'access_control_allow_headers_grammar': 'ACAH ABNF',
    'sts_grammar': 'STS ABNF',
    'xcto_grammar': 'XCTO ABNF',
    'code_416_content_range': 'Content-Range required for 416',
    'code_302_location': 'Location required for 302',
    'duplicate_csp_ro': 'Duplicate CSP-RO',
    'coop_grammar': 'COOP ABNF',
    'code_407_proxy_authenticate': 'Proxy-Authenticate required for 407',
    'code_415_unsupported_media_type': 'Missing required headers for 415',
    'server_header_long': 'Overly long Server header',
    'location_header_grammar': 'Location ABNF',
    'content_length_grammar': 'Content-Length ABNF',
    'access_control_max_age_grammar': 'ACMA ABNF',
    'corp_grammar': 'CORP ABNF',
    'allow_grammar': 'Allow ABNF',
    'connection_grammar': 'Connection ABNF',
    'code_301_location': 'Location required for 301',
    'code_303_location': 'Location required for 303',
    'response_directive_no_cache': 'Forbidden token form in no-cache directive',
    'transfer_encoding_http11': 'TE forbidden for non HTTP/1.1 responses',
    'sts_directives_only_allowed_once': 'Duplicate directives for STS',
    'code_206_content_range': 'Content-Range required for 206',
    'send_upgrade_101': 'Upgrade required for 101',
    'coep_grammar': 'COEP ABNF',
    'no_transfer_encoding_1xx_204': 'TE forbidden for 1XX and 204',
    'range_grammar': 'Range ABNF',
    'code_307_location': 'Location required for 307',
    # Direct tests
    'reject_fields_contaning_cr_lf_nul': 'Illegal chars',
    'code_400_after_bad_host_request': 'Bad host',
    'reject_msgs_with_whitespace_between_startline_and_first_header_field': 'Illegal whitespace after startline',
    'code_400_if_msg_with_whitespace_between_header_field_and_colon': 'Illegal whitespace in header name',
    'allow_crlf_start': 'Allow CRLF prior to request line',
    'code_501_unknown_methods': 'Unknown methods should result in 501',
    'code_405_blocked_methods': 'Blocked methods should result in 405',
}

# Reverse lookup: paper label -> DB name.
paper_to_test = dict((paper_name, db_name) for db_name, paper_name in test_to_paper.items())
# Every test listed as broken in either list, reduced to the part after the
# second underscore (duplicates are kept).
broken_all = [
    full_name.split("_", maxsplit=2)[2]
    for full_name in broken_local_05_09 + broken_wild_05_04
]

In [None]:
# Groups of potentially dangerous / security-relevant tests, given by paper label.
# NOTE(review): the group abbreviations (hot, hrs, _d, _p) are not expanded
# anywhere in this file — confirm their meaning with the authors.
hot_d = ["Bad host"]

hrs_p_d = ['Illegal chars', 'Illegal whitespace after startline', 'Illegal whitespace in header name']
hrs_p = ['Forbidden Content-Length for 1XX and 204', 'Forbidden surrounding whitespace for fields', 'Forbidden content for 304', 'Content-Length ABNF', 'TE forbidden for non HTTP/1.1 responses', 'TE forbidden for 1XX and 204', 'Upgrade required for 101']

sts_mitm = ['STS after UIR', 'Redirect after HTTP-UIR']
sts_other = ['STS not allowed for HTTP', 'STS ABNF', 'Duplicate directives for STS']

# Broken ABNF or used in HTTP while only allowed in HTTPS or similar, broken cookies can lead to strange behavior, STS with duplicate directives is invalid (although not breaking the ABNF)
restrictive = ['XFO ABNF', 'CSP ABNF', 'PermissionsPolicy ABNF', 'COOP ABNF', 'CORP ABNF', 'COEP ABNF', 'Duplicate CSP', 'Duplicate CSP-RO']
cors_issues = ['ACAO ABNF', 'ACAC ABNF', 'ACAM ABNF', 'ACAH ABNF', 'ACMA ABNF']

# Some are security relevant (such as duplicate STS, XFO, ...)
duplicate_fields = ['Duplicate headers']

content_sniffing = ['Content-Type header required', 'Content-Type ABNF', 'XCTO ABNF']
cookies = ['Set-Cookie ABNF', 'Duplicate cookie names', 'Cookies use IMF-fixdate', 'Cookies with duplicate attributes']


# All groups combined, then translated from paper labels back to DB names;
# a label missing from paper_to_test raises KeyError here.
dangerous_broken = hot_d + hrs_p_d + hrs_p + sts_mitm + sts_other + restrictive + cors_issues + duplicate_fields + content_sniffing + cookies
dangerous_broken = [paper_to_test[key] for key in dangerous_broken]

# Unbroken
# DB names of tests classified as dangerous that are not in the broken lists
# (hand-maintained; nothing here checks them against broken_all).
dangerous_not_broken = ["content_head_request", "continue_before_upgrade", 
                        "send_upgrade_426", "switch_protocol_without_client",
                       "csp_ro_grammar", "access_control_expose_headers_grammar",
                       "content_encoding_grammar", "transfer_encoding_grammar",
                       "code_101_not_allowed_in_http2", "code_204_no_additional_content", "code_205_no_content_allowed",
                       "no_bare_cr", "field_name_nonvisible_asciichars", "field_name_colon_except_for_pseudo_header_fields",
                       "field_value_zero_value_lf_cr", "content_length_2XX_connect", "transfer_encoding_2XX_connect",
                       "sts_max_age"]

# Union of both dangerous groups (a set, so duplicates collapse).
dangerous_all = set(dangerous_broken + dangerous_not_broken)

print("Dangerous broken:", len(dangerous_broken), dangerous_broken)
print("\nDangerous not broken:", len(dangerous_not_broken), dangerous_not_broken)
print("\nDangerous all:", len(dangerous_all))

# Complement of the dangerous set, split into broken and not-broken tests.
not_dangerous_broken = set(broken_all) - set(dangerous_all)
# Every test name known to `t` that is neither dangerous nor broken.
not_dangerous_not_broken = (set(t["function_name"]) - set(dangerous_all)) - set(broken_all)
not_dangerous_all = not_dangerous_broken | not_dangerous_not_broken

print("\nNot dangerous broken:", len(not_dangerous_broken), not_dangerous_broken)
print("\nNot dangerous not broken:", len(not_dangerous_not_broken), not_dangerous_not_broken)
print("\nNot dangerous all:", len(not_dangerous_all))

# broken_all is a list, so this count includes names that occur in both broken lists twice.
print("\nBroken:", len(broken_all))
print("\nAll:", len(dangerous_all | not_dangerous_all))