# Process to get "plugins" from Qualys API and enrich with categories

This code aims to categorize the vulnerabilities reported by different tools (Qualys and OpenVAS in this case) and to normalize the data so that reports are easy to build. The objective is to produce a "patch" report regardless of which tool found the vulnerability.

In [1]:
import sys
import xml.etree.ElementTree as ET
import csv
import re
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import getpass
import pandas as pd

We declare all the global variables that the code needs later. Basically, we classify every vulnerability into one of 9 categories, and we keep a "To classify" category for the vulnerabilities that this code cannot classify automatically.

In [2]:
SolutionPatch = "Update or patch installation"
Eol = "End of life Platform"
Info = "Informational"
Workaround = "Workaround"
NoPatch = "No solution or patch available"
PolicyCompliance = "Policy compliance"
UselessService = "Unsecure or useless service"
ConfigurationChanges = "Configuration changes"
Ssltls = "SSL/TLS hardening"
ToClassify = "To classify"

The next two functions are optional. Their main goal is to download the Knowledge Base (KB) from the Qualys API and save it to the file passed in as "plugins_output_file". The code then processes this file to categorize the Qualys plugins.

In [3]:
def GetQualysApiValues():
    base_url = input('Please enter the URL for API in Qualys: ')
    username = input('Please enter the Username: ')
    password = getpass.getpass()
    headers = {'X-Requested-With': 'Python3'}
    return (base_url,username,password,headers)

def GetPlugins(plugins_output_file):
    url_base,username,password,headers = GetQualysApiValues()
    endpoint = url_base + '/api/2.0/fo/knowledge_base/vuln/'
    parameter = {'action' : 'list'}
    r = requests.get(endpoint,auth=(username,password),params=parameter,headers=headers,verify=False)
    with open(plugins_output_file,'w') as f:
        f.write(r.text)
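
`GetPlugins` writes whatever the API returns, so a failed login or a wrong URL would still produce a file. Below is a minimal sketch of a sanity check that could run before writing. It assumes the KB response is XML containing `VULN` elements (the tag the parsing code searches for). `looks_like_kb` is a hypothetical helper, and both sample strings are invented for illustration rather than captured from the API.

```python
import xml.etree.ElementTree as ET

def looks_like_kb(xml_text):
    """Return True if xml_text is well-formed XML with at least one VULN element."""
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return False
    # iter() walks the whole tree, so the exact nesting does not matter here
    return next(root.iter('VULN'), None) is not None

# Invented samples: one KB-like document, one error-like document
sample_ok = "<OUT><RESPONSE><VULN_LIST><VULN><QID>11</QID></VULN></VULN_LIST></RESPONSE></OUT>"
sample_error = "<SIMPLE_RETURN><RESPONSE><TEXT>Bad Login/Password</TEXT></RESPONSE></SIMPLE_RETURN>"

print(looks_like_kb(sample_ok), looks_like_kb(sample_error), looks_like_kb("not xml"))
```

In `GetPlugins`, a guard such as `if looks_like_kb(r.text):` before opening the output file would avoid overwriting a good KB file with an error response. Calling `r.raise_for_status()` first would additionally catch HTTP-level failures.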

These functions read the plugins obtained from the KB, assign one of the 9 categories to each of them, and save the result to a CSV file. We normally use this file in Splunk® as a lookup to enrich vulnerability results and build reports.
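
The code that follows builds each lookup row by hand and removes commas and double quotes from the text fields so that every row stays parseable. A sketch of an alternative, assuming the consumer of the file accepts standard CSV quoting, is to let `csv.writer` handle the escaping. The sample row is invented for illustration.

```python
import csv
import io

headers = ["ID", "SolutionCategory", "PluginName"]
rows = [["11", "Configuration changes", 'Service "foo", default credentials']]

# An in-memory buffer stands in for the output file
buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
writer.writerow(headers)
writer.writerows(rows)

# Reading the text back recovers the fields unchanged, commas and quotes included
parsed = list(csv.reader(io.StringIO(buffer.getvalue())))
print(buffer.getvalue())
print(parsed[1][2])
```

With this approach the diagnosis, consequence and solution texts could keep their original punctuation, at the cost of fields that contain doubled quotes in the raw file.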

In [4]:
def WriteLookup(FileName,List):
    CsvHeaders = ["ID", "SolutionCategory", "VendorReference", "PluginName","Diagnosis","Consequence","Solution"]
    with open(FileName, 'w') as outcsv:
        writer = csv.writer(outcsv)
        writer.writerow(CsvHeaders)
        # Each item is already a quoted, comma-joined row, so write it as-is
        for r in List:
            outcsv.write(r + "\n")
        # No explicit close() needed: the with block closes the file

def ReadVendorReferencesCSV(FileName):
    ReferenceRegexCSV = []
    with open(FileName,'r') as VR:
        reader = csv.reader(VR)
        next(reader, None)  # skip the headers
        for row in reader:
            ReferenceRegexCSV.append(row[2])
    return ReferenceRegexCSV

def LookVendorReference(RegexList,Content):
    # RegexList holds the patterns loaded by ReadVendorReferencesCSV
    VendorReferences = []
    Content = Content.upper()
    for pattern in RegexList:
        for match in re.findall(str(pattern), Content):
            if isinstance(match, str):
                VendorReferences.append(match)
            elif isinstance(match, tuple):
                # Patterns with several groups yield tuples: keep the unique, non-empty groups
                VendorReferences.extend(g for g in set(match) if g)
    # Join and re-split so that multi-word matches become individual tokens
    return ' '.join(VendorReferences).split()

def CleanVendorReferenceList(VRList):
    VRTemp = []
    for v in VRList:
        if type(v) is str:
            VRTemp.append(v)
        if type(v) is list:
            for i in v:
                VRTemp.append(i)
    VRTemp = [elem for elem in VRTemp if elem != "AND"]
    VRTemp = [elem for elem in VRTemp if elem != "TO"]
    VRTemp = sorted(set(VRTemp),key=VRTemp.index)
    VRTemp = filter(None,VRTemp)
    VendorReference = ' '.join(VRTemp)
    return VendorReference
            
def CompressCSV(FileName):
    import gzip
    import shutil
    FileNameCompressed = FileName + '.gz'
    with open(FileName, 'rb') as f_in, gzip.open(FileNameCompressed, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
        
def GenerateClasifiedPlugin(PluginsInputFile,ReferenceRegexCSV,OutputFile):
    tree = ET.parse(PluginsInputFile)
    root = tree.getroot()
    QIDs = []
    VRID = []
    for kb in root:
        for response in kb:
            for vulnlist in response.findall('VULN'):
                PATCH = ToClassify
                QID = vulnlist.find('QID').text
                PATCHABLE = vulnlist.find('PATCHABLE').text
                TITLE = vulnlist.find('TITLE').text
                VendorRefName = LookVendorReference(ReferenceRegexCSV,TITLE)
                TITLE = TITLE.replace(',',' ')
                VRID.append(VendorRefName)
                VULN_TYPE = vulnlist.find('VULN_TYPE').text
                SOLUTIONELEMENT = vulnlist.find('SOLUTION')
                if SOLUTIONELEMENT is not None:
                    SOLUTION = SOLUTIONELEMENT.text
                    SOLUTION = SOLUTION.replace('\"','').replace(',','')
                else:
                    SOLUTION = "None"
                DIAGNOSISELEMENT = vulnlist.find('DIAGNOSIS')
                if DIAGNOSISELEMENT is not None:
                    DIAGNOSIS = DIAGNOSISELEMENT.text
                    DIAGNOSIS = DIAGNOSIS.replace('\"','')
                    DIAGNOSIS = DIAGNOSIS.replace(',','')
                else:
                    DIAGNOSIS = "None"
                CONSEQUENCEELEMENT = vulnlist.find('CONSEQUENCE')
                if CONSEQUENCEELEMENT is not None:
                    CONSEQUENCE = CONSEQUENCEELEMENT.text
                    CONSEQUENCE = CONSEQUENCE.replace('\"','')
                    CONSEQUENCE = CONSEQUENCE.replace(',','')
                else:
                    CONSEQUENCE = "None"
                # findall() returns an empty list (never None) when there are no vendor references
                for vr in vulnlist.findall('VENDOR_REFERENCE_LIST/VENDOR_REFERENCE'):
                    ref_id = vr.find('ID').text
                    ref_id = ref_id.upper().replace(',',' ').replace('\"','')
                    VRID.append(ref_id.split())
                VENDOR_REFERENCE = CleanVendorReferenceList(VRID)
                VRID.clear()
                if TITLE.find('EOL') != -1:
                    PATCH = Eol
                if PATCHABLE == '0' and SOLUTIONELEMENT is not None:
                    SOLUTION = SOLUTION.lower()
                    if TITLE.find('EOL') != -1:
                        PATCH = Eol
                    elif any(phrase in SOLUTION for phrase in (
                        'not released patch',
                        'no fix available',
                        'not released a patch',
                        'no official fix',
                        'not released the patch',
                        'no solution',
                        'not released',
                        'hasn\'t released',
                        'no vendor advisory',
                        'not issued a fix',
                        'no vendor supplied patches',
                        'vendor has not confirmed the vulnerability',
                        'vendor has not confirmed vulnerability',
                        'vendor hasn\'t confirmed',
                        'no patch',
                        'no vendor-supplied',
                        'no patches',
                        'any vendor supplied',
                        'any fixes',
                        'no known patches',
                        'any vendor-supplied',
                        'has not confirmed this issue',
                        'has not released',
                        )):
                        if SOLUTION.find('workaround:<br>') != -1:
                            PATCH = Workaround
                        elif SOLUTION.find('workaround:') != -1:
                            PATCH = Workaround
                        elif SOLUTION.find('workarounds:<br>') != -1:
                            PATCH = Workaround
                        else:
                            PATCH = NoPatch
                    elif VULN_TYPE == "Information Gathered":
                        PATCH = Info
                    elif (SOLUTION.find('workaround') != -1):
                        PATCH = Workaround
                    else:
                        PATCH = ConfigurationChanges
                if (PATCHABLE == '1' and PATCH != Eol):
                    PATCH = SolutionPatch
                if VULN_TYPE == "Information Gathered":
                    PATCH = Info
                # Anything not matched above keeps the initial ToClassify value
                TITLE = TITLE.replace('\"','')
                LookupData = '\"' + QID + '\"' + ',' + '\"' + PATCH + '\"' + ',' + '\"' + str(VENDOR_REFERENCE) + '\"' + ',' + '\"' + TITLE.replace('\"','') + '\"' + ',' + '\"' + DIAGNOSIS + '\"' + ',' + '\"' + CONSEQUENCE + '\"' + ',' + '\"' + SOLUTION + '\"'
                QIDs.append(LookupData)
    WriteLookup(OutputFile,QIDs)
    CompressCSV(OutputFile)
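
The decision logic inside `GenerateClasifiedPlugin` can be easier to test when it is pulled out into a pure function. The following is a simplified sketch, not the notebook's exact code: `classify` and `NO_PATCH_PHRASES` are hypothetical names, only a few of the phrases are listed, and the precedence (informational first, then EOL, then patchable, then the solution text) is a condensed reading of the rules above.

```python
# Hypothetical, shortened phrase list; the full list lives in GenerateClasifiedPlugin
NO_PATCH_PHRASES = ('no fix available', 'no solution', 'not released', 'no patch')
WORKAROUND_MARKERS = ('workaround:', 'workarounds:<br>')

def classify(title, patchable, vuln_type, solution):
    """Return a category name from the fields of one VULN entry."""
    if vuln_type == "Information Gathered":
        return "Informational"
    if 'EOL' in title:
        return "End of life Platform"
    if patchable == '1':
        return "Update or patch installation"
    if patchable == '0' and solution is not None:
        text = solution.lower()
        if any(phrase in text for phrase in NO_PATCH_PHRASES):
            # No vendor fix: a documented workaround takes priority over "no patch"
            if any(marker in text for marker in WORKAROUND_MARKERS):
                return "Workaround"
            return "No solution or patch available"
        if 'workaround' in text:
            return "Workaround"
        return "Configuration changes"
    return "To classify"

print(classify("Foo Server EOL", '0', "Vulnerability", "Upgrade to a supported release."))
print(classify("Foo Server RCE", '0', "Vulnerability", "There is no fix available."))
print(classify("Foo Server RCE", '0', "Vulnerability", None))
```

Keeping the rules in one function like this would let new phrases be checked against a handful of known entries before the whole KB is processed again.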

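`LookVendorReference` handles both strings and tuples because `re.findall` returns plain strings for patterns with zero or one capturing group, and tuples for patterns with several groups. The sketch below illustrates that behaviour. The two patterns and the sample title are made up for illustration and are not taken from `VendorReferences.csv`. It also shows `dict.fromkeys` as one order-preserving way to drop duplicates.

```python
import re

title = "MICROSOFT SECURITY UPDATE MS17-010 AND KB4012212 (MS17-010)"

# No capturing group: findall returns a list of strings
single = re.findall(r"MS\d{2}-\d{3}", title)

# Two capturing groups joined by "|": findall returns tuples, one slot per group,
# with "" in the slot of the group that did not take part in the match
multi = re.findall(r"(MS\d{2}-\d{3})|(KB\d{6,7})", title)

# Flatten the tuples, skip empty slots, then de-duplicate keeping first-seen order
flat = [group for match in multi for group in match if group]
unique = list(dict.fromkeys(flat))

print(single)
print(multi)
print(unique)
```
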
Finally, we call all the functions created above from a main function, which asks whether to download the KB from the Qualys API. If we choose to download it, we must enter the API URL and credentials.

In [6]:
%%time
PluginsFile = './QualysPlugins/PluginsQualys1901.xml'
OutputFile = './OutputFiles/QualysLookupResult1901.csv'
def main():
    References = []
    downloadflag = input('Would you like to download latest plugin version from Qualys API? (yes/y OR no/n)')
    downloadflag = downloadflag.lower()
    if (downloadflag == 'yes' or downloadflag =='y'):
        print('Connecting to API and downloading the latest knowledge base from Qualys')
        GetPlugins(PluginsFile)
        for r in ReadVendorReferencesCSV('../VendorReferences.csv'):
            References.append(r)
        GenerateClasifiedPlugin(PluginsFile,References,OutputFile)
    elif(downloadflag == 'no' or downloadflag == 'n' ):
        print('Using the knowledge base in your localfiles')
        for r in ReadVendorReferencesCSV('../VendorReferences.csv'):
            References.append(r)
        GenerateClasifiedPlugin(PluginsFile,References,OutputFile)
    else:
        print('Please enter a valid value')

main()

Would you like to download latest plugin version from Qualys API? (yes/y OR no/n)n
Using the knowledge base in your localfiles
CPU times: user 20.3 s, sys: 700 ms, total: 21 s
Wall time: 22.6 s


In [7]:
pluginsdf = pd.read_csv(OutputFile+'.gz')

In [8]:
pluginsdf.groupby(['SolutionCategory']).describe()

Summary statistics of the `ID` column for each category:

| SolutionCategory | count | mean | std | min | 25% | 50% | 75% | max |
|---|---|---|---|---|---|---|---|---|
| Configuration changes | 1364.0 | 54647.264663 | 68981.881185 | 11.0 | 10527.75 | 34002.5 | 86375.25 | 371390.0 |
| End of life Platform | 446.0 | 108315.237668 | 54367.337691 | 11542.0 | 105419.75 | 105552.5 | 105667.75 | 371149.0 |
| Informational | 1190.0 | 79307.805882 | 50221.570693 | 6.0 | 45090.25 | 78019.5 | 105242.75 | 370668.0 |
| No solution or patch available | 1529.0 | 64594.605625 | 73196.777306 | 1020.0 | 11331.0 | 38010.0 | 115957.0 | 371349.0 |
| To classify | 17.0 | 95577.470588 | 111255.803948 | 7009.0 | 19568.0 | 62080.0 | 105089.0 | 371016.0 |
| Update or patch installation | 36306.0 | 158997.567262 | 86470.822509 | 1018.0 | 117167.25 | 157098.5 | 176511.75 | 390164.0 |
| Workaround | 607.0 | 67048.131796 | 67982.173391 | 1024.0 | 12336.0 | 43401.0 | 116243.0 | 371163.0 |
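
The `count` column is the most useful part of this output, since the other statistics are computed over plugin IDs. Below is a sketch of getting the same per-category counts with only the standard library, reading a gzip-compressed CSV directly. The tiny file written here is a stand-in for the real lookup, and its rows are made up.

```python
import csv
import gzip
import os
import tempfile
from collections import Counter

# Build a small stand-in for the compressed lookup (hypothetical rows)
path = os.path.join(tempfile.mkdtemp(), "lookup.csv.gz")
with gzip.open(path, "wt", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["ID", "SolutionCategory"])
    writer.writerows([["11", "Configuration changes"],
                      ["1018", "Update or patch installation"],
                      ["1020", "Update or patch installation"]])

# gzip.open in text mode ("rt") lets csv read the file without unpacking it first
with gzip.open(path, "rt", newline="") as f:
    counts = Counter(row["SolutionCategory"] for row in csv.DictReader(f))

for category, n in counts.most_common():
    print(category, n)
```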


In [9]:
pluginsdf[pluginsdf.SolutionCategory == 'To classify']

| | ID | SolutionCategory | VendorReference | PluginName | Diagnosis | Consequence | Solution |
|---|---|---|---|---|---|---|---|
| 209 | 7009 | To classify | | New Virtual Web Server Hostnames Found. | This is a list of potential Virtual Web Server... | | |
| 1951 | 11785 | To classify | | WordPress Page Layout Builder Plugin Cross-Sit... | WordPress is an open source blogging tool and ... | Successful exploitation could allow an attacke... | |
| 1956 | 11790 | To classify | | Moxa MXview Private Key Disclosure Vulnerability | Moxa MXview network management software is des... | Successful exploitation allows unauthenticated... | |
| 2866 | 12772 | To classify | | WordPress MobileChief Plugin jQuery Validation... | MobileChief is a powerful extendable mobile si... | Successfully exploiting this vulnerability mig... | |
| 4099 | 19568 | To classify | | Database Instance Detected | The service detected a database installation o... | | |
| 4901 | 27316 | To classify | | ArGoSoft FTP Server .NET Directory Traversal V... | A vulnerability has been discovered in ArGoSof... | There are no vendor supplied patches available... | |
| 6130 | 43242 | To classify | HUAWEI AR SERIES ROUTERS | Huawei AR Series Routers Multiple Vulnerabilities | Huawei Quidway AR series routers are manufactu... | Successfully exploiting these vulnerabilities ... | |
| 6614 | 45120 | To classify | | ICS / SCADA System Detected | SCADA system or related components were detect... | | |
| 7038 | 62080 | To classify | | Squid Proxy Detected | Squid is a caching and forwarding web proxy. <... | | |
| 10455 | 105086 | To classify | | SiteZAP Is Remotely Accessible | SiteZAP network camera is a combined camera an... | | |
