In [None]:
# Configuration
#
# NOTE: avoid committing real credentials (OAuth secret, API key, PAT) along with this notebook.

# Provide your Checkmarx One tenant name:
tenant_name = "my_tenant"

# Provide either the region name for your multi-tenant instance or your single-tenant server name (do not include "https"):
server = "US" # using the US multi-tenant region

# If using an OAuth client, provide the client name and secret as strings.  If using an API Key, these can be set to None:
oauth_client = None
oauth_secret = None

# If using an API Key, provide the API key as a string.  If using an OAuth client, this can be set to None:
api_key = None

# Provide the name for this agent that will show as the source of operations such as scanning:
agent_name = "ApiTutorial"

# Provide the path to a zip file that can be read for submitting an example scan.
# The zip upload tutorial asserts that this path exists before uploading.
zip_path = ""

# Provide the name of an existing test project where a sample scan will be submitted.
test_scan_project_name = "MyTutorialProject"

# Provide the project id of an existing project that was created with Code Repository import.  This
# will be used to invoke a scan similar to how it is done in the UI.
import_scan_project_id = None

# Provide the project id of an existing project that is a "manual scan" project but has the repository URL and
# credentials saved in the project settings.  This will be used to invoke a scan similar to a code repository import scan.
manual_scan_project_id = None

# Provide a clone URL and a PAT (or None if the repository is public) and a project id to demonstrate
# executing a scan by providing the clone details.  The project id defaults to the manual scan project id.
clone_project_id = manual_scan_project_id
clone_url = None
clone_branch = None
clone_pat = None

# Provide a list of email addresses that will receive a sample PDF report.  Leave
# set to None to not send a PDF report via email.
# Example:
# email_list = ["me@corp.com", "you@corp.com"]
email_list = None



## Optional Configuration

The next cell contains optional configuration options.  Usually these are not needed.

### Viewing the Tutorial's API I/O

While running the tutorial, it may be helpful to intercept the Checkmarx One API I/O to gain a deeper understanding of how the `cxone-async-api` is operating.  This can be done with `mitmproxy`.

One method of running `mitmproxy` is to run it as a docker container.  The following command line is an example of how it might be executed:

`docker run --rm -it -p 8080:8080 mitmproxy/mitmproxy mitmproxy --ssl-insecure`

Then set the `proxies` and `ssl_verify` properties in the following cell:

```python
proxies = {
  "http" : "http://localhost:8080",
  "https" : "http://localhost:8080"
}

ssl_verify=False

```

The `proxies` setting may need to be varied if running this notebook using the Jupyter Notebook server rather than VS Code.


In [None]:
# Notebook options - mostly these can be left as-is

# Set to True to force install of the cxone-async-api from the GitHub release instead of the local
# package (if running this notebook in VSCode from a clone of the cxone-async-api repository).
force_download = False


# Proxy settings - leave at None if not using a proxy, otherwise this is a dictionary that follows the Requests API proxy settings
proxies = None

# SSL check settings - leave at True for most uses.  Set to False only when intercepting
# traffic with a local proxy such as mitmproxy.
ssl_verify=True


In [None]:
%%capture ignore

# Determine and install the latest cxone-async-api version or the local code, don't modify this cell.

import os
from pathlib import Path

# A pyproject.toml in the parent directory is used to detect a local clone of the package.
local_module_path = Path("..") / Path("pyproject.toml")

# Uninstall any existing cxone_api before installing the version selected below.
%pip uninstall -y cxone_api
%pip install pandas plotly nbformat

if os.path.exists(local_module_path) and not force_download:
  # Install from the local clone and report the version declared by the package.
  %pip install ..

  from cxone_api.__version__ import __version__
  api_version = __version__
else:
  %pip install requests

  import requests
  # The "latest" release URL is expected to answer with a 302 redirect; the last path
  # segment of the redirect location is used as the release version.
  latest_redirect = requests.get("https://github.com/checkmarx-ts/cxone-async-api/releases/latest", allow_redirects=False)
  assert(latest_redirect.status_code == 302)
  api_version = latest_redirect.headers['location'].split("/")[-1:].pop()
  %pip uninstall requests -y

  # Install the release wheel for the detected version.
  %pip install https://github.com/checkmarx-ts/cxone-async-api/releases/download/$api_version/cxone_api-$api_version-py3-none-any.whl


# Silence the InsecureRequestWarning emitted when requests are made with SSL verification disabled.
from requests.packages import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


In [None]:
%%capture ignore
# Add some utility functions, don't modify this cell
from IPython.display import Markdown, JSON as int_JSON, display
import asyncio

def MD(md_string):
    """Render the given Markdown-formatted string as rich notebook output."""
    rendered = Markdown(md_string)
    display(rendered)

def JSON(json_dict):
    """Render the given JSON-compatible object as an expanded JSON tree in the notebook."""
    tree_view = int_JSON(json_dict, expanded=True)
    display(tree_view)


from cxone_api import CxOneClient
from cxone_api.low.projects import create_a_project, retrieve_list_of_projects
async def get_test_projectid(client : CxOneClient) -> str:
  """Return the id of the tutorial's test project, creating the project when needed.

  Searches for projects whose name matches `test_scan_project_name`.  When the search
  does not report exactly one match, a project is created with that name and with
  `agent_name` as its origin.

  Args:
    client: The CxOneClient instance used to call the Checkmarx One API.

  Returns:
    The value of the 'id' element of the found or created project.
  """
  # Imported locally so this helper does not depend on a later notebook cell
  # having already imported json_on_ok into the global namespace.
  from cxone_api.util import json_on_ok

  # NOTE(review): the project name is placed in the regex unescaped; names containing
  # regex metacharacters may not match as intended - confirm against the API.
  project_search_response = json_on_ok(await retrieve_list_of_projects(client, name_regex=f"^{test_scan_project_name}$"))

  if not project_search_response['filteredTotalCount'] == 1:
    # The project with the test name doesn't exist, create it.
    return json_on_ok(await create_a_project(client, name=test_scan_project_name, origin=agent_name))['id']
  else:
    # Use the existing test project
    return project_search_response['projects'][0]['id']

# Functions for plotting charts.
from IPython.display import display
import plotly.express as px

def barChart(df, x, y, title):
    """Display a vertical bar chart of `df` using columns `x` and `y` with the given title."""
    display(px.bar(df, x=x, y=y, orientation='v', title=title))

def pieChart(df, values, names, title):
    """Display a pie chart of `df` with slice sizes from `values` and labels from `names`."""
    fig = px.pie(df, values=values, names=names, title=title)
    display(fig)

In [None]:
# Show the cxone-async-api version that was determined by the install cell.
MD(f"**Using cxone-async-api version {api_version}.**")

# Tutorial 1: Creating an instance of `CxOneClient`

The `CxOneClient` is the primary object for interfacing with the Checkmarx One API.  It is initialized with the Checkmarx One tenant's endpoints and credentials that
are then used to perform authorized operations with the Checkmarx One API.  Details such as re-authenticating when authentication tokens expire and retrying operations due
to 5XX responses are handled transparently by `CxOneClient`.

## Selecting the Connection Endpoints

There are two endpoints needed for interfacing with Checkmarx One:

1. The IAM endpoint
2. The API endpoint

For multi-tenant Checkmarx One, the endpoints can be referenced by [regional moniker shortcuts](https://checkmarx.stoplight.io/docs/checkmarx-one-api-reference-guide/branches/main/fm1ma9xg73dx9-authentication-api#url) (US, US2, DEU, etc.) or by using the server name directly.  Single-tenant Checkmarx One must be referenced by the single-tenant server name for both endpoints.


In [None]:
# Validate the required configuration before building the endpoints.  Each assert
# checks its own variable so that an empty tenant name is not silently accepted.
assert server is not None and len(server) > 0, "The 'server' configuration value must be set."
assert tenant_name is not None and len(tenant_name) > 0, "The 'tenant_name' configuration value must be set."

from cxone_api import CxOneAuthEndpoint, CxOneApiEndpoint, AuthRegionEndpoints, ApiRegionEndpoints

# If the server name is set to a regional moniker, the moniker can be used to make each endpoint instance.
if server in AuthRegionEndpoints and server in ApiRegionEndpoints:
    auth_endpoint = AuthRegionEndpoints[server](tenant_name)
    api_endpoint = ApiRegionEndpoints[server]()
else:
    # The server name was provided, as would be required for single-tenant Checkmarx One, make the endpoint instances using the server name.
    # This method can also be used if you do not want to use the multi-tenant regional moniker.
    auth_endpoint = CxOneAuthEndpoint(tenant_name, server)
    api_endpoint = CxOneApiEndpoint(server)


# Display the resolved endpoints so the selection can be verified.
MD(f"Auth endpoint: **{str(auth_endpoint)}**, API Endpoint: **{str(api_endpoint)}**")

## Using an API Key

An API Key can be created by any authorized Checkmarx One user and is tied to the creating account.  It inherits the roles and permissions of the creating user
and will be limited to performing actions allowed by the user's assigned roles.  The user that created the key can revoke the key at any time.  If the user is removed
from the Checkmarx One system, the API key will be invalid.

This type of credential is ideal for user-interactive processes that should operate with Checkmarx One as an identifiable user.  The Checkmarx One IDE plugins, for example, use an API key that is generated and managed by the user of the IDE.

In [None]:
from cxone_api import CxOneClient
from cxone_api.util import json_on_ok
from cxone_api.low.misc import retrieve_versions

if api_key is not None:
    # Create the client instance with the factory method for API keys
    client = CxOneClient.create_with_api_key(api_key, agent_name, auth_endpoint, api_endpoint, proxy=proxies, ssl_verify=ssl_verify)

    # Make a simple authenticated API call.
    JSON(json_on_ok(await retrieve_versions (client)))
else:
    print(f"The variable \'api_key\' was not set with a value, skipping.")


## Using an OAuth Client

An OAuth client can be created by a user with the appropriate Checkmarx One administrative privileges.  An OAuth client is not tied to a specific user account and has access roles assigned to the OAuth client at creation.

This type of credential is most ideal for background automated processes that do not need to be tied to a specific user.

In [None]:
from cxone_api import CxOneClient
from cxone_api.util import json_on_ok
from cxone_api.low.misc import retrieve_versions

if oauth_client is None:
    print(f"The variable \'oauth_client\' was not set with a value, skipping.")
elif oauth_secret is None:
    print(f"The variable \'oauth_secret\' was not set with a value, skipping.")
else:
    # Create the client instance with the factory method for OAuth clients
    client = CxOneClient.create_with_oauth(oauth_client, oauth_secret, agent_name, auth_endpoint, api_endpoint, proxy=proxies, ssl_verify=ssl_verify)

    # Make a simple authenticated API call.
    JSON(json_on_ok(await retrieve_versions (client)))


# Tutorial 2: Client Direct API Calls

The `cxone-async-api` provides the `CxOneClient` object that performs authentication and retries with the Checkmarx One API by wrapping the Python [requests](https://pypi.org/project/requests/) library.  The `CxOneClient` object performs the logic for API I/O and is used in
both the `low` and `high` modules.  The client's methods can also be used to directly invoke the Checkmarx One API via HTTP verb methods in `requests`; as part of the invocation, the `CxOneClient` instance will properly manage an authenticated session with the Checkmarx One API.

As the Checkmarx One API evolves, adding wrapper methods in the `low` module may take some time.  This example demonstrates the use of `CxOneClient` to interface directly with the Checkmarx One API if a wrapper function is not available.

This approach is typically not required but is included in the tutorial to show how the `cxone-async-api` works.

## Calling the `versions` API

An example API call to the `versions` API using a wrapper function was demonstrated as part of the previous tutorial.  That same API can be executed using the [`requests.get`](https://requests.readthedocs.io/en/latest/api/#requests.get) HTTP verb method with the `CxOneClient` instance.


In [None]:
import requests
from requests.compat import urljoin

response = await client.exec_request(requests.get, # The method from requests that implements the GET verb
                                     url=urljoin(client.api_endpoint, "versions") # Composes the URL for the API endpoint
                                     )

print (f"API Response: {response.status_code}")

if response.ok:
    JSON(response.json())
else:
    print(f"Failure: {response.reason}")


## Calling the `projects-overview` API with Query Parameters

Using query parameters is possible by using the `params` parameter in the HTTP verb method.  The `args` and `kwargs` are passed to the HTTP verb method.



In [None]:
import requests
from requests.compat import urljoin

response = await client.exec_request(requests.get, 
                                     url=urljoin(client.api_endpoint, "projects-overview"),
                                     # Use parameters to specify limiting the retrieved projects to 3 and
                                     # projects where the projects have scans submitted from zip files or
                                     # code repository import events from GitHub
                                     params={"limit" : "3", "source-type" : "zip,github"})

print (f"API Response: {response.status_code}")

if response.ok:
    json = response.json()

    for project in json['projects']:
        print(f"Project: {project['projectName']} Source: {project['sourceType']}")

    MD("**Response JSON:**")
    JSON(json)
else:
    print(f"Failure: {response.reason}")

# Tutorial 3: The Low Level Module

The `low` module of the `cxone-async-api` is intended to follow the Checkmarx One [API documentation](https://checkmarx.stoplight.io/docs/checkmarx-one-api-reference-guide).
The namespacing and method calls will closely follow the documentation structure.

The `low` module's API wrapping methods will typically have the `CxOneClient` instance as the first parameter. Other required method parameters will map to URL path parameters and/or content to be sent as part of the API call.

When an API defines query parameters, the query parameters can be passed as `kwarg` parameters that are translated to the URL query parameters.  Some examples:

* If the query parameter `limit` is defined in the API, a `kwarg` with the name `limit` is assigned the value that should be set in the URL.
* Query parameters that have dashes in the name, such as `source-type` are translated from `kwargs` since dashes are not valid in Python syntax.  This means the `kwarg` with the name `source_type` or `sourcetype` is translated to the query parameter `source-type`.

## Retrieve a List of Scans with `cxone_api.low.scans.retrieve_list_of_scans`

This example shows using the `from-date` query parameter to return scans from the last 15 days and the `limit` parameter restricting the display to a maximum of 3 scans. The `kwarg` parameter `from_date` is translated to the `from-date` query parameter.


In [None]:
from datetime import datetime,timedelta,timezone
from cxone_api.util import json_on_ok
from cxone_api.low.scans import retrieve_list_of_scans

# Request at most 3 scans, passing a from_date of 15 days before the current UTC time
# as an ISO-8601 string, then display the JSON response.
scans = json_on_ok(await retrieve_list_of_scans(client, limit=3, from_date=(datetime.now(timezone.utc) - timedelta(days=15)).isoformat()))
JSON(scans)


## Chaining Multiple Low-Level API Calls

The previous examples have been simple data retrieval calls.  In reality, it will often be the case that there will be a need for multiple API calls to be executed in a logical order.  This example will show how to request a report for a scan.  To obtain a report, multiple API calls must be made before the compiled report can be retrieved.

In [None]:
from IPython.display import HTML
from cxone_api.util import json_on_ok
from cxone_api.low.scans import retrieve_list_of_scans

# Get an arbitrary scan from the retrieve_list_of_scans API
MD("### Step 1: Retrieve an arbitrary scan ID")
scan_json = json_on_ok(await retrieve_list_of_scans(client, limit=1, statuses="Completed"))
# The first returned scan is used; this indexing fails if the response contains no scans.
scanid = scan_json['scans'][0]['id']
projectid = scan_json['scans'][0]['projectId']
branch = scan_json['scans'][0]['branch']
MD(f"Using Scan Id **{scanid}** from project **{scan_json['scans'][0]['projectName']}**")
MD("*Response JSON:*")
JSON(scan_json)


from cxone_api.low.reports import create_a_report
MD("### Step 2: Create a report")

# This API kicks off a server-side workflow to compile a report.  The identifier
# returned is later used to retrieve the report.
# The [202] argument tells json_on_ok that a 202 response is the expected "ok" response here.
create_result = json_on_ok(await create_a_report(client, 
                      reportName="improved-scan-report", 
                      fileFormat="json",
                      reportType="cli",
                      data={"scanId" : scanid, "projectId" : projectid}), [202])

requestid = create_result["reportId"]
MD(f"Report request id: **{requestid}**")
MD("*Response JSON:*")
JSON(create_result)

from cxone_api.low.reports import retrieve_report_status
MD("### Step 3: Check the status of the report")

# The report may take a while to compile, so loop with a delay
# to check the report status.  The number of loop iterations
# should be limited so that the checks don't run forever.
# With 30 tries and a 2 second delay the loop waits roughly one minute at most.
tries = 30
report_url = None
while tries > 0:
    tries -= 1
    status_response = json_on_ok(await retrieve_report_status(client, requestid))
    status = status_response['status']
    MD(f"Report compile status: {status}")
    
    if not status == "completed":
        # Still compiling, wait a few seconds before checking again.
        JSON(status_response)
        await asyncio.sleep(2)
    else:
        # The completed status response carries the URL of the compiled report.
        report_url = status_response['url']
        JSON(status_response)
        break

# Fails when the report did not reach the "completed" status within the allowed tries.
assert(report_url is not None)

MD("### Step 4: Download the report\n\n*Uncomment the last line of code to see the large JSON output*")
from cxone_api.low.reports import download_a_report
# JSON(json_on_ok(await download_a_report(client, requestid)))



## Scan an Uploaded Zip File

This tutorial shows some of the steps taken by the Checkmarx One CLI to upload source code for scanning.  Other methods of scanning will be covered in a later tutorial.



In [None]:
from os import path
from cxone_api.util import json_on_ok
from cxone_api.low.projects import retrieve_list_of_projects, create_a_project
from cxone_api.low.uploads import generate_upload_link, upload_to_link
from cxone_api.low.scans import run_a_scan, retrieve_scan_details

# Check the test upload file exists
assert(path.exists(zip_path))
assert(test_scan_project_name is not None and len(test_scan_project_name) > 0)

MD("### Step 1: Determine the project id where the scan will execute")

# Looks up the test project by name and creates it when it is not found.
projectid = await get_test_projectid(client)

MD(f"Using projectid: **{projectid}**")


MD("### Step 2: Get an upload link where the zip for scanning will be sent")
# Get an upload link for the zip that will be uploaded for scanning
upload_link = json_on_ok(await generate_upload_link(client))['url']


MD("### Step 3: Upload the zip containing the source code to scan")
# Upload the zip file
assert((await upload_to_link(client, upload_link, zip_path)).ok)

MD("### Step 4: Start the scan")
# Start the scan with a few configuration options
# The same upload link is referenced by the scan request; SAST and SCA are requested
# with empty configuration values.
scan_res = json_on_ok(await run_a_scan(client, {
"project" : {"id" : projectid},
"type" : "upload",
"handler" : {
  "uploadurl" : upload_link,
  "branch" : "zip"
  },
"config" : [
  {
    "type" : "sast", "value" : {}
  },
  {
    "type" : "sca", "value" : {}
  }
]
}))

scanid = scan_res['id']
MD(f"Scan started, scan id: **{scanid}**")


MD("### Step 5: Wait for the scan to complete")
# Wait for the scan to finish
# NOTE(review): this loop has no iteration limit; it polls every 10 seconds until the
# scan status is something other than "Running" or "Queued".
while True:
  status = json_on_ok(await retrieve_scan_details(client, scanid))
  display_str = f"Scan id: **{scanid}** Status: **{status['status']}**"
  if status['status'] not in ["Running", "Queued"]:
    MD(display_str + ": **Done**")
    break

  MD(display_str + ": Sleeping 10 seconds...")
  await asyncio.sleep(10)



# Tutorial 4: Low-Level Utilities


## The `json_on_ok` Utility

The `json_on_ok` utility is a helper to retrieve the JSON response from an API request. By default, the status code of the response must be in the 2XX range to be considered "ok" to return a JSON response.  An optional parameter `specific_responses` can provide a list of response codes that are considered "ok" if an API usage returns a status code other than 2XX.

### Positive response example

This example is repeated from previous tutorials that call the `versions` API.  Since the client is properly authenticated, it would be expected to respond with a 2XX response.

In [None]:
from cxone_api.util import json_on_ok
from cxone_api.low.misc import retrieve_versions

# The API call returns a requests.Response object
versions_response = await retrieve_versions (client)

MD(f"Response code: **{versions_response.status_code}**")

# Dump the json response for display purposes; no specific response codes are
# passed, so the default "ok" handling of json_on_ok applies.
JSON(json_on_ok(versions_response))


### Negative response example

This example causes `json_on_ok` to raise a `ResponseException` when it encounters an unexpected response code.

In [None]:
from cxone_api.util import json_on_ok
from cxone_api.exceptions import ResponseException
from cxone_api.low.misc import retrieve_versions

# The API call returns a requests.Response object
versions_response = await retrieve_versions (client)

MD(f"Response code: **{versions_response.status_code}**")

# Simulate an unexpected response by specifying an incorrect expected response code.
# The exception is caught and displayed so that the cell completes without an error.
try:
  JSON(json_on_ok(versions_response, [401]))
except ResponseException as ex:
  MD("Exception: " + str(ex))


## The `page_generator` Utility

Checkmarx One APIs that would potentially return a large number of data elements will return a limited amount of data on the initial API call.  Subsequent API calls will typically indicate the next response should return
data that logically follows the data from the previous response.  Each response from the API is therefore considered a "page" of data.

The `page_generator` utility automates the page retrieval of a paged API such that returned JSON data elements can be iterated using an `async for` loop.  The `page_generator` handles the automatic retrieval of the next page of data.

### Row-Offset API Example

All of the Checkmarx One APIs that potentially return large data sets use the parameter `limit` to indicate the maximum number of data elements to return as a response to an API call.  For the API used in this example, `offset` is an index value into the number of available data elements to return.  This example shows the use of `page_generator` to iterate by data element count.

The `page_generator` parameters are mostly left as default values in this example.


In [None]:
from cxone_api.util import page_generator
from cxone_api.low.scans import retrieve_list_of_scans

table = "|Scan Id|Status|Date|\n|-|:-:|-|\n"

# Iterate over scans where the scans are retrieved by index offset.
async for scan in page_generator(
  retrieve_list_of_scans, # The coroutine that calls the API
  array_element='scans', # The list containing the array of data is in the "scans" json element
  client=client, # The CxOneClient instance passed as a parameter to the coroutine
  limit=5 # Passed to the API to set the response data element limit to 5 to demonstrate the paging
):
  table += f"|{scan['id']}|{scan['status']}|{scan['updatedAt']}|\n"

MD(table)


### Page-Offset API example

Some of the APIs describe their limit/offset paging parameters such that the `offset` yields the returned data starting at "offset * limit"; this means the `offset` value is a 0-indexed page number among multiple pages each having up to `limit` data elements.

The initial call, for example, assumes `offset` is 0 if the parameter is not provided.  Therefore, the number of rows returned will have a count maximum of `limit` and start at the first row available.  The next `offset` value of 1 will return a maximum count of `limit` starting with the data element at 0-based index "offset * limit" (that is, element number "(offset * limit) + 1").

This example shows the use of `page_generator` for APIs that use `limit` and `offset` for retrieval by page rather than by data element index.


In [None]:
from cxone_api.util import page_generator
from cxone_api.low.all_scanners_results import retrieve_scan_results_all_scanners
from cxone_api.low.scans import retrieve_list_of_scans


# Get an arbitrary scan from the retrieve_list_of_scans API
MD("#### Step 1: Retrieve an arbitrary scan ID")
scan_json = json_on_ok(await retrieve_list_of_scans(client, limit=1, statuses="Completed"))
scanid = scan_json['scans'][0]['id']
projectid = scan_json['scans'][0]['projectId']
branch = scan_json['scans'][0]['branch']
MD(f"Using Scan Id **{scanid}** from project **{scan_json['scans'][0]['projectName']}**")

MD("#### Step 2: Iterate pages of results")


table = "|Type|Severity|Name|\n|-|:-:|-|\n"

# Iterate over results where the results are retrieved by page offset.
async for result in page_generator(retrieve_scan_results_all_scanners, # The coroutine that calls the API
                                   array_element='results', # The list containing the array of data is in the "results" json element
                                   offset_is_by_count=False, # Indicates the offset is by page rather than returned result count
                                   client=client, # The CxOneClient instance passed as a parameter to the coroutine
                                   scan_id=scanid, # Required query parameter for the API
                                   limit=5 # Passed to the API to set the response data element limit to 5 to demonstrate the paging
                                   ):
  
  table += f"|{result['type']}|{result['severity']}|"
  if result['type'] == 'sast':
    table += f"{result['data']['group']}::{result['data']['queryName']}|\n"
  elif result['type'] == 'sca':
    table += f"{result['id']}|\n"
  else:
    MD("Unhandled result type - fix the display as an exercise.")
    JSON(result)
    break

MD(table)


# Tutorial 5: The High Level Module

The previous tutorials have shown the use of `cxone-async-api.low.*` to make direct API calls.  These can be incorporated in coding logic in any combination to perform any required business logic.

The high-level module is intended to offer methods and classes that can perform common business logic functions that would require one or more low-level API calls and interpretation of responses.  Each implementation in the high-level module is a composite of several of the methods from the low-level module.  The `high` module will add more functionality over time.

## High Level `CxOneVersions` Class

The low-level `cxone-async-api.low.misc.retrieve_versions` will return a `requests.Response` object that contains a JSON response with the versions deployed on the Checkmarx One instance.  This high-level API is similar with the exception that the class from the high-level module provides a concrete class instance with properties for the relevant version elements.

In [None]:
from cxone_api.high.util import CxOneVersions

# Load the version information through the high-level factory method.
v = await CxOneVersions.factory(client)

# The versions are read from properties of the returned instance.
MD(f"**CxOne Version: {v.CxOne}** includes *SAST Version {v.SAST}* and *KICS Version {v.KICS}*")


## High Level `ScanInspector` Class

The `ScanInspector` loads data for a scan by scan id and performs the logic that determines if a scan is executing, failed, or successful.

The function of `ScanInspector` is to translate multiple scan states such that understanding the disposition of the scan is programmatically easier.

Examples:
* A scan in the state of `Queued` can be considered "executing" as can a scan in the state `Running`.
* A scan in the state `Partial` has neither failed nor has it been successful.
* A scan in the `Canceled` state can be considered as failed as can a scan in the `Failed` state.

This example will look for scans in various states to show how the `ScanInspector` interprets the scan state.

In [None]:
import asyncio
from cxone_api.high.scans import ScanInspector, ScanLoader
from cxone_api.low.scans import retrieve_list_of_scans
from cxone_api.util import json_on_ok

# Lock held by the helper functions below while they emit their Markdown output.
lock = asyncio.Lock()

# Some helper functions for this example
async def get_a_scan_id(status):
  """Return the id of one scan having the given status, or None when none is available.

  None is also returned when the API response is not "ok".
  """
  list_response = await retrieve_list_of_scans(client, limit=1, statuses=status)
  if not list_response.ok:
    return None

  found_scans = json_on_ok(list_response)['scans']
  return found_scans[0]['id'] if len(found_scans) > 0 else None
  
async def dump_scan_inspector(inspector : ScanInspector, state):
  """Display a Markdown summary of a ScanInspector for a scan found in the given state."""
  async with lock:
    # Each entry is one fragment of the final Markdown document.
    sections = [
      f"### `ScanInspector` Example for a `{state}` Scan\n\n",
      f"**Scan ID: {inspector.scan_id}** from Project ID: {inspector.project_id}\n\n",
      f"* Currently executing: {inspector.executing}\n",
      f"* Scan failed: {inspector.failed}\n",
      f"* Scan successful: {inspector.successful}\n\n",
      f"Scan message: **{inspector.state_msg}**\n",
    ]
    MD("".join(sections))

async def show_example(state):
  """Find a scan in the given state and display its ScanInspector, or note that none was found."""
  scan_id = await get_a_scan_id(state)

  if scan_id is None:
    async with lock:
      MD(f"### *No scan id for a `{state}` scan found, skipping example.*")
    return

  # Create an instance of the ScanInspector using ScanLoader.load
  inspector = await ScanLoader.load(client, scan_id)
  await dump_scan_inspector(inspector, state)

# One example is attempted for each of these scan states.
scan_states = ['Queued', 'Running', 'Completed', 'Failed', 'Partial', 'Canceled']

# ScanInspector instances loaded in parallel using asyncio
# The gathered results are assigned to "_" so they are not displayed as cell output.
_ = await asyncio.gather(*[show_example(state) for state in scan_states])



## High Level `ProjectRepoConfig` Class

There are currently two types of projects in Checkmarx One:

* Projects for manual scans
* Projects created with a Code Repository Integration

Manual scan projects can optionally have repository information provided in the project configuration.  Projects with a Code Repository Integration will have a repository configuration but the configuration is performed differently from how a repository is configured for a manual scan project.  The `ProjectRepoConfig` class performs the logic needed to detect the different types of projects and allow a programmatic method of inspecting the project configuration.

This example will list the project repository configuration for projects found in the system.


In [None]:
from cxone_api.high.projects import ProjectRepoConfig
from cxone_api.util import page_generator
from cxone_api.low.projects import retrieve_list_of_projects

markdown = "|Project Name|Imported|SCM Type|Repo URL|Primary Branch|Creds Expired|SCM Org|Default Engines|\n"
markdown += "|-|-|-|-|-|-|-|-|\n"

# Iterate through the first 30 projects found and list the repo configuration details
max = 30
async for project_json in page_generator(retrieve_list_of_projects, "projects", client=client):
  repo = await ProjectRepoConfig.from_project_json(client, project_json)

  markdown += f"|{project_json['name']}"
  markdown += f"|{await repo.is_scm_imported}"
  markdown += f"|{await repo.scm_type}"
  markdown += f"|{await repo.repo_url}"
  markdown += f"|{await repo.primary_branch}"
  markdown += f"|{await repo.scm_creds_expired}"
  markdown += f"|{await repo.scm_org}"
  markdown += f"|{await repo.get_enabled_scanners(await repo.primary_branch)}"

  markdown += "|\n"

  if max == 0:
    break
  max -= 1


MD(markdown)


## High Level `ProjectScanConfiguration` Class

Default scan configurations for different engines can be set at either the Tenant level or the project level.  The Tenant level settings can be overridden at the project level unless a configuration override is disabled.  The `ProjectScanConfiguration` class allows for reading and updating the Project level scan configuration settings.

The `TenantScanConfiguration` class works like the `ProjectScanConfiguration` class except that `TenantScanConfiguration` will write configuration changes at the Tenant level.  Tenant level changes affect all projects, so use with caution.

**Note:** This example will write configuration changes to the test project name that was set in the notebook settings.

This example will show how to read and update settings for the SAST engine.  The configuration classes have configuration options for other engine types that can be explored by extending this code sample.


In [None]:
from cxone_api.high.scan_configuration import ProjectScanConfiguration
from cxone_api.high.scan_configuration.categories import ConfigurationPropertyHandler, SastScanConfig
from cxone_api.util import json_on_ok
from cxone_api.low.projects import retrieve_list_of_projects

# Search for the test project by name using an anchored regular expression.
# NOTE(review): the project name is placed in the regex unescaped - confirm names with regex metacharacters behave as intended.
project_search_response = json_on_ok(await retrieve_list_of_projects(client, name_regex=f"^{test_scan_project_name}$"))

async def make_table_entry(name : str, config_prop : ConfigurationPropertyHandler) -> str:
  """Return one Markdown table row holding the name, value, and override flag of a configuration property."""
  value = await config_prop.getValue()
  override = await config_prop.getOverride()
  return f"|{name}|{value}|{override}|\n"


async def make_sast_table(config : SastScanConfig) -> str:
  """Build a markdown table with one row per SAST configuration property.

  Each row is produced by `make_table_entry`; rows appear in the order
  listed below.
  """
  # (display label, attribute name on the SAST configuration object)
  sast_properties = (
    ("Fast Scan", "FastScan"),
    ("Exclusions", "Exclusions"),
    ("Engine Verbose", "EngineVerbose"),
    ("Incremental Scan", "IncrementalScan"),
    ("Preset", "Preset"),
    ("Recommended Exclusions", "RecommendedExclusions"),
    ("Language Mode", "LanguageMode"),
  )

  table = "|Config Name|Value|Allow Override|\n|-|:-:|:-:|\n"
  for label, attribute_name in sast_properties:
    table += await make_table_entry(label, getattr(config, attribute_name))

  return table

if project_search_response['filteredTotalCount'] != 1:
  MD(f"Project id for {test_scan_project_name} could not be found, skipping the example.")
else:
  project_id = project_search_response['projects'][0]['id']
  config = ProjectScanConfiguration(client, project_id)

  markdown = f"### Project Configuration for Project ID {project_id}\n"

  markdown += "#### SAST Configuration (Before Change)\n"
  markdown += await make_sast_table(config.SAST)

  # Change the SAST Exclusions for the project configuration
  previous_value = await config.SAST.Exclusions.getValue()
  previous_override = await config.SAST.Exclusions.getOverride()
  await config.SAST.Exclusions.setValue("!**/node_modules/*")
  await config.SAST.Exclusions.setOverride(False)
  await config.commit_config()

  # Reload the configuration to demonstrate the change was written
  config = ProjectScanConfiguration(client, project_id)
  markdown += "#### SAST Configuration (After Change)\n"
  markdown += await make_sast_table(config.SAST)

  # Change the configuration back to the original value
  await config.SAST.Exclusions.setValue(previous_value)
  await config.SAST.Exclusions.setOverride(previous_override)
  await config.commit_config()

  MD(markdown)



## High Level `Groups` Class

The `Groups` class allows for logically retrieving Checkmarx One group information.  The class itself will load group information on demand and perform caching of group information.

This example will use the `Groups` class to identify projects assigned to each group.


In [None]:
import asyncio
from cxone_api.util import json_on_ok
from cxone_api.low.projects import retrieve_project_info
from cxone_api.high.access_mgmt.user_mgmt import Groups


g = Groups(client)

# Iterate through the groups, show the projects that are assigned to
# each group.
for group_path in await g.get_path_list():

  descriptor = await g.get_by_full_path(group_path)
  MD(f"### Projects in group {descriptor.name} [{str(descriptor.path)} id: {descriptor.id}]")

  project_ids = await g.get_project_ids_by_groups([descriptor])

  if len(project_ids) > 0:
    markdown = "\n"
    # Use asyncio to load project information in parallel
    projects = [json_on_ok(p) for p in await asyncio.gather(*[retrieve_project_info(client, id) for id in project_ids])]
    for project in projects:
      markdown += f"* {project.get('name')}\n"
    
    MD(markdown)
  else:
    MD("No associated projects found.")





# Tutorial 6: Invoking Scans with the High Level API

A previous low-level module tutorial showed a method of invoking a scan using a local zip file.  There are several ways of invoking a scan that require different interactions with the API that will execute a scan.  The scan invocation methods that can be used are:

* Scanning with a zip upload
* Scanning with a Git clone URL (with or without credentials)
* Scanning the default or a specified branch of a manual project's configured Git clone URL (with or without credentials)
* Scanning the default or a specified branch of an imported project

The `ScanInvoker` class can be used to invoke scans with these scenarios.  The logic needed to form the JSON payload correctly for each type of scan is
handled by the `ScanInvoker` implementation.

## Engine Configurations

Before starting the next examples, it is important to understand that all scans are performed with one or more selected scan engines.  A configuration for
each engine can be provided at the time of invocation via the API but is not required.  The engines for the scan must be selected even if a specific
configuration for the selected engine is not provided at the invocation of the API.  If no specific engine configuration is provided, the project's engine
configuration is used by Checkmarx One.

All selected engines and associated configurations used by `ScanInvoker` methods follow the
[Run a scan](https://checkmarx.stoplight.io/docs/checkmarx-one-api-reference-guide/i3mwd8u3zyoox-run-a-scan)
API conventions for engine selection and configuration.  The API will return an error if at least one engine with an empty configuration is not
provided.  The `ScanInvoker` methods have logic that selects the engines and uses their default configuration if an engine configuration is not
specifically provided.  The logic chooses the engines in this order:

1. If the engine configuration is provided as a parameter value to the `ScanInvoker` method, that configuration is used.
2. If a scan exists in the target project using the target branch, the engine selection for that scan is used.
3. Otherwise, a default configuration selecting the SAST engine only is used.

## Example: `ScanInvoker` with a local zip upload
This example shows how to use `ScanInvoker` to upload a local zip file and start a scan.  The branch name used for this example is `zip`.



In [None]:
from cxone_api.high.scans import ScanInvoker
from cxone_api.util import json_on_ok

if not (zip_path is None or zip_path == ""):
  project_id = await get_test_projectid(client)

  # Simple scan with a local zip upload.
  JSON(json_on_ok(await ScanInvoker.scan_by_local_zip_upload(client, project_id, zip_path, "zip")))
else:
  MD("**A value for `zip_path` not provided, example skipped.**")

## Example: `ScanInvoker` with code repository import project

Executing a scan with a project that was created using the "Code Repository" import does not require a zip upload or clone credentials.  The engines used for the scan are configured in the project settings, as is the default branch for the repository.  This example shows a scan started using the engines selected in the project configuration.

In [None]:
from cxone_api.high.scans import ScanInvoker
from cxone_api.util import json_on_ok

if not (import_scan_project_id is None or import_scan_project_id == ""):
  response = await ScanInvoker.scan_by_project_config(client, import_scan_project_id)

  if response.ok:
    MD("**Scan started**")
  else:
    MD(f"**FAILED:** Request to start scan response code {response.status_code}.")
else:
  MD("**A value for `import_scan_project_id` not provided, example skipped.**")

## Example: `ScanInvoker` with manual scan project

A manual scan project, unlike a Code Repository import project, can optionally have a clone URL, protected branch, and PAT configured in the project settings.  If these configuration elements are provided, it is possible
to invoke a scan that uses the project configuration elements to retrieve the code.  In this example, the `ScanInvoker` is used to create a scan with the clone parameters that have been configured in the project.  If your project does not have these configured, this example will fail.

By not specifying the engine configuration, the `ScanInvoker` will attempt to use the same engines that were used in the latest scan on the configured branch.

In [None]:
from cxone_api.high.scans import ScanInvoker
from cxone_api.util import json_on_ok

if not (manual_scan_project_id is None or manual_scan_project_id == ""):
  JSON(json_on_ok(await ScanInvoker.scan_by_project_config(client, manual_scan_project_id)))
else:
  MD("**A value for `manual_scan_project_id` not provided, example skipped.**")

## Example: `ScanInvoker` with a clone URL and credentials

Providing a clone URL and any required credentials will let Checkmarx One perform the clone operation.  This assumes that the Checkmarx One instance can reach the SCM server.

In [None]:
from cxone_api.high.scans import ScanInvoker
from cxone_api.util import json_on_ok


if clone_project_id is None or clone_project_id == "":
  MD("`clone_project_id` is not defined, skipping this example.")
elif clone_url is None or clone_url == "":
  MD("`clone_url` is not defined, skipping this example.")
elif clone_branch is None or clone_branch == "":
  MD("`clone_branch` is not defined, skipping this example.")
elif clone_pat is None or clone_pat == "":
  MD("`clone_pat` is not defined, skipping this example.")
else:
  JSON(json_on_ok(await ScanInvoker.scan_by_clone_url(client, 
                                               clone_project_id,
                                               clone_url,
                                               branch=clone_branch,
                                               clone_cred_type = ScanInvoker.CredentialTypeEnum.APIKEY,
                                               clone_cred_value=clone_pat)))
  


# Tutorial 7: Reading Presets with the High Level API

The `PresetReader` class allows for preset details to be utilized by integrations. Obtaining preset data typically needs multiple API calls to obtain the full preset
information; the `PresetReader` class allows the presets to be explored as logically correlated data without needing to make multiple API calls.

## Example: Dump SAST Presets to a CSV with `PresetReader`

This example shows how it is possible to dump preset details for all presets into a CSV.


In [None]:
import csv
from cxone_api.high.presets import PresetReader, PresetEngine

with open("cxone_presets.csv", "wt") as csvfile:
  out = csv.writer(csvfile, quoting=csv.QUOTE_ALL)

  out.writerow(['PresetName','IsCustomPreset','Language','QueryType','QueryCategory','QueryName','QuerySeverity','QueryCWE','QueryIsCustom'])

  reader = PresetReader(client, PresetEngine.SAST)

  for preset_descriptor in await reader.get_presets():
    for family_descriptor in preset_descriptor.QueryFamilies:
      for query_descriptor in family_descriptor.Queries:
        out.writerow([preset_descriptor.Name, preset_descriptor.Custom, family_descriptor.Name, query_descriptor.TypeKey, query_descriptor.CategoryName,
                      query_descriptor.Name,query_descriptor.Severity,query_descriptor.CWE,query_descriptor.Custom])


## Example: Get Custom and/or Standard SAST Presets

Checkmarx One has several standard presets that can be copied and customized.  Customized presets are used to provide a custom set of queries that align with required
risk profiles.  The following example shows how `PresetReader` can be used to easily obtain the list of custom or standard presets.

In [None]:
from cxone_api.high.presets import PresetReader, PresetEngine

reader = PresetReader(client, PresetEngine.SAST)

markdown = "### Custom SAST Presets\n\n"
for descriptor in await reader.get_custom_presets():
  markdown += f"* {descriptor.Name}\n"
MD(markdown)

markdown = "### Standard SAST Presets\n\n"
for descriptor in await reader.get_standard_presets():
  markdown += f"* {descriptor.Name}\n"
MD(markdown)


## Example: Get Custom and/or Standard SAST Queries by Language

SAST presets are composed of query selections for one or more languages.  Other engines have preset concepts that separate queries into general categories, similar to how
SAST separates them by language.  Each group of queries is known as a "family".  Each "family" for SAST presets corresponds to a programming language.

This example shows how `PresetReader` can be used to obtain a list of standard and custom SAST executable queries.

*Note: The executable queries can be selected as part of a preset. Non-executable queries may exist but won't be presented by `PresetReader` as they are not available for preset assignment.*

In [None]:
from cxone_api.high.presets import PresetReader, PresetEngine

reader = PresetReader(client, PresetEngine.SAST)

# Get queries for the Java programming language.
family = "Java"

markdown = f"### Custom SAST Queries for {family}\n\n"
for descriptor in await reader.get_custom_queries_by_family(family):
  markdown += f"* {descriptor.CategoryName}::{descriptor.Severity}::{descriptor.Name}\n"
MD(markdown)

markdown = f"### Standard SAST Queries for {family}\n\n"
for descriptor in await reader.get_standard_queries_by_family(family):
  markdown += f"* {descriptor.CategoryName}::{descriptor.Severity}::{descriptor.Name}\n"
MD(markdown)


# Tutorial 8: Creating a PDF Report

Several different reports can be retrieved using objects in module `cxone_api.high.reports`.  This example shows how to create a PDF
report using `cxone_api.high.reports.PDFReport` and `cxone_api.high.reports.ImprovedScanReport`.

If you have configured the `email_list` variable in this notebook's first cell to contain email addresses, a link to download the PDF report will be emailed to those
recipients.


In [None]:
from cxone_api.high.reports import PDFReport, ReportType, ImprovedScanReport
from cxone_api.low.projects import retrieve_last_scan
from cxone_api.util import json_on_ok

# Select email report type if emails have been provided
report_type = ReportType.CLI if email_list is None else ReportType.EMAIL

MD(f"Report type: *{report_type.value}*")

# Get the last scan in the test project
test_pid = await get_test_projectid(client)
last_scans = json_on_ok(await retrieve_last_scan(client, project_ids=[test_pid], scan_status='Completed'))
if len(last_scans.keys()) > 0:
  scan_id = next(iter(last_scans.values())).get('id', None)

  if scan_id is not None:
    with open("test.pdf", "wb") as pdf_out:
      # The PDFReport class will return a bytearray of the PDF content.
      pdf_out.write(await PDFReport.get_report(ImprovedScanReport(client, scan_id, test_pid, email=email_list, report_type=report_type)))
      pdf_out.flush()
  
    MD("PDF has been written to `test.pdf`")
  else:
    MD(f"No report was created, could not determine the last scan id.")

else:
  MD(f"Please execute at least one scan in project {test_pid}")




# Tutorial 9: Visualization

The data for Checkmarx One scans can be retrieved and used for a variety of purposes.  Creating plots using scan data is often used to recognize trends
and other interesting performance indicating statistics.  The next section shows one of many possible methods that can be used to create plots using the
`cxone-async-api` with Pandas and Plotly.

*Note:* These examples load data into memory by accessing the Checkmarx One APIs.  Loading data from remote services is slow, and holding large amounts of data in memory only works until
physical memory is exhausted.  These examples may not be suitable for production purposes.

## Example: Plotting with Scan Metadata

The following cells will load scan metadata and show some simple plots using Pandas and Plotly.  The code will retrieve the latest scan and create simple plots from the data.


In [None]:
from cxone_api.high.reports import ImprovedScanReport, CSVReport, ReportType
from cxone_api.util import json_on_ok, page_generator
from cxone_api.low.projects import retrieve_last_scan
from cxone_api.low.reports import create_a_report, retrieve_report_status, download_a_report
import pandas as pd
import asyncio, io

# This cell populates the scan dataframe with an async comprehension using page_generator

# If your system has an extreme number of projects, this may fail as written.  If so, refactor it to work.
scan_df = pd.json_normalize([scan async for scan in page_generator(retrieve_last_scan, key_element_name="projectid", client=client, scan_status='Completed')], record_prefix=True)
scan_df.set_index(['projectid', 'id'], inplace=True)


# Expand the engines list into columns for analysis purposes
engine_temp_df = scan_df.engines.map(",".join)

engine_types = set()

for col in engine_temp_df:
  for etype in col.split(","):
    engine_types.add(etype)

engine_calc_df = pd.DataFrame(index=engine_temp_df.index, 
                              data={eng:[0] * engine_temp_df.index.size for eng in engine_types},
                              columns=list(engine_types), dtype=int)

def set_engine(x):
  for index in x.index.values:
    if x.name in scan_df.loc[index].engines:
      engine_calc_df.loc[index, x.name] = 1
    else:
      engine_calc_df.loc[index, x.name] = 0

engine_calc_df.apply(set_engine, axis=0)

# Add the engine columns back into the dataframe.
# Hint: Add a new code cell below this cell and execute the single line:
# scan_df.head(10)
scan_df = scan_df.join(engine_calc_df)



In [None]:

# A simple chart of which engines have been selected for scans.
# Each engine column holds 0/1 per scan, so the column sum is the number of
# scans that selected that engine.  (The result is deliberately not named
# `sum`, which would shadow the builtin for every later cell.)
engine_totals = scan_df[list(engine_types)].sum()

scan_types = pd.DataFrame(data={
  "Count" : engine_totals,
  # Share of all scans that selected the engine, as a percentage.
  "Percent" : (engine_totals / scan_df.shape[0]) * 100.0
}, index=engine_totals.index.values, columns=["Count", "Percent"])
scan_types.reset_index(inplace=True, names="Engine")

barChart(scan_types, "Engine", "Percent", "Scan Engine Selection by Percentage of Selected Scans")



In [None]:

# Count the scans started by each user agent and show the split as a pie chart.
scans_by_user_agent = scan_df.groupby("userAgent", as_index=False).size()
pieChart(scans_by_user_agent, "size", "userAgent", "Scans Started by User Agent")


In [None]:
# Count scans per initiator, keep the 10 largest counts, and chart them.
scans_by_initiator = scan_df.groupby("initiator", as_index=False).size()
top_initiators = scans_by_initiator.nlargest(10, 'size')
pieChart(top_initiators, "size", "initiator", "Top 10 Users Initiating Scans")




## Example: Plotting with Scan Details

The following cells show how data from a combination of CSV reports, retrieved by the `cxone_api.high.reports.CSVReport` class, can be plotted.



In [None]:
from cxone_api.high.reports import ImprovedScanReport, CSVReport

# Define an async method to retrieve a scan report and convert it to a pandas
# dataframe.  The semaphore limits the scan generation so that only 2 run concurrently.
# If there are many projects in your Checkmarx One tenant, this will take some time.
#
# This uses the high-level API CSVReport and ImprovedScanReport to retrieve the scan
# reports for each scan.
throttle = asyncio.Semaphore(2)
async def retrieve_report(project_id, scan_id):
  """Fetch the CSV report for one scan and return it as a dataframe.

  The work happens while holding `throttle`, so at most two calls run this
  section at a time.  The returned frame is indexed by (projectid, id).
  Returns None when the report contains no rows.
  """
  async with throttle:
    csv_text = await CSVReport.get_report(ImprovedScanReport(client, scan_id, project_id))
    report_df = pd.read_csv(io.StringIO(csv_text))
    row_count = report_df.shape[0]

    if row_count == 0:
      return None

    # Tag every row with its scan and project so reports can be concatenated later.
    report_df.insert(0, "id", pd.Series([scan_id] * row_count))
    report_df.insert(0, "projectid", pd.Series([project_id] * row_count))
    return report_df.set_index(['projectid', 'id'])


# A function that will set up the array of coroutines that
# will be used by asyncio.gather to load scan reports.
report_load_array = []
def load_func(scan_row):
  project_id = scan_row.name[0]
  scan_id = scan_row.name[1]
  report_load_array.append(retrieve_report(project_id, scan_id))

# This iterates each scan row and build a list of API call coroutines that
# will be used by asyncio.gather
scan_df.apply(load_func, axis=1)


# Execute the report loading in parallel.
scan_reports_array = await asyncio.gather(*report_load_array)

# Create one dataframe with all the scan reports
scan_reports_df = pd.concat([x for x in scan_reports_array if x is not None])


In [None]:
# Mask out everything except critical/high findings, count findings per
# (Query, Result State) pair, and keep the 10 largest counts.
crit_high_mask = scan_reports_df['Result Severity'].isin(["CRITICAL","HIGH"])
top_triage_counts = (scan_reports_df.where(crit_high_mask)
                     .groupby(["Query", "Result State"], as_index=False)
                     .size()
                     .nlargest(10, "size"))

px.bar(top_triage_counts,
      x="size", y="Query", orientation='h', color="Result State", barmode="group",
      title="Triage State of Top 10 Most Frequently Reported Critical and High Vulnerabilities")



In [None]:
# Count findings per result status and show the distribution as a pie chart.
result_status_counts = scan_reports_df.groupby("Result Status", as_index=False).size()
pieChart(result_status_counts, "size", "Result Status", "Distribution of Result Status")


In [None]:
# Keep only the critical/high findings.  Rows are dropped with a boolean filter
# rather than masked with DataFrame.where(): `projectid` is an index level, so
# masked rows would still be counted by groupby('projectid').size() and the
# projects would be ranked by their total number of findings instead.
crit_high_df = scan_reports_df[scan_reports_df['Result Severity'].isin(['CRITICAL','HIGH'])]

# The 10 projects with the most critical/high findings.
crit_high_counts = crit_high_df.groupby('projectid', as_index=False).size().nlargest(10, 'size')

# Restrict the findings to those 10 projects, then count per project and severity.
in_top_projects = crit_high_df.index.get_level_values('projectid').isin(crit_high_counts['projectid'])

px.bar(crit_high_df[in_top_projects]
       .groupby(['projectid', "Result Severity"], as_index=False).size(),
      x="size", y="projectid", orientation='h', color="Result Severity", barmode="group",
      title="Top 10 Projects with Largest Numbers of Critical and High Vulnerabilities")


# Tutorial 10: SCA Analysis Queries

Using SCA's GraphQL API, it is possible to retrieve data for Packages, Licenses, and Risks found across all tenant scans.  This data can then be used for analysis purposes.  This tutorial will explain how to use the high-level objects that assist in data retrieval.

## Using `cxone_api.high.sca.ScaTenantPackages`

### Example 1: Iterate all package data

In [None]:
from cxone_api.high.sca import ScaTenantPackages

# Create an instance of the ScaTenantPackages object and set the optional page size
# so that the number of records retrieved per API call is limited.
tenant_pkgs = ScaTenantPackages(client, page_size=2)

# This is to demonstrate that the next page of data is automatically
# retrieved from the API when a new page of data is needed.
max_iterations = 6

async for json_row in tenant_pkgs:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1


### Example 2: Specify a sorted order for the retrieved data

This example shows how to send instructions to return data with a sorting order specification.  Specifying a sort order is optional.

*Note*: `ScaTenantPackages` is currently the only API that supports specifying a sort order for the returned data.


In [None]:
from cxone_api.high.sca import ScaTenantPackages

# This is the same as the previous example except for the additon of the sorting order.
tenant_pkgs = ScaTenantPackages(client, page_size=2)

# Sorting order is set by specifying the json key names in the appropriate
# method call.
tenant_pkgs.order.add_ascending("packageRepository")
tenant_pkgs.order.add_descending("scanDate")


max_iterations = 6

async for json_row in tenant_pkgs:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1

### Example 3: Filter the returned data with a `where` clause

A `where` filter can be utilized to reduce the amount of data retrieved from the server.  Specifying a `where` filter is optional.

The `where` filter is defined as a JSON boolean tree that is used by GraphQL to form the filtering criteria.

The `where` filter supports boolean `and` and `or` constructs defined as a tree in a Python dictionary.  Each of these clauses supports a list of field filter operations in this form:

```
{"fieldname" : {"operator" : "value"} }
```

Operators can be:

* `eq`
* `neq`
* `gt`
* `gte`
* `lt`
* `lte`

The `where` operation may not work as expected on all fields.  Experimentation may be necessary to obtain the desired filter results.  If all else fails, local filtering is always a possibility.

In [None]:
from cxone_api.high.sca import ScaTenantPackages

# This is the same as the previous example except for the additon of the where filter.
tenant_pkgs = ScaTenantPackages(client, page_size=2)

# This example filters the results to show direct dependencies that are not dev/test
where_filter = {
  "and" : [
    {"relation" : {"eq" : "Direct"} },
    {"isDevDependency" : {"eq" : False} },
    {"isTest" : {"eq" : False} }
  ]
}

tenant_pkgs.where = where_filter

max_iterations = 6

async for json_row in tenant_pkgs:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1

## Using `cxone_api.high.sca.ScaTenantLicenses`


### Example 1: Iterating all license data

In [None]:
from cxone_api.high.sca import ScaTenantLicenses

# Create an instance of the ScaTenantLicenses object and set the optional page size
# so that the number of records retrieved per API call is limited.
tenant_licenses = ScaTenantLicenses(client, page_size=2)

# This is to demonstrate that the next page of data is automatically
# retrieved from the API when a new page of data is needed.
max_iterations = 6

async for json_row in tenant_licenses:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1

### Example 2: Filter the returned data with a `where` clause

In [None]:
from cxone_api.high.sca import ScaTenantLicenses

# This is the same as the previous example except for the additon of the where filter.
tenant_licenses = ScaTenantLicenses(client, page_size=5)

# This example filters licenses with a risk score >= 5 and in the families of MIT or GNU
where_filter = {
  "and" : [
    {"riskScore" : {"gte" : 5} },
    { "or" : [
      {"licenseFamily" : {"eq" : "MIT" } },
      {"licenseFamily" : {"eq" : "GNU" } }
    ]}
  ]
}

tenant_licenses.where = where_filter

max_iterations = 6

async for json_row in tenant_licenses:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1

## Using `cxone_api.high.sca.ScaTenantRisks`

### Example 1: Iterating all risk data

In [None]:
from cxone_api.high.sca import ScaTenantRisks

# Create an instance of the ScaTenantRisks object and set the optional page size
# so that the number of records retrieved per API call is limited.
tenant_risks = ScaTenantRisks(client, page_size=2)

# This is to demonstrate that the next page of data is automatically
# retrieved from the API when a new page of data is needed.
max_iterations = 6

async for json_row in tenant_risks:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1

### Example 2: Filter the returned data with a `where` clause

In [None]:
from cxone_api.high.sca import ScaTenantRisks

# Create an instance of the ScaTenantRisks object and set the optional page size
# so that the number of records retrieved per API call is limited.
tenant_risks = ScaTenantRisks(client, page_size=2)

# This query identifies untriaged results that represent high or critical vulnerabilities
where_filter = {
  "and" : [
    {"state" : {"eq" : "ToVerify"} },
    {"riskType" : {"eq" : "Vulnerability"}},
    { "or" : [
      {"severity" : {"eq" : "High" } },
      {"severity" : {"eq" : "Critical" } }
    ]}
  ]
}

tenant_risks.where = where_filter

# This is to demonstrate that the next page of data is automatically
# retrieved from the API when a new page of data is needed.
max_iterations = 6

async for json_row in tenant_risks:
  if max_iterations == 0:
    break
  JSON(json_row)
  max_iterations -= 1

## Analysis with SCA Data

The SCA analysis data can be utilized for analysis and exploration purposes using any method to read the JSON returned by the API.  Pandas
is one example of an analysis tool that works with the data.  The next code cells show how to load the data into Pandas and perform some simple
analysis work.

In [None]:
import pandas as pd
from io import StringIO
import json

# Load the JSON data into a dataframe.  This example shows a way to iterate
# all JSON records to create a DataFrame.

# Iterate through all records converting each JSON dictionary into a string.
rdr = StringIO("\n".join([json.dumps(x) async for x in ScaTenantRisks(client)]))
sca_df = pd.read_json(rdr, lines=True)


In [None]:
# Show the first 5 rows of the SCA risk dataframe as a quick check that the
# data loaded and to see which columns are available for analysis.
sca_df.head(5)


In [None]:
# Count the SCA risks in each severity category and show them as a pie chart.
severity_counts = sca_df.groupby("severity", as_index=False).size()
pieChart(severity_counts, "size", "severity", "SCA Risk Severities")