## Introduction

This Jupyter Notebook is designed to pull metadata from assets in Prisma Cloud with a selected vulnerability ID. We query Prisma Cloud API's to return data, and then parse that data to provide a simplified workflow. Tags are used as an attempt to attribute ownership of vulnerabilities. Path exclusions can be used to exclude results for corporate managed software (updated via IT).

Setup
1. Create a file named .env in the root of this project
2. Add the required fields, an example .env file is provided in the root of this project as dot_env_example.txt  

> PRISMA_CLOUD_URL  
> PRISMA_CLOUD_IDENTITY  
> PRISMA_CLOUD_SECRET  
> TAG_SEARCH  
> PATH_IGNORE  

Features
- Path Exclusions
- Tag metadata searching for attribution

API's Used:

- [Get CVE Overview](https://pan.dev/prisma-cloud/api/cspm/cve-overview/)
- [Get Vulnerabilities by RQL](https://pan.dev/prisma-cloud/api/cspm/vulnerabilities-search-api/)
- [Get Vulnerable Assets by RQL](https://pan.dev/prisma-cloud/api/cspm/list-vulnerable-assets/)



In [2]:
cveId        = 'CVE-2024-21626'

### API Calls and Data Processing

***

#### Get auth token and set up the environment

***

In [3]:
# Get auth token and set up the environment
import json
import requests
import os
from dotenv import load_dotenv
from IPython.display import Markdown

load_dotenv()

username     = os.getenv("PRISMA_CLOUD_IDENTITY")
password     = os.getenv("PRISMA_CLOUD_SECRET")
apiEndpoint  = os.getenv("PRISMA_CLOUD_URL")
tagSearch    = os.getenv("TAG_SEARCH")
pathIgnore   = os.getenv("PATH_IGNORE")

platformURL  = apiEndpoint + "/login"

payload = json.dumps({
    'username': username,
    'password': password
})

headers = {
  'Content-Type': 'application/json; charset=UTF-8',
  'Accept': 'application/json; charset=UTF-8'
}

login = requests.request("POST", platformURL, headers=headers, data=payload)

response_json   = login.json()
token           = response_json["token"]

#### Get CVE Overview

***

In [4]:
# Get CVE Overview
url = apiEndpoint + '/uve/api/v1/dashboard/vulnerabilities/cve-overview?cve_id=' + cveId

payload={}
headers = {
  'Content-Type': 'application/json',
  'Accept': 'application/json',
  'x-redlock-auth': token
}

response    = requests.request("GET", url, headers=headers, data=payload)
cveDetails  = json.loads(response.text)

cveDescription    = cveDetails["description"]
cveCvssScore      = cveDetails["cvss"]
severity          = cveDetails["severity"]
packageName       = cveDetails["packageName"]
impactedVersions  = cveDetails["impactedVersion"]
fixVersions       = cveDetails["fixVersion"]

In [None]:
# # Create asset chart
# url = apiEndpoint + '/uve/api/v1/dashboard/vulnerabilities/vuln-assets'

# payload = json.dumps({
#   "cve_id": cveId,
#   "risk_factors": [],
#   "page_offset": 0,
#   "page_size": 0,
#   "filter_suppressed": True
# })
# headers = {
#   'Content-Type': 'application/json',
#   'Accept': 'application/json',
#   'x-redlock-auth': token
# }

# response    = requests.request("POST", url, headers=headers, data=payload)
# assetTypes  = json.loads(response.text)

# print(assetTypes)

# hostCount           = assetTypes["value"]["host"]["count"]
# iacCount            = assetTypes["value"]["iac"]["count"]
# deployedImageCount  = assetTypes["value"]["deployedImage"]["count"]


# import plotly.graph_objects as go

# # Your data
# data = {
#     "host": {"count": hostCount},
#     "iac": {"count": iacCount},
#     "deployedImage": {"count": deployedImageCount}
# }

# # Create the x and y values for the chart
# x_values = list(data.keys())
# y_values = [item["count"] for item in data.values()]

# # Create the bar chart
# fig = go.Figure(data=[go.Bar(x=x_values, y=y_values)])

# # Customize the chart
# fig.update_layout(title_text='Vulnerabilities count per category')


#### Get assets with CVE

***

In [5]:
# Get assets with CVE
url = apiEndpoint + '/uve/api/v1/vulnerabilities/search/asset'

query = "vulnerability where cve.id='" + cveId + "'"

payload = json.dumps({
  "query": query,
  "cveId": cveId,
  "assetLifecycle": "run",
  "assetType": "host"
})
headers = {
  'Content-Type': 'application/json',
  'Accept': 'application/json',
  'x-redlock-auth': token
}

response    = requests.request("POST", url, headers=headers, data=payload)
assetData   = json.loads(response.text)
assetInfo   = assetData["value"]
assetCount  = len(assetInfo)

#### Get asset metadata
***

In [None]:
# Get asset metadata
url = apiEndpoint + '/uai/v1/asset'

assetMetaData = {}

for asset in assetInfo:
    payload = json.dumps({
      "assetId": asset["id"],
      "type": "asset_lite"
    })

    headers = {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'x-redlock-auth': token
    }

    response      = requests.request("POST", url, headers=headers, data=payload)
    assetResponse = json.loads(response.text)

    assetMetaData[asset["id"]] = assetResponse

#### Get vulnerability details by asset
***

In [None]:
# Get vulnerability details by asset
url = apiEndpoint + '/uai/v1/asset'

def safe_get(dict_obj, *keys, default=None):
    for key in keys:
        if dict_obj is not None and key in dict_obj:
            dict_obj = dict_obj.get(key)
        else:
            return default
    return dict_obj

for asset_id, item in assetMetaData.items():
    asset = item["data"]["asset"]
    patchInfo = []

    payload = json.dumps({
      "type": "asset_cwp_vulns",
      "vulnerabilityInfoType": "cve",
      "assetId": asset["id"],
      "vulnerabilityInfoTypeId": cveId,
      "filters": {
        "patchable": "true",
        "sort": "severity"
      }
    })

    headers = {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'x-redlock-auth': token
    }

    response   = requests.request("POST", url, headers=headers, data=payload)
    cvResponse = json.loads(response.text)
        
    value = safe_get(cvResponse, "data", "asset", "vulns", "value")

    if value is not None:
      for item in value:
          if "packagePath" in item and item["packagePath"].startswith(pathIgnore): continue
          patchInfo.append(item)
          
    assetMetaData[asset_id]["data"]["asset"]["patchInfo"] = patchInfo


#### Format output
***

In [None]:
# Format output
cveInformation = {
    "Description": cveDescription,
    "CVSS Score": cveCvssScore,
    "Severity": severity,
    "Package Name": packageName,
    "Impacted Versions": impactedVersions,
    "Fixed Versions": fixVersions
}

results = ''
cveDetailOutput = ''

for key, value in cveInformation.items():
    cveDetailOutput += f'{key} : {value}\n'
    
results += '\n----------------------------------------------------------------------------------------------------------\n'

for item in assetMetaData:
    asset = safe_get(assetMetaData, item, "data", "asset")
    if asset and len(safe_get(asset, "patchInfo", default=[])) > 0:
        results += f'Host: {assetMetaData[item]["data"]["asset"]["name"]}\n'
        results += f'Cloud: {assetMetaData[item]["data"]["asset"]["cloudType"]}\n'
        results += f'Account Name: {assetMetaData[item]["data"]["asset"]["accountName"]}\n'
        results += f'Account ID: {assetMetaData[item]["data"]["asset"]["attributes"]["accountID"]}\n'
        results += f'Region: {assetMetaData[item]["data"]["asset"]["regionId"]}\n'
        results += f'VPC Name: {assetMetaData[item]["data"]["asset"]["vpcName"]}\n'
        results += f'VM Image: {assetMetaData[item]["data"]["asset"]["attributes"]["vmImage"]}\n'
        results += f'Link to Asset: {assetMetaData[item]["data"]["asset"]["url"]}\n'
        patch_info = safe_get(asset, "patchInfo", default=[])
        if patch_info:
            results += f'Required Version: {safe_get(patch_info[0], "fixedVersion")}\n'
        for patch in patch_info:
            results += 'Vulnerability Found:\n'
            results += f'     Path: {safe_get(patch, "packagePath")}\n'
            results += f'     Version: {safe_get(patch, "version")}\n'
        results += '\n\n'

### Results
***

#### CVE Details

In [None]:
print(cveDetailOutput)

#### Finding Details


In [None]:
print(results)