# Analysis of IoT-Related CVEs and Their Exploitation by APTs

In [4]:
import os # For traversing and reading folders and files
import json # For reading and extracting data from CVE records
import re # Handle regex patterns
import xml.etree.ElementTree as ET # For reading and extracting data from CWE records
import pandas as pd # For data cleaning and analysis
import numpy as np # For advanced calculations
import matplotlib.pyplot as plt # For data visualization

## Data Collection
### Extracting Data from the CVE List
There are precisely $260,896$ records in the CVE list; each is represented by a single JSON file. These files contain all the information (though lots of it is incomplete) that will populate our `CVE_df` dataframe. It was clear after creating the initial script that massive discrepencacies existed between the files' structures. This makes it especially complicated to reliably retrieve desired values, but I managed to create a recursive function that performs various data retrieval processes based on the kind of data type the object holds at every level on its way to pull out the desired info. Based on the CVE List schema, I figured that the most important information—and really, the only kinds of information contained in the files that would be useful to us—would be the CVE ID, the CWE ID, the description, the date the record was published, a categorical and numerical severity score, the vector of attack used in the CVE, and attack complexity.



In [70]:
def search(obj, key_to_find, parent_key=None, expected_type=None):
    # If the object is not a dictionary or list, end search
    if not isinstance(obj, (dict, list)):
        return None
    # If the current object is a dictionary
    if isinstance(obj, dict):
        # Check if current dictionary has the parent key
        if parent_key and parent_key in obj:
            parent_value = obj[parent_key]
            result = search(parent_value, key_to_find, None, expected_type)
            if result is not None:
                return result
        # If no parent key is specified
        elif not parent_key and key_to_find in obj:
            value = obj[key_to_find]
            if expected_type is None or isinstance(value, expected_type):
                return value
        # If value is not expected type
        for value in obj.values():
            result = search(value, key_to_find, parent_key, expected_type)
            if result is not None:
                return result
    # If the current object is a list
    elif isinstance(obj, list):
        for item in obj:
            result = search(item, key_to_find, parent_key, expected_type)
            if result is not None:
                return result
    # If no value was found
    return None


def extract_data(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            cve = json.load(file)

            cve_id = search(cve, 'cveId')
            cwe_id = search(cve, 'cweId')
            cve_state = search(cve, 'state')
            date_published = search(cve, 'datePublished')
            description = search(cve, 'value', parent_key='descriptions')
            severity = search(cve, 'baseSeverity')
            severity_score = search(cve, 'baseScore')
            attack_vector = search(cve, 'attackVector')
            attack_complexity = search(cve, 'attackComplexity')

            return {
                'cve_id': cve_id,
                'cwe_id': cwe_id,
                'cve_state': cve_state,
                'date_published': date_published,
                'description': description,
                'severity': severity,
                'severity_score': severity_score,
                'attack_vector': attack_vector,
                'attack_complexity': attack_complexity
            }

    except json.JSONDecodeError as e:
        print(f'Error reading {file_path}: {e}')
    except Exception as e:
        print(f'Unexpected error reading {file_path}: {e}')
    return None

def process_files(base_dir):
    data = []
    total_files = sum(len(files) for _, _, files in os.walk(base_dir))
    file_count = 0
    print(total_files)

    for root, dirs, files in os.walk(base_dir):
        for file_name in files:
            if file_name.endswith('.json'):
                file_path = os.path.join(root, file_name)
                cve_data = extract_data(file_path)

                if cve_data:
                    data.append(cve_data)

                file_count += 1
                print(f'File {file_count} of {total_files}: {file_count / total_files * 100:.2f}%', end='\r')

    print('\nAll files processed!')
    df = pd.DataFrame(data)
    return df

base_dir = '../data/CVE_V5/cvelistV5-main/cves'
df = process_files(base_dir)

260894
File 260894 of 260894: 100.00%
All files processed!


### Extracting Data from the CWE List
The CWE list contains lots of information that could be pertinant to our analysis that's relatively easy to access. Among these I counted the CWE IDs, names, descriptions, related CWEs, the nature of these relationships, the technological vector (web-based, network, etc.), background details, the phases of development wherein the weakness could be introduced, descriptions about these unfortunate events, the likelihood of these weaknesses, the scope and impact of the weakness's common consequences, detection methods and their descriptions and effectiveness, mitigation strategies, potential vulnerabilities, observed vulnerabilities (with direct references to CVE IDs), and more. Because of the observed vulnerabilities, we can merge together these two dataframes and clean and filter the resultant so that we have a comprehensive dataset to work with. Because there's so much text data in the CWE list, we should have plenty of information to feed into a matural language processing pipeline that can help build a classification model to predict the association of a given CVE with the IoT and it's various vulnerabilities.

I created several helper functions to get us from `A` to `B`. The first, I can just feed the XML file. Next, we have a function that takes an element and takes all of the text data inside of it and it's children nodes and returns it as a long sentence. This kind of column data will be useful when we tokenize and lemmatize the text to more accurately search for our desire IoT-vulnerability-related keywords.

In [71]:
def readXML(file_path):
    try:
        tree = ET.parse(file_path)
        root = tree.getroot()
        return root
    except ET.ParseError as e:
        print(f'Error parsing {file_path}: {e}')
        return None

def extract_txt_from_el(element):
    if element is None:
        return ''
    txt_parts = [element.text or '']
    for child in element:
        txt_parts.append(extract_txt_from_el(child))
        if child.tail:
            txt_parts.append(child.tail)
    return ' '.join(filter(None, txt_parts)).strip()

def extract_data(root):
    data = []
    ns = {
        'ns': 'http://cwe.mitre.org/cwe-7',
        'xhtml': 'http://www.w3.org/1999/xhtml',
        'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
    }
    weaknesses = root.findall('.//ns:Weakness', ns)

    # Debugging
    if not weaknesses:
        print('No Weaknesses found')
        return data

    for weakness in weaknesses:
        # Basic info
        cwe_id = weakness.get('ID')
        name = weakness.get('Name')
        desc = extract_txt_from_el(weakness.find('ns:Description', ns))
        ext_desc = extract_txt_from_el(weakness.find('ns:Extended_Description', ns))
        description = f'{desc} {ext_desc}'.strip()

        # Related weakness info
        rel_ids, nature_of_rels = [], []
        related_weaknesses = weakness.find('ns:Related_Weaknesses', ns)
        if related_weaknesses is not None:
            for rel_weakness in related_weaknesses.findall('ns:Related_Weakness', ns):
                rel_id = rel_weakness.get('CWE_ID')
                nature = rel_weakness.get('Nature')
                rel_ids.append(rel_id)
                nature_of_rels.append(nature)

        # Technical info
        tech_el = weakness.find('.//ns:Technology', ns)
        tech_class = tech_el.get('Class') if tech_el is not None else ''
        bg_details = extract_txt_from_el(weakness.find('ns:Background_Details/ns:Background_Detail', ns))

        # Modes of introduction
        modes_of_intro_phases, modes_of_intro_descs = [], []
        modes_of_intro = weakness.find('ns:Modes_Of_Introduction', ns)
        if modes_of_intro is not None:
            for mode in modes_of_intro:
                modes_of_intro_phase = mode.find('ns:Phase', ns).text if mode.find('ns:Phase', ns) is not None else ''
                modes_of_intro_note = mode.find('ns:Note', ns).text if mode.find('ns:Note', ns) is not None else ''
                # Append data to the columns
                modes_of_intro_phases.append(modes_of_intro_phase)
                modes_of_intro_descs.append(modes_of_intro_note)

        # Likelihood of exploitation
        likelihood = extract_txt_from_el(weakness.find('ns:Likelihood_Of_Exploit', ns))

        # Consequence info
        scope_of_consequences, impact_of_consequences, consequence_notes = [], [], []
        consequences = weakness.find('ns:Common_Consequences', ns)
        if consequences is not None:
            for consequence in consequences:
                scope = consequence.find('ns:Scope', ns).text if consequence.find('ns:Scope', ns) is not None else ''
                impact = consequence.find('ns:Impact', ns).text if consequence.find('ns:Impact', ns) is not None else ''
                note = consequence.find('ns:Note', ns).text if consequence.find('ns:Note', ns) is not None else ''
                scope_of_consequences.append(scope)
                impact_of_consequences.append(impact)
                consequence_notes.append(note)

        # Detection info
        detect_methods = []
        detect_descs = []
        detect_effectiveness = []
        detection_methods = weakness.find('ns:Detection_Methods', ns)
        if detection_methods is not None:
            for method in detection_methods:
                detect_method = method.find('ns:Method', ns).text if method.find('ns:Method', ns) is not None else ''
                detect_desc = method.find('ns:Description', ns).text if method.find('ns:Description', ns) is not None else ''
                detect_effect = method.find('ns:Effectiveness', ns).text if method.find('ns:Effectiveness', ns) is not None else ''
                # Append data to the columns
                detect_methods.append(detect_method)
                detect_descs.append(detect_desc)
                detect_effectiveness.append(detect_effect)

        # Mitigation info
        mitigate_phases = []
        mitigate_descs = []
        mitigate_effectivenesses = []
        mitigate_notes = []
        potential_mitigations = weakness.find('ns:Potential_Mitigations', ns)
        if potential_mitigations is not None:
            for mitigation in potential_mitigations:
                mitigate_phase = mitigation.find('.//ns:Phase', ns)
                mitigate_phase = mitigate_phase.text if mitigate_phase is not None else ''

                # Build a mitigation description
                mitigate_desc_el = mitigation.find('ns:Description', ns)
                if mitigate_desc_el is not None:
                    desc_parts = []
                    if mitigate_desc_el.text:
                        desc_parts.append(mitigate_desc_el.text.strip())
                    for part in mitigate_desc_el.findall('.//xhtml:p', ns):
                        if part.text:
                            desc_parts.append(part.text.strip())
                    mitigate_descs.append(' '.join(desc_parts))
                else:
                    mitigate_descs.append('')

                mitigate_effectiveness = mitigation.find('ns:Effectiveness', ns)
                mitigate_effectiveness = mitigate_effectiveness.text if mitigate_effectiveness is not None else ''

                mitigate_note = mitigation.find('ns:Effectiveness_Notes', ns)
                mitigate_note = mitigate_note.text if mitigate_note is not None else ''
                # Append data to the columns
                mitigate_phases.append(mitigate_phase)
                mitigate_effectivenesses.append(mitigate_effectiveness)
                mitigate_notes.append(mitigate_note)


        observed_vulnerabilities = []
        vulnerability_descs = []
        observed_examples = weakness.find('ns:Observed_Examples', ns)
        if observed_examples is not None:
            for cve in observed_examples.findall('ns:Observed_Example', ns):
                observed_vulnerability = cve.find('ns:Reference', ns).text if cve.find('ns:Reference', ns) is not None else ''
                vulnerability_desc = cve.find('ns:Description', ns).text if cve.find('ns:Description', ns) is not None else ''
                # Append data to the columns
                observed_vulnerabilities.append(observed_vulnerability)
                vulnerability_descs.append(vulnerability_desc)

        data.append({
            'cwe_id': cwe_id,
            'name': name,
            'description': description,
            'tech_class': tech_class,
            'bg_details': bg_details,
            'rel_ids': rel_ids,
            'nature_of_rels': nature_of_rels,
            'modes_of_intro_phases': modes_of_intro_phases,
            'modes_of_intro_descs': modes_of_intro_descs,
            'likelihood': likelihood,
            'scope_of_consequences': scope_of_consequences,
            'impact_of_consequences': impact_of_consequences,
            'consequence_notes': consequence_notes,
            'detection_method': detect_methods,
            'detection_desc': detect_descs,
            'detection_effectiveness': detect_effectiveness,
            'mitigation_phases': mitigate_phase,
            'mitigation_descs': mitigate_descs,
            'mitigation_effectiveness': mitigate_effectivenesses,
            'mitigate_notes': mitigate_notes,
            'observed_vulnerabilities': observed_vulnerabilities,
            'vulnerability_descs': vulnerability_descs
        })
    return data

file = '../data/CWE_v4.15.xml'
root = readXML(file)
data = extract_data(root)

# Construct the dataframe
df2 = pd.DataFrame(data)


## Saving the Data
Parquet is a file type that streamlines the storage and retrieval of columnar data. I also saved a copy of both dataframes to CSV (a basic type of spreadsheet file). I chose to use the default option `None` for the method's `index` parameter, which saves the index of each record in a special kind of metadata range loop. This means it won't take up the kind of memory it would have if the index was actually saved into the dataframe as a separate attribute, but also provides a way to keep track of the records for the purposes of splitting them up between training, test, and validation sets for an ML algorithm should our work come to that.

In [75]:
# Converting the severity score to a proper numerical value so that parquet can
# save the file correctly
try:
    df['severity_score'] = pd.to_numeric(df['severity_score'], errors='coerce')
except ValueError as e:
    print(f'Unable to convert severity score: {e}')


In [81]:
# Saving CVE list
df.to_parquet(path='../data/CVE_V5/CVE_List.parquet', index=None)
df.to_csv('../data/CVE_V5/CVE_List.csv', index=None)

# Saving CWE list
df2.to_parquet(path='../data/CWE_List.parquet', index=None)
df2.to_csv('../data/CVE_V5/CWE_List.csv', index=None)

### Loading the Data
Extracting data from $260,000$ files take far too long to redo every time we want to analyze it. This is why I saved it to parquet and CSV. Once we've loaded in our data, we'll inspect it to ensure its cleanliness, merge them together in interest ways, and construct an comprehensive analysis with descriptive statistics, chi-square independence testing, Spearman's rank correlation, and logistic regression.

In [38]:
def load_data(file):
    try
cves = pd.read_parquet(path='../')
df2[['detection_method', 'detection_desc', 'detection_effectiveness']]
df2[['scope_of_consequences', 'impact_of_consequences', 'consequence_notes']]

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 964 entries, 0 to 963
Data columns (total 21 columns):
 #   Column                    Non-Null Count  Dtype 
---  ------                    --------------  ----- 
 0   cwe_id                    964 non-null    object
 1   name                      964 non-null    object
 2   description               964 non-null    object
 3   tech_class                917 non-null    object
 4   rel_ids                   964 non-null    object
 5   nature_of_rels            964 non-null    object
 6   modes_of_intro_phases     964 non-null    object
 7   modes_of_intro_descs      964 non-null    object
 8   likelihood                964 non-null    object
 9   scope_of_consequences     964 non-null    object
 10  impact_of_consequences    964 non-null    object
 11  consequence_notes         964 non-null    object
 12  detection_method          964 non-null    object
 13  detection_desc            964 non-null    object
 14  detection_effectiveness   

Unnamed: 0,scope_of_consequences,impact_of_consequences,consequence_notes
0,"[Confidentiality, Integrity]","[Read Application Data, Gain Privileges or Ass...","[If the HttpOnly flag is not set, then sensiti..."
1,[Integrity],[Other],[An attacker may ultimately redirect a user to...
2,[Integrity],[Unexpected State],[]
3,[Access Control],[Gain Privileges or Assume Identity],[An attacker can trick a user into performing ...
4,[Confidentiality],[Alter Execution Logic],[The user may be redirected to an untrusted pa...
...,...,...,...
959,"[Confidentiality, Access Control, Access Contr...","[Read Files or Directories, Bypass Protection ...",[The injected code could access restricted dat...
960,"[Confidentiality, Access Control, Access Contr...","[Read Files or Directories, Bypass Protection ...",[The injected code could access restricted dat...
961,[Confidentiality],[Execute Unauthorized Code or Commands],[]
962,[Integrity],[Execute Unauthorized Code or Commands],[The attacker may be able to specify arbitrary...


In [72]:
df2.head(n=20)

Unnamed: 0,cwe_id,name,description,tech_class,bg_details,rel_ids,nature_of_rels,modes_of_intro_phases,modes_of_intro_descs,likelihood,...,consequence_notes,detection_method,detection_desc,detection_effectiveness,mitigation_phases,mitigation_descs,mitigation_effectiveness,mitigate_notes,observed_vulnerabilities,vulnerability_descs
0,1004,Sensitive Cookie Without 'HttpOnly' Flag,The product uses a cookie to store sensitive i...,Web Based,An HTTP cookie is a small piece of data attrib...,[732],[ChildOf],[Implementation],[],Medium,...,"[If the HttpOnly flag is not set, then sensiti...",[Automated Static Analysis],"[Automated static analysis, commonly referred ...",[High],Implementation,[Leverage the HttpOnly flag when setting a sen...,[High],[While this mitigation is effective for protec...,"[CVE-2022-24045, CVE-2014-3852, CVE-2015-4138]",[Web application for a room automation system ...
1,1007,Insufficient Visual Distinction of Homoglyphs ...,The product displays information or identifier...,Web Based,,[451],[ChildOf],"[Architecture and Design, Implementation]",[This weakness may occur when characters from ...,Medium,...,[An attacker may ultimately redirect a user to...,[Manual Dynamic Analysis],"[If utilizing user accounts, attempt to submit...",[Moderate],Implementation,[ Use a browser that displays Punycode for IDN...,"[, ]","[, ]","[CVE-2013-7236, CVE-2012-0584, CVE-2009-0652, ...",[web forum allows impersonation of users with ...
2,102,Struts: Duplicate Validation Forms,The product uses multiple validation forms wit...,,,"[694, 1173, 20]","[ChildOf, ChildOf, ChildOf]",[Implementation],[],,...,[],[],[],[],Implementation,[The DTD or schema validation will not catch t...,[],[],[],[]
3,1021,Improper Restriction of Rendered UI Layers or ...,The web application does not restrict or incor...,Web Based,,"[441, 610, 451]","[ChildOf, ChildOf, ChildOf]",[Implementation],[],,...,[An attacker can trick a user into performing ...,[Automated Static Analysis],"[Automated static analysis, commonly referred ...",[High],Implementation,[ The use of X-Frame-Options allows developers...,"[, , ]","[, , ]","[CVE-2017-7440, CVE-2017-5697, CVE-2017-4015, ...",[E-mail preview feature in a desktop applicati...
4,1022,Use of Web Link to Untrusted Target with windo...,The web application produces links to untruste...,Web Based,,[266],[ChildOf],"[Architecture and Design, Implementation]",[This weakness is introduced during the design...,Medium,...,[The user may be redirected to an untrusted pa...,[Automated Static Analysis],"[Automated static analysis, commonly referred ...",[High],Implementation,[Specify in the design that any linked externa...,"[, , ]","[, , ]",[CVE-2022-4927],"[Library software does not use rel: ""noopener ..."
5,1023,Incomplete Comparison with Missing Factors,The product performs a comparison between enti...,,,[697],[ChildOf],[Implementation],[],,...,[],[],[],[],Testing,[Thoroughly test the comparison scheme before ...,[],[],"[CVE-2005-2782, CVE-2014-6394]",[PHP remote file inclusion in web application ...
6,1024,Comparison of Incompatible Types,The product performs a comparison between two ...,,,[697],[ChildOf],[Implementation],[],,...,[],[],[],[],Testing,[Thoroughly test the comparison scheme before ...,[],[],[],[]
7,1025,Comparison Using Wrong Factors,The code performs a comparison between two ent...,,,[697],[ChildOf],[Implementation],[],,...,[],[],[],[],Testing,[Thoroughly test the comparison scheme before ...,[],[],[],[]
8,103,Struts: Incomplete validate() Method Definition,The product has a validator form that either d...,,The Struts Validator uses a form's validate() ...,"[573, 20]","[ChildOf, ChildOf]",[Implementation],[],,...,[Disabling the validation framework for a form...,[Automated Static Analysis],"[Automated static analysis, commonly referred ...",[High],Implementation,[Implement the validate() method and call supe...,[],[],[],[]
9,1037,Processor Optimization Removal or Modification...,The developer builds a security-critical prote...,,,[1038],[ChildOf],[Architecture and Design],[Optimizations built into the design of the pr...,Low,...,[A successful exploitation of this weakness wi...,[White Box],[In theory this weakness can be detected throu...,[Opportunistic],Implementation,[],[],[],"[CVE-2017-5715, CVE-2017-5753, CVE-2017-5754]","[Intel, ARM, and AMD processor optimizations r..."
