# CWE Analysis
The purpose of this pipeline is to collect metadata about the CWE (Common Weakness Enumeration). We focus on view CWE-1003, as it contains the weaknesses the NVD uses for labeling. However, we may also take a look at view CWE-1000 for completeness.

In [None]:
import requests
import json

def req_CWE(url, jfile = None, printj = True) :
    """Send a GET request to ``url`` and return the decoded JSON reply.

    Parameters
    ----------
    url : str
        Full URL to request.
    jfile : str or None
        File name, relative to ``tmp/``. When given (and ``printj`` is true)
        the reply is written there as indented JSON instead of being printed.
        On failure the error details are written to the same file.
    printj : bool
        When false, the decoded reply is returned without printing or
        writing it.

    Returns
    -------
    The decoded JSON reply on success, or ``None`` when the request, the
    JSON decoding or the file write raised. The error is then printed
    (``jfile is None``) or written to ``tmp/<jfile>``.
    """
    import os  # local import so this cell does not depend on another cell's imports

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0'
    }
    # Pre-bind so the error handler can tell "no reply at all" (timeout, DNS
    # failure, ...) apart from "reply received but unusable".
    reply = None
    try:
        # Make the request
        reply = requests.get(url, headers=headers, timeout=100)
        jreply = reply.json()

        if not printj :
            return jreply
        if jfile is None :
            print(json.dumps(jreply, indent=4))
        else :
            # Write the JSON response to a file, creating the folder on first use
            out_path = 'tmp/' + jfile
            os.makedirs(os.path.dirname(out_path), exist_ok=True)
            with open(out_path, 'w') as file:
                json.dump(jreply, file, indent=4)
            print("Data successfully written to 'tmp/" + jfile + "'")
        return jreply
    except Exception as exp:
        # Best-effort reporting: callers receive None instead of an exception.
        status_code = reply.status_code if reply is not None else None
        if jfile is None :
            print(f"Exception -> {exp}")
            print(f"Request status -> {status_code}")
        else :
            out_path = 'tmp/' + jfile
            os.makedirs(os.path.dirname(out_path), exist_ok=True)
            with open(out_path, 'w') as file:
                error_message = {
                    "error": str(exp),
                    "status_code": status_code
                }
                json.dump(error_message, file, indent=4)
            print("Exception occurred. Details written to 'tmp/" + jfile + "'")
        return None

"\nurl_view1003 = 'https://cwe-api.mitre.org/api/v1/cwe/view/1003'\nurl_view1003_des = 'https://cwe-api.mitre.org/api/v1/cwe/74/descendants?view=1003'\nurl_view1003_children = 'https://cwe-api.mitre.org/api/v1/cwe/74/children?view=1003'\nurl_CWE_20_74_inf = 'https://cwe-api.mitre.org/api/v1/cwe/weakness/20,74?view=1003'\n#url_CWE_all1003_inf = 'https://cwe-api.mitre.org/api/v1/cwe/weakness?view=1003'\nurl_leaf1284 = 'https://cwe-api.mitre.org/api/v1/cwe/1284/children?view=1003'\nprint(len(req_CWE(url_view1003, 'view_CWE-1003.json')['Views'][0]['Members']))\nreq_CWE(url_view1003_des, 'view_CWE-1003_desc_CWE-74.json')\nreq_CWE(url_view1003_children, 'view_CWE-1003_children_CWE-74.json')\nreq_CWE(url_CWE_20_74_inf, 'CWE-20,74_weakness.json')\n#req_CWE(url_CWE_all1003_inf, 'CWE-1003_weakness.json')\nreq_CWE(url_leaf1284, 'url_leaf1284.json')\n"

Exploratory requests to get a feel for the CWE API and the data it returns.

In [None]:

# temporary tests
# Exploratory calls against the CWE REST API, all scoped to view 1003.
# Each req_CWE call below is given a file name, so the reply is written
# under tmp/ rather than printed.
# The view itself:
url_view1003 = 'https://cwe-api.mitre.org/api/v1/cwe/view/1003'
# Descendants / direct children of CWE-74 inside the view:
url_view1003_des = 'https://cwe-api.mitre.org/api/v1/cwe/74/descendants?view=1003'
url_view1003_children = 'https://cwe-api.mitre.org/api/v1/cwe/74/children?view=1003'
# Weakness records for CWE-20 and CWE-74 in a single request:
url_CWE_20_74_inf = 'https://cwe-api.mitre.org/api/v1/cwe/weakness/20,74?view=1003'
# Disabled variant without an explicit ID list (reason not recorded here):
#url_CWE_all1003_inf = 'https://cwe-api.mitre.org/api/v1/cwe/weakness?view=1003'
# Children of CWE-1284 -- presumably a leaf of the view, going by the variable name:
url_leaf1284 = 'https://cwe-api.mitre.org/api/v1/cwe/1284/children?view=1003'
# Number of entries in the view's 'Members' list.
print(len(req_CWE(url_view1003, 'view_CWE-1003.json')['Views'][0]['Members']))
req_CWE(url_view1003_des, 'view_CWE-1003_desc_CWE-74.json')
req_CWE(url_view1003_children, 'view_CWE-1003_children_CWE-74.json')
req_CWE(url_CWE_20_74_inf, 'CWE-20,74_weakness.json')
#req_CWE(url_CWE_all1003_inf, 'CWE-1003_weakness.json')
req_CWE(url_leaf1284, 'url_leaf1284.json')

Extracting relevant views using the CWE API

In [7]:
import pickle
def rec_expand(view, cweID):
    """Collect the IDs of every descendant of ``cweID`` inside ``view``.

    The children endpoint of the CWE API is queried for ``cweID`` and then,
    recursively, for each child returned. IDs are gathered in pre-order:
    a child is listed first, immediately followed by its own descendants.

    Parameters
    ----------
    view : str
        CWE view ID used as the ``view`` query parameter.
    cweID : str
        ID of the weakness whose descendants are wanted.

    Returns
    -------
    list
        Descendant IDs; empty when the API reports no children.
    """
    endpoint = "".join(['https://cwe-api.mitre.org/api/v1/cwe/', cweID, '/children?view=', view])
    descendants = []
    for entry in req_CWE(endpoint, printj=False):
        child_id = entry['ID']
        # The child comes first, then everything below it.
        descendants.extend([child_id, *rec_expand(view, child_id)])
    return descendants

def get_weaknesses_from_view(view, expand = False, save=False) :
    """Download the weakness records that belong to a CWE view.

    The view is fetched first and the ``CweID`` of each entry in its
    ``Members`` list is collected. With ``expand`` set, each member is
    followed by all of its descendants (see ``rec_expand``). The collected
    IDs are then requested from the weakness endpoint in a single call.

    Parameters
    ----------
    view : str
        CWE view ID, e.g. ``'1003'``.
    expand : bool
        Also include the descendants of every view member.
    save : bool
        Forwarded to ``req_CWE`` as its ``printj`` argument together with a
        file name, so a true value makes ``req_CWE`` write the reply to
        ``tmp/view_CWE-<view>_{all,head}_weaknesses.json``.

    Returns
    -------
    Whatever ``req_CWE`` returns for the weakness request.
    """
    view_reply = req_CWE('https://cwe-api.mitre.org/api/v1/cwe/view/' + view, printj=False)

    collected = []
    for entry in view_reply['Views'][0]['Members']:
        member_id = entry['CweID']
        collected.append(member_id)
        if expand:
            collected.extend(rec_expand(view, member_id))

    request_url = 'https://cwe-api.mitre.org/api/v1/cwe/weakness/' + ",".join(collected)

    print("Number of weaknesses extracted:\t", len(collected))

    if expand:
        suffix = "_all_weaknesses"
    else:
        suffix = "_head_weaknesses"
    fsave_name = "view_CWE-" + view + suffix + ".json"
    return req_CWE(request_url, fsave_name, save)

# Fetch view 1003 with expand=True (members plus all their descendants) and
# save=True (the reply is also written as JSON under tmp/ by req_CWE).
# This issues one children request per weakness visited, so it is the slow
# step of the notebook.
jview1003_complete = get_weaknesses_from_view('1003', True, True)

# Save jview1003_complete using pickle
# so that later cells can reload the reply without repeating the requests.
pickle_file = 'tmp/jview1003_complete.pkl'
with open(pickle_file, 'wb') as file:
    pickle.dump(jview1003_complete, file)
print(f"Data successfully saved to '{pickle_file}' using pickle.")



Number of weaknesses extracted:	 130
Data successfully written to 'tmp/view_CWE-1003_all_weaknesses.json'
Data successfully saved to 'tmp/jview1003_complete.pkl' using pickle.


In [45]:
import pandas as pd
import pickle
import json

# Reload the API reply that the extraction cell pickled to this same path.
# NOTE: pickle.load can execute arbitrary code; only load a file this
# notebook produced itself, never one obtained from elsewhere.
pickle_file = 'tmp/jview1003_complete.pkl'
with open(pickle_file, 'rb') as file:
    jview1003_complete = pickle.load(file)


def _append_path_row(path_df, cweID, num_paths, paths):
    """Return ``path_df`` extended by one row describing ``cweID``."""
    new_row = pd.DataFrame([{'ID': cweID, 'NumPaths': num_paths, 'Paths': paths}])
    return pd.concat([path_df, new_row], ignore_index=True)


def rec_expand_path(cweID, weaknesses, path_df) :
    """Make sure ``path_df`` holds the ancestor paths of ``cweID``.

    A path is a list of CWE IDs that starts at a weakness without parents
    and ends at ``cweID``. Parents are read from the ``RelatedWeaknesses``
    cell of the ``weaknesses`` row whose ``ID`` equals ``cweID``; each entry
    of that cell must carry a ``'CweID'`` key. Parents that are not yet in
    ``path_df`` are resolved recursively first.

    Parameters
    ----------
    cweID : str
        Weakness to resolve.
    weaknesses : pandas.DataFrame
        Needs the columns ``ID`` and ``RelatedWeaknesses``.
    path_df : pandas.DataFrame
        Accumulator with the columns ``ID``, ``NumPaths`` and ``Paths``.

    Returns
    -------
    pandas.DataFrame
        ``path_df`` itself when ``cweID`` is already present, otherwise a new
        frame with rows added for ``cweID`` and any parents resolved on the way.
    """
    # Already resolved: nothing to add.
    if (path_df['ID'] == cweID).any():
        return path_df

    rw_list = weaknesses.loc[weaknesses['ID'] == cweID, 'RelatedWeaknesses'].values
    parents = rw_list[0] if len(rw_list) > 0 else None

    # Test the type before the length: a missing cell is a float NaN (or
    # None), and calling len() on it would raise. Unknown IDs, missing cells
    # and empty parent lists all mean "this weakness is a root".
    if not isinstance(parents, list) or len(parents) == 0:
        return _append_path_row(path_df, cweID, 1, [[cweID]])

    paths = []
    num_paths = 0
    for parent in parents :
        cid = parent['CweID']
        path_df = rec_expand_path(cid, weaknesses, path_df)
        parent_row = path_df[path_df['ID'] == cid]
        num_paths += parent_row.NumPaths.values[0]
        # Every path reaching the parent reaches this weakness with one more hop.
        paths += [p + [cweID] for p in parent_row.Paths.values[0]]

    return _append_path_row(path_df, cweID, num_paths, paths)

def get_paths(view, df_weaknesses) :
    """Build and display the ancestor paths of every weakness in a view.

    The ``RelatedWeaknesses`` column of ``df_weaknesses`` is overwritten IN
    PLACE: each cell keeps only the entries whose ``ViewID`` equals ``view``
    and whose ``Nature`` is ``'ChildOf'``, and cells that are not lists
    become empty lists. ``rec_expand_path`` is then run for every ID and the
    resulting table is shown with ``display``.

    Parameters
    ----------
    view : str
        View ID compared against each relation's ``ViewID``.
    df_weaknesses : pandas.DataFrame
        Needs the columns ``ID`` and ``RelatedWeaknesses``; it is modified.

    Returns
    -------
    None
        The path table is only displayed, not returned.
    """
    def _parents_in_view(related):
        # Missing cells (e.g. NaN) carry no relations at all.
        if not isinstance(related, list):
            return []
        return [
            link for link in related
            if link['ViewID'] == view and link['Nature'] == 'ChildOf'
        ]

    df_weaknesses['RelatedWeaknesses'] = df_weaknesses['RelatedWeaknesses'].apply(_parents_in_view)

    path_df = pd.DataFrame(columns=['ID', 'NumPaths', 'Paths'])
    for cweID in df_weaknesses['ID']:
        path_df = rec_expand_path(cweID, df_weaknesses, path_df)

    display(path_df)
# Now you can use jview1003_complete as a Python object
#print(json.dumps(jview1003_complete, indent=4))
# Flatten the records under the "Weaknesses" key into a DataFrame; nested
# keys are joined with '_' to form the column names.
df_view1003_complete = pd.DataFrame(pd.json_normalize(jview1003_complete["Weaknesses"], sep='_'))
#display(df_view1003_complete[:5])
#display(df_view1003_complete[['ID', 'RelatedWeaknesses']])
# Displays the path table. Note that get_paths also rewrites the
# 'RelatedWeaknesses' column of df_view1003_complete in place.
get_paths('1003', df_view1003_complete)
# Leftover debugging probes, kept disabled:
#display(df_view1003_complete[:5])
#display(df_view1003_complete[df_view1003_complete['ID'] == '20']['S'])


Unnamed: 0,ID,NumPaths,Paths
0,20,1,[[20]]
1,1284,1,"[[20, 1284]]"
2,129,1,"[[20, 129]]"
3,74,1,[[74]]
4,1236,1,"[[74, 1236]]"
...,...,...,...
125,913,1,[[913]]
126,1321,1,"[[913, 1321]]"
127,470,1,"[[913, 470]]"
128,502,1,"[[913, 502]]"
