In [1]:
import os
import re
import spacy
from spacy.matcher import Matcher
from spacy.tokens import Token
import data_from_research
import evaluation
import methods

In [2]:
! pip install SpaCy
! python3 -m spacy download en

Defaulting to user installation because normal site-packages is not writeable
[38;5;3m⚠ As of spaCy v3.0, shortcuts like 'en' are deprecated. Please use the
full pipeline package name 'en_core_web_sm' instead.[0m
Defaulting to user installation because normal site-packages is not writeable
Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m0:01[0m01[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [3]:
# Directory with policies txt
directory = './dataset/certs/policies/txt'

KEY_PHRASES = [
    "Compiled into binary",
    "Compiled in the binary",
    "statically stored in the code",
    "Hard Coded",
    "generated external to the module",
    "Stored in flash",
    "Static key Stored in the firmware",
    "Entered in factory",
    "in tamper protected memory",
    "With the exception of DHSK and the RNG seed, all CSPs are loaded at factory.",
    "Static N/A",
    "Embedded in FLASH",
    "Injected During Manufacture",
    "Hard-coded in the module"
]
# Compile regular expressions
KEY_PATTERNS = [re.compile(phrase.lower()) for phrase in KEY_PHRASES]

In [5]:
from sec_certs.dataset.fips import FIPSDataset
from sec_certs.sample import FIPSCertificate
dset: FIPSDataset = FIPSDataset.from_web_latest()

Downloading FIPS Dataset: 100%|████████████| 59.1M/59.1M [00:09<00:00, 6.23MB/s]


**Function checks if a policy contains one of key phrases mentioned in the research.**

In [4]:
# Searches literally phrases from research; 319 results found.
# The problem point is to determine that a key phrase corresponds to X9.31 keys.
def process_text_stupid(text):
    text = text.replace('\n', ' ')
    text = text.lower()
    patterns = ['x9.31', 'cryptographic keys', 'all keys' ]
    regex = re.compile(r'\b(' + '|'.join(patterns) + r')\b')
    for idx in re.finditer(regex, text):
        for regex_pattern in KEY_PATTERNS:
            if regex_pattern.search(text[idx.start():idx.start()+200]):
                print(regex_pattern)
                return True
    return False

**Compare different ways to check if X9.31 is used**


**Way 1.** Fetch algorithms which vulnerable products from research are using and check if each product is using one of these algorithms.

In [15]:
sample = data_from_research.get_vulnerable_products_from_research(dset)
x931_algos = data_from_research.get_x931_algorithms_from_research(dset)
print("Used X9.31 algorithms: ", x931_algos)
stupid = []

for cert in dset:
    x931_algo = ""
    for algo in cert.heuristics.algorithms:
        if "RNG" in algo:
            x931_algo = algo
    if x931_algo not in x931_algos:
        continue
    dgst = cert.dgst
    policy_path = os.path.join(directory, dgst + ".txt")
    with open(policy_path, 'r') as file:
        if methods.process_text_stupid(file.read()):
            stupid.append(dgst)

print("Vulnerable products found: ", len(stupid), "/", len(sample))
print("True positives: ", len(evaluation.get_true_positives(stupid, sample)))

36
['77b108f25becb7b9', '0dc732c14f4b4da3', '3c365ff931ecb0e3', 'ccc6742d528e7ca2', '065f4fa6723db908', '909f911ba355637d', '5f93230da0c7dd66', '501a4e61aa4f7737', '0d866ba9f9fd0f2c', '729830e85be541a1', '7a99d42d79e9bafe', 'cebe3d9d614ba5c1', '12ef84e911f067ff', '290a0b92873bdf4e', 'b62764dc19af7e6a', '6fa893bfb00e234a', '4332cd76590d0efd', '9988ad0a66f28fe8', '9f7d97be4c6d7f20', 'b32523d903bfe235', 'ad608cd856711cb4', '46372791018924b8', '9e6296be2bb963df', 'aabe8732aabad9a7', 'da853efbaaddae47', 'f12c70c1cdb1ce6b', '11f8e31ccbdbb7d9', '22fc91dc876c50db', '59f6c59d4e41b4a7', '327e892542e0f409', '1bee34b6262a7777', 'eaaa9a5b38786a0f', '57c03210be824f7a', '87a4a78ecb6deb2d', 'd88c39de46401a31', '59933d776452c8a5']
36
['77b108f25becb7b9', '0dc732c14f4b4da3', '3c365ff931ecb0e3', 'ccc6742d528e7ca2', '065f4fa6723db908', '909f911ba355637d', '5f93230da0c7dd66', '501a4e61aa4f7737', '0d866ba9f9fd0f2c', '729830e85be541a1', '7a99d42d79e9bafe', 'cebe3d9d614ba5c1', '12ef84e911f067ff', '290a0b92873

**Way 2.** Literally check "X9.31" pattern

In [16]:
sample = data_from_research.get_vulnerable_products_from_research(dset)
stupid = []

for cert in dset:
    dgst = cert.dgst
    policy_path = os.path.join(directory, dgst + ".txt")
    with open(policy_path, 'r') as file:
        content = file.read()
        if "x9.31" not in content.lower():
            continue
        if methods.process_text_stupid(content):
            stupid.append(dgst)

print("Vulnerable products found: ", len(stupid), "/", len(sample))
print("True positives: ", len(evaluation.get_true_positives(stupid, sample)))

36
['77b108f25becb7b9', '0dc732c14f4b4da3', '3c365ff931ecb0e3', 'ccc6742d528e7ca2', '065f4fa6723db908', '909f911ba355637d', '5f93230da0c7dd66', '501a4e61aa4f7737', '0d866ba9f9fd0f2c', '729830e85be541a1', '7a99d42d79e9bafe', 'cebe3d9d614ba5c1', '12ef84e911f067ff', '290a0b92873bdf4e', 'b62764dc19af7e6a', '6fa893bfb00e234a', '4332cd76590d0efd', '9988ad0a66f28fe8', '9f7d97be4c6d7f20', 'b32523d903bfe235', 'ad608cd856711cb4', '46372791018924b8', '9e6296be2bb963df', 'aabe8732aabad9a7', 'da853efbaaddae47', 'f12c70c1cdb1ce6b', '11f8e31ccbdbb7d9', '22fc91dc876c50db', '59f6c59d4e41b4a7', '327e892542e0f409', '1bee34b6262a7777', 'eaaa9a5b38786a0f', '57c03210be824f7a', '87a4a78ecb6deb2d', 'd88c39de46401a31', '59933d776452c8a5']
36   319
22


**Way 3.** Get all possible X9.31 implementations???

In [23]:
# Current version is incomplete and noisy("RNG" doesn't nessesaryly mean X9.31)
x931_all_options = x931_algos
for cert in dset:
    policy_path = os.path.join(directory, cert.dgst + ".txt")
    with open(policy_path, 'r') as file:
        content = file.read()
        if "x9.31" in content.lower():
            for algo in cert.heuristics.algorithms:
                if "RNG" in algo:
                    x931_all_options.add(algo)
print("Number of versions: ", len(x931_all_options))
print(x931_all_options)

747
{'RNG#635', 'RNG#1147', 'RNG#624', 'RNG#91', 'RNG#722', 'RNG#1115', 'RNG#1000', 'RNG#64', 'RNG#376', 'RNG#951', 'RNG#992', 'RNG#400', 'RNG#1185', 'RNG#227', 'RNG#725', 'RNG#273', 'RNG#470', 'RNG#1262', 'RNG#804', 'RNG#1135', 'RNG#509', 'RNG#1071', 'RNG#388', 'RNG#1165', 'RNG#535', 'RNG#1100', 'RNG#612', 'RNG#749', 'RNG#301', 'RNG#368', 'RNG#466', 'RNG#382', 'RNG#379', 'RNG#97', 'RNG#1215', 'RNG#608', 'RNG#1173', 'RNG#932', 'RNG#277', 'RNG#1300', 'RNG#975', 'RNG#778', 'RNG#854', 'RNG#331', 'RNG#58', 'RNG#436', 'RNG#33', 'RNG#1061', 'RNG#719', 'RNG#643', 'RNG#49', 'RNG#132', 'RNG#844', 'RNG#30', 'RNG#1008', 'RNG#34', 'RNG#1153', 'RNG#1281', 'RNG#826', 'RNG#61', 'RNG#628', 'RNG#183', 'RNG#511', 'RNG#925', 'RNG#1275', 'RNG#597', 'RNG#244', 'RNG#394', 'RNG#423', 'RNG#1255', 'RNG#965', 'RNG#346', 'RNG#337', 'RNG#760', 'RNG#902', 'RNG#771', 'RNG#900', 'RNG#16', 'RNG#1033', 'RNG#500', 'RNG#123', 'RNG#1258', 'RNG#1192', 'RNG#406', 'RNG#505', 'RNG#971', 'RNG#1246', 'RNG#319', 'RNG#1150', 'RN

In [95]:
sample = data_from_research.get_vulnerable_products_from_research(dset)
print("Used algos: ", x931_algos)
stupid = []

for cert in dset:
    x931_algo = ""
    for algo in cert.heuristics.algorithms:
        if "RNG" in algo:
            x931_algo = algo
    #if x931_algo not in x931_all_options:
    #    continue
    dgst = cert.dgst
    policy_path = os.path.join(directory, dgst + ".txt")
    with open(policy_path, 'r') as file:
        if process_text_stupid(file.read()):
            print(dgst)
            stupid.append(dgst)

print(len(sample), " ", len(stupid))
print(evaluation.get_true_positives(stupid, sample))

36
['77b108f25becb7b9', '0dc732c14f4b4da3', '3c365ff931ecb0e3', 'ccc6742d528e7ca2', '065f4fa6723db908', '909f911ba355637d', '5f93230da0c7dd66', '501a4e61aa4f7737', '0d866ba9f9fd0f2c', '729830e85be541a1', '7a99d42d79e9bafe', 'cebe3d9d614ba5c1', '12ef84e911f067ff', '290a0b92873bdf4e', 'b62764dc19af7e6a', '6fa893bfb00e234a', '4332cd76590d0efd', '9988ad0a66f28fe8', '9f7d97be4c6d7f20', 'b32523d903bfe235', 'ad608cd856711cb4', '46372791018924b8', '9e6296be2bb963df', 'aabe8732aabad9a7', 'da853efbaaddae47', 'f12c70c1cdb1ce6b', '11f8e31ccbdbb7d9', '22fc91dc876c50db', '59f6c59d4e41b4a7', '327e892542e0f409', '1bee34b6262a7777', 'eaaa9a5b38786a0f', '57c03210be824f7a', '87a4a78ecb6deb2d', 'd88c39de46401a31', '59933d776452c8a5']
Used algos:  {'RNG#635', 'RNG#1147', 'RNG#624', 'RNG#91', 'RNG#722', 'RNG#1115', 'RNG#1000', 'RNG#64', 'RNG#376', 'RNG#951', 'RNG#992', 'RNG#400', 'RNG#1185', 'RNG#227', 'RNG#725', 'RNG#273', 'RNG#470', 'RNG#1262', 'RNG#804', 'RNG#1135', 'RNG#509', 'RNG#1071', 'RNG#388', 'RNG

**Function checks if a policy contains one of key phrases using spacy for more advanced pattern detection**

In [13]:
# TODO: Replace +20 tokens context with spacy dependencies
# TODO: Add negated sentences detection
def process_text_spacy(text):
    nlp = spacy.load("en_core_web_sm")
    text = text.replace('\n', ' ')
    
    doc = nlp(text)

    patterns_algo = ["x9.31", "all keys", "cryptografic keys"]
    patterns_verbs = ["stored", "written", "compiled", "injected", "entered", "loaded", "embedded", "hard-coded"]
    patterns_places = ["flash", "binary", "firmware", "module", "factory"]
    patterns_nouns = ["randomseed", "seed", "key"]
    patterns_adverbs = ["statically"]

    patterns = [
        
        ["static"],
        
        ["hard", "coded"],
        
        ["generated", "external", "module"],

        ["tamper", "memory"],
        
        ["injected", "during", "manufacture"]
    ]

    found_flag = False
    for idx in range(len(doc)):
        if doc[idx].text.lower() in patterns_algo:
            context = doc[idx:idx+60].text.lower()
            if any(part in context for part in patterns_verbs) and any(part in context for part in patterns_places):
                found_flag = True
            if any(part in context for part in patterns_nouns) and any(part in context for part in patterns_verbs)  and any(part in context for part in patterns_adverbs):
                found_flag = True
            for phrase in patterns:
                if all(part in context for part in phrase):
                    found_flag = True
                    break
    return found_flag
    
    """negation_keywords = ["not", "without", "exception", "except", "but"]
    
    found_flag = False
    
    for token in doc:
        if token.text.lower() in negation_keywords:
            for child in token.children:
                if any(child.text.lower() in phrase for phrase in x931_phrases):
                    found_flag = False
                    break
        
        if any(token.text.lower() == phrase_part for phrase in x931_phrases for phrase_part in phrase):
            for phrase in x931_phrases:
                if all(part in doc.text.lower() for part in phrase):
                    found_flag = True
                    break
    
    return found_flag"""

text = "The product utilizes the X9.31 ANSI algorithm keys hard coded."
mention_found = process_text_spacy(text)
print("Mention found:", mention_found)

Mention found: True


**Function is using spacy Matcher for pattern detection**

In [6]:
# TODO: Add different word order in patterns
# TODO: Add negation handling
def process_text_spacy_matcher(text):
    nlp = spacy.load("en_core_web_sm")
    
    doc = nlp(text)

    matcher = Matcher(nlp.vocab)

    patterns = [
        [{"LOWER": {"IN": ["stored", "written", "compiled", "injected", "entered", "loaded", "embedded", "hard-coded"]}}, {"OP": "*"}, {"LOWER": {"IN": ["flash", "binary", "firmware", "module", "factory", "code"]}}],
   
        [{"LOWER": {"IN": ["randomseed", "seed", "key"]}}, {"OP": "*"}, {"LOWER": {"IN": ["stored", "written", "compiled", "injected", "entered", "loaded", "embedded", "hard-coded"]}}, {"OP": "*"}, {"LOWER": {"IN": ["statically"]}}],
        
        [{"LOWER": {"IN": ["static"]}}, {"OP": "*"}, {"LOWER": {"IN": ["randomseed", "seed", "key"]}}],
        
        [{"LOWER": "hard"}, {"LOWER": "coded"}],
        
        [{"LOWER": "generated"}, {"OP": "*"}, {"LOWER": "external"}],
        
        [{"LOWER": "injected"}, {"OP": "*"}, {"LOWER": "manufacture"}],
        #[{"LOWER": "hard"}, {"LOWER": "coded"},  {"LOWER": {"IN": ["flash", "binary", "firmware", "module", "factory"]}}],
    ]

    for pattern in patterns:
        matcher.add("X931_PATTERN", [pattern])

    # bad approach because of "table" format
    """sentences = list(doc.sents)
    for idx in range(len(sentences)):
        if any(token.text.lower() == "x9.31" for token in sentences[idx]):
            for nxt_idx in range(idx, min(len(sentences), idx + 2)):
                if matcher(sentences[nxt_idx]):
                    return True
    """
    for idx in range(len(doc)):
        if doc[idx].text.lower() == "x9.31":
            context = doc[idx:idx+20]
            if matcher(context):
                return True
    return False

text = "The product utilizes the X9.31 ANSI algorithm H. The Injected During Manufacture hhh"
mention_found = process_text_spacy_matcher(text)
print("Mention found:", mention_found)

Mention found: True


In [14]:
# Get vulnerable products with process_text_spacy() function
sample = data_from_research.get_vulnerable_products_from_research(dset)
found = []

for cert in dset:
    dgst = cert.dgst
    policy_path = os.path.join(directory, dgst + ".txt")
    with open(policy_path, 'r') as file:
        content = file.read()
        if "x9.31" not in content.lower():
            continue
        if process_text_spacy(content):
            found.append(dgst)

print("Vulnerable products found with spacy: ", len(stupid), "/", len(sample))
print("True positives: ", len(evaluation.get_true_positives(stupid, sample)))

36
['77b108f25becb7b9', '0dc732c14f4b4da3', '3c365ff931ecb0e3', 'ccc6742d528e7ca2', '065f4fa6723db908', '909f911ba355637d', '5f93230da0c7dd66', '501a4e61aa4f7737', '0d866ba9f9fd0f2c', '729830e85be541a1', '7a99d42d79e9bafe', 'cebe3d9d614ba5c1', '12ef84e911f067ff', '290a0b92873bdf4e', 'b62764dc19af7e6a', '6fa893bfb00e234a', '4332cd76590d0efd', '9988ad0a66f28fe8', '9f7d97be4c6d7f20', 'b32523d903bfe235', 'ad608cd856711cb4', '46372791018924b8', '9e6296be2bb963df', 'aabe8732aabad9a7', 'da853efbaaddae47', 'f12c70c1cdb1ce6b', '11f8e31ccbdbb7d9', '22fc91dc876c50db', '59f6c59d4e41b4a7', '327e892542e0f409', '1bee34b6262a7777', 'eaaa9a5b38786a0f', '57c03210be824f7a', '87a4a78ecb6deb2d', 'd88c39de46401a31', '59933d776452c8a5']
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
b8a137cb2c0a7f90
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
b20a51d1d0cd4d4e
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7eb2419377f0>>
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 770, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


1
1
1
1
6e8d1a943f129896
1
1
1
8a8b2d66735ed03d
1
1
1


KeyboardInterrupt: 