In [3]:
# creating nodes for Detection and Assets 

import uuid
import pandas as pd
import xmltodict
import os

os.makedirs(".\graph_csv", exist_ok=True)
scan_path = r".\assets\QualysOutput_3_19_with_TruRisk.xml"

with open(scan_path, "r", encoding="utf-8") as fp:
    scan_data = xmltodict.parse(fp.read())
    
asset_list = scan_data['HOST_LIST_VM_DETECTION_OUTPUT']['RESPONSE']['HOST_LIST']['HOST']

all_assets = list()
all_vulnerabilitiy_detections = list()

rel_asset_to_detection = list()
rel_detection_to_asset = list()

for asset in asset_list:
    # id used for assets is not correct 
    # using uuid as id will make it difficult while adding new scans
    asset_id = asset.get('ID') 
    asset_detail = {
        "asset_id" : asset_id,
        "ip" : asset.get('IP'),
        "os" : asset.get('OS'),
        "hostname": asset['DNS_DATA'].get('HOSTNAME'),
        "domain": asset['DNS_DATA'].get('DOMAIN'),
        "netbios": asset.get('NETBIOS'),
        "last_scan_datetime": asset.get('LAST_SCAN_DATETIME'),
    }
    all_assets.append(asset_detail)

    detections = asset['DETECTION_LIST'].get('DETECTION')
    for detection in detections:
        detection_id = f"detection-{uuid.uuid4()}"
        detection_detail = {
            "detection_id" : detection_id,
            "unique_vendor_detection_id": detection.get('UNIQUE_VULN_ID'),
            "vendor_id" : detection.get('QID'),
            "type" : detection.get('TYPE'),
            "severity" : detection.get('SEVERITY'),
            "status" : detection.get('STATUS'),
            "first_found_datetime" : detection.get('FIRST_FOUND_DATETIME'),
            "last_found_datetime" : detection.get('LAST_FOUND_DATETIME')
        }
        all_vulnerabilitiy_detections.append(detection_detail)
        rel_asset_to_detection.append((asset_id, detection_id))
        rel_detection_to_asset.append((detection_id, asset_id))

df_assets = pd.DataFrame(all_assets)
df_assets[":LABEL"] = "Assets;Hosts"
df_assets.rename(columns={"asset_id" : "asset_id:ID(Assets-ID)"}, inplace=True)
df_assets.to_csv(r".\graph_csv\assets_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\assets_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_assets.columns))

df_detections = pd.DataFrame(all_vulnerabilitiy_detections)
df_detections[":LABEL"] = "Detections;Vulnerablities"
df_detections.rename(columns={"detection_id": "detection_id:ID(Detections-ID)"}, inplace=True)
df_detections.to_csv(r".\graph_csv\detection_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\detection_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_detections.columns))

with open(r".\graph_csv\detection_found_on_assets_rel.csv", 'w', encoding='utf-8') as f:
    # there should not be space between the two fields
    f.write(":START_ID(Detections-ID),:END_ID(Assets-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},FOUND_ON" for x in rel_detection_to_asset]))


with open(r".\graph_csv\asset_has_risk_detection_rel.csv", 'w',encoding='utf-8') as f:
    # there should not be space between the two fields
    f.write(":START_ID(Assets-ID),:END_ID(Detections-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},HAS_RISK" for x in rel_asset_to_detection]))

del scan_data

In [4]:
# create nodes for qualys_kb 

qualys_kb_path = r".\assets\qualys_kb.xml"
with open(qualys_kb_path, "r", encoding="utf-8") as fp:
    qualys_kb_data = xmltodict.parse(fp.read())

vuln_list = qualys_kb_data['KNOWLEDGE_BASE_VULN_LIST_OUTPUT']['RESPONSE']['VULN_LIST']['VULN']
qualys_kb_list = list()
qualys_kb_dict = dict()
for vuln in vuln_list:
    qualys_vuln = {
        "qualys_id" : vuln.get('QID'),
        "vulnerability_type" : vuln.get('VULN_TYPE'),
        "severity_level": vuln.get('SEVERITY_LEVEL'),
        "title" : vuln.get('TITLE'),
        "category": vuln.get('CATEGORY'),
        "publised_datetime": vuln.get('PUBLISHED_DATETIME'),
        "patchable": vuln.get('PATCHABLE'),
        "diagnosis" : vuln.get('DIAGNOSIS'),
        "pci_flag": vuln.get('PCI_FLAG'),
        "consequences" : vuln.get('CONSEQUENCE'),
        "solution": vuln.get('SOLUTION')
    }


    if vuln.get('CVE_LIST'):
        cve_list = vuln['CVE_LIST']['CVE']
        cve_ids = list()
        if isinstance(cve_list, list):
            for cve in cve_list:
                cve_ids.append(cve['ID'])
        else:
            cve_ids.append(cve_list['ID'])
        
        qualys_vuln["cve_ids"] = cve_ids
    
    qualys_kb_list.append(qualys_vuln)
        # print(qualys_vuln)
    
    qualys_kb_dict[qualys_vuln["qualys_id"]] = qualys_vuln


df_qualys_kb = pd.DataFrame(qualys_kb_list)
df_qualys_kb[":LABEL"] = "Qualys_Knowledge_Base"
df_qualys_kb.rename(columns={"qualys_id": "qualys_id:ID(Qualys-ID)"}, inplace=True)
df_qualys_kb.to_csv(r".\graph_csv\qualys_kb_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\qualys_kb_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_qualys_kb.columns))

# relation between vulnerability detected and qualys_kb
rel_detection_to_qualys_kb = list()
for detection in all_vulnerabilitiy_detections:
    if detection["vendor_id"] in qualys_kb_dict.keys():
        rel_detection_to_qualys_kb.append((detection['detection_id'], detection['vendor_id']))
    else:
        print(f"{detection['vendor_id']} not found in qualys_kb")

with open(r".\graph_csv\detection_to_qualys_kb_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Detections-ID),:END_ID(Qualys-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},MORE_INFORMATION" for x in rel_detection_to_qualys_kb]))

del qualys_kb_data

379478 not found in qualys_kb
379492 not found in qualys_kb
755929 not found in qualys_kb
755947 not found in qualys_kb
755957 not found in qualys_kb
755971 not found in qualys_kb
755973 not found in qualys_kb
200166 not found in qualys_kb
200180 not found in qualys_kb
379223 not found in qualys_kb
92121 not found in qualys_kb
379223 not found in qualys_kb


In [5]:
# create nodes for cve 
import json 
with open(r".\assets\cve_db.json", "r", encoding="utf-8") as f:
    cve_db = json.load(f)

df_cve_db = pd.DataFrame(cve_db.values())
df_cve_db[":LABEL"] = "CVE_DATA"
df_cve_db.rename(columns={"id": "cve_id:ID(Cve-ID)"}, inplace=True)
df_cve_db.to_csv(r".\graph_csv\cve_db_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\cve_db_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_cve_db.columns))


# print(type(cve_db))
rel_qualys_kb_to_cve = list()
for qualys_vuln in qualys_kb_list:
    if cve_ids := qualys_vuln.get("cve_ids"):
        for cve_id in cve_ids:
            if cve_id in cve_db.keys():
                rel_qualys_kb_to_cve.append((qualys_vuln['qualys_id'], cve_id))


with open(r".\graph_csv\qualys_kb_to_cve_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Qualys-ID),:END_ID(Cve-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},MORE_INFORMATION" for x in rel_qualys_kb_to_cve]))




In [6]:
import ast 

threat_actors_path = r".\assets\Threat_Actors_to_CVE.csv"

df_thread_actors = pd.read_csv(threat_actors_path, encoding="utf-8")
df_thread_actors["Threat_Actor_ID"] = df_thread_actors.index.to_series().map(lambda x: f"Threat-Actor-{x}")
df_thread_actors['CVEs_list'] = df_thread_actors["CVEs"].map(ast.literal_eval)

df_thread_actors_nodes = df_thread_actors[["Threat_Actor_ID", "Key"]]
df_thread_actors_nodes[":LABEL"] = "Threat_Actors"
df_thread_actors_nodes.rename(columns={"Threat_Actor_ID": "threat_actor_id:ID(Threat-Actor-ID)", "Key": "name"}, inplace=True)
df_thread_actors_nodes.to_csv(r".\graph_csv\threat_actor_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\threat_actor_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_thread_actors_nodes.columns))


rel_threat_to_cve = list()
rel_cve_to_threat = list()
for index, row in df_thread_actors.iterrows():
    cve_list = row['CVEs_list']
    for cve in cve_list:
        rel_threat_to_cve.append((row["Threat_Actor_ID"], cve))
        rel_cve_to_threat.append((cve, row["Threat_Actor_ID"]))


with open(r".\graph_csv\threat_to_cve_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Threat-Actor-ID),:END_ID(Cve-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},EXPLOITS" for x in rel_threat_to_cve]))

with open(r".\graph_csv\cve_to_threat_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Cve-ID),:END_ID(Threat-Actor-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},EXPLOITED_BY" for x in rel_cve_to_threat]))




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_thread_actors_nodes[":LABEL"] = "Threat_Actors"
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_thread_actors_nodes.rename(columns={"Threat_Actor_ID": "threat_actor_id:ID(Threat-Actor-ID)", "Key": "name"}, inplace=True)


In [7]:
import ast 

ransomware_path = r".\assets\ransomware_CVE.csv"

df_ransomware = pd.read_csv(ransomware_path, encoding="utf-8")
df_ransomware["Ransomware_ID"] = df_ransomware.index.to_series().map(lambda x: f"Ransomware-{x}")
df_ransomware['CVEs_list'] = df_ransomware["CVEs"].map(ast.literal_eval)

df_ransomware_nodes = df_ransomware[["Ransomware_ID", "Ransomware"]]
df_ransomware_nodes[":LABEL"] = "Ransomeware"
df_ransomware_nodes.rename(columns={"Ransomware_ID": "ransomware_id:ID(Ransomware-ID)", "Ransomware": "name"}, inplace=True)
df_ransomware_nodes.to_csv(r".\graph_csv\ransomware_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\ransomware_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_ransomware_nodes.columns))


rel_ransomware_to_cve = list()
rel_cve_to_ransomware = list()
for index, row in df_ransomware.iterrows():
    cve_list = row['CVEs_list']
    for cve in cve_list:
        rel_ransomware_to_cve.append((row["Ransomware_ID"], cve))
        rel_cve_to_ransomware.append((cve, row["Ransomware_ID"]))


with open(r".\graph_csv\ransomware_to_cve_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Ransomware-ID),:END_ID(Cve-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},EXPLOITS" for x in rel_ransomware_to_cve]))

with open(r".\graph_csv\cve_to_ransomware_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Cve-ID),:END_ID(Ransomware-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},EXPLOITED_BY" for x in rel_cve_to_ransomware]))




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_ransomware_nodes[":LABEL"] = "Ransomeware"
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_ransomware_nodes.rename(columns={"Ransomware_ID": "ransomware_id:ID(Ransomware-ID)", "Ransomware": "name"}, inplace=True)


In [8]:
import ast 

malware_path = r".\assets\Malware_to_CVE.csv"

df_malware = pd.read_csv(malware_path, encoding="utf-8")
df_malware["Malware_ID"] = df_malware.index.to_series().map(lambda x: f"Malware-{x}")
df_malware['CVEs_list'] = df_malware["CVEs"].map(ast.literal_eval)

df_malware_nodes = df_malware[["Malware_ID", "Key"]]
df_malware_nodes[":LABEL"] = "Malware"
df_malware_nodes.rename(columns={"Malware_ID": "malware_id:ID(Malware-ID)", "Key": "name"}, inplace=True)
df_malware_nodes.to_csv(r".\graph_csv\malware_nodes.csv", encoding="utf-8", index=False, header=False)
with open(r".\graph_csv\malware_nodes_header.csv", "w", encoding="utf-8") as f:
    f.write(",".join(df_malware_nodes.columns))


rel_malware_to_cve = list()
rel_cve_to_malware = list()
for index, row in df_malware.iterrows():
    cve_list = row['CVEs_list']
    for cve in cve_list:
        rel_malware_to_cve.append((row["Malware_ID"], cve))
        rel_cve_to_malware.append((cve, row["Malware_ID"]))


with open(r".\graph_csv\malware_to_cve_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Malware-ID),:END_ID(Cve-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},EXPLOITS" for x in rel_malware_to_cve]))

with open(r".\graph_csv\cve_to_malware_rel.csv", 'w', encoding='utf-8') as f:
    f.write(":START_ID(Cve-ID),:END_ID(Malware-ID),:TYPE\n")
    f.write( "\n".join([f"{x[0]},{x[1]},EXPLOITED_BY" for x in rel_cve_to_malware]))



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_malware_nodes[":LABEL"] = "Malware"
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_malware_nodes.rename(columns={"Malware_ID": "malware_id:ID(Malware-ID)", "Key": "name"}, inplace=True)
