Create Asset Compliance: CURRENT_OUTSTANDING policy
-------------------------------------------------------------------------------------

In [1]:
# Define a compliance policy that alerts when an asset spends too long in a bad state.

# Main function establishes a connection to RKVST using an App Registration then uses that
# to create an access policy, test it in good and bad states, then cleans up.

In [2]:
from json import dumps as json_dumps
from os import getenv
from time import sleep
from uuid import uuid4
from warnings import filterwarnings

from archivist.archivist import Archivist
from archivist.compliance_policy_requests import CompliancePolicyCurrentOutstanding
from archivist.constants import ASSET_BEHAVIOURS
from archivist.proof_mechanism import ProofMechanism
from archivist.logger import set_logger

filterwarnings("ignore", message="Unverified HTTPS request")

In [3]:
# URL, CLIENT, SECRET are environment variables that represent connection parameters.
#
# URL = represents the url to the RKVST application
# CLIENT = represents the client ID from an Application Registration
# SECRET = represents the client secret from an Application Registration
RKVST_URL = getenv("RKVST_URL")
APPREG_CLIENT = getenv("RKVST_APPREG_CLIENT")
APPREG_SECRET = getenv("RKVST_APPREG_SECRET")

In [4]:
"""
Main function of Asset and Event creation.

* Connect to RKVST with client ID and client secret
* Creates an Asset and two Events
* Prints response of Asset and Event creation
"""

# Optional call to set the logger level.  The argument can be either
# "INFO" or "DEBUG".  For more sophisticated logging control see our
# documentation.
set_logger("INFO")

# Initialize connection to RKVST
print("Connecting to RKVST")
print("URL", RKVST_URL)
arch = Archivist(RKVST_URL, (APPREG_CLIENT, APPREG_SECRET), max_time=300)

Connecting to RKVST
URL https://app.dev-paul-0.wild.jitsuin.io


In [5]:
def create_compliance_policy(arch):
    """Compliance policy which notices when process steps are
    not executed - eg 'you must close the door after you open it'
    or 'candidate software build must be approved before release'

    This example creates a policy that requires doors to be closed
    after they are opened.
    """
    compliance_policy = arch.compliance_policies.create(
        CompliancePolicyCurrentOutstanding(
            description="Vault doors should be closed according to site security policy section Phys.Integ.02",
            display_name="Phys.Integ.02",
            asset_filter=[
                ["attributes.arc_display_type=Vault Door"],
            ],
            event_display_type="Open",
            closing_event_display_type="Close",
        )
    )
    print("CURRENT_OUTSTANDING_POLICY:", json_dumps(compliance_policy, indent=4))
    return compliance_policy

In [6]:
def create_door(arch):
    """
    Creates an Asset record to track a particular door.
    """

    door, _ = arch.assets.create_if_not_exists(
        {
            "selector": [
                {
                    "attributes": [
                        "arc_display_name",
                        "arc_display_type",
                    ]
                },
            ],
            "behaviours": ASSET_BEHAVIOURS,
            "proof_mechanism": ProofMechanism.SIMPLE_HASH.name,
            "attributes": {
                "arc_display_name": "Gringott's Vault 2",
                "arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank",
                "arc_display_type": "Vault Door",
            },
        },
        confirm=True,
    )
    print("DOOR:", json_dumps(door, indent=4))
    return door

In [7]:
def open_door(arch, door, tag):
    """
    Open the vault door
    """
    door_opened = arch.events.create(
        door["identity"],
        {
            "operation": "Record",
            "behaviour": "RecordEvidence",
        },
        {
            "arc_description": "Open the door for Lucius Malfoy",
            "arc_display_type": "Open",
            "arc_correlation_value": f"{tag}",
        },
        confirm=True,
    )
    print("DOOR_OPENED:", json_dumps(door_opened, indent=4))

In [8]:
def close_door(arch, door, tag):
    """
    Close the vault door
    """
    door_closed = arch.events.create(
        door["identity"],
        {
            "operation": "Record",
            "behaviour": "RecordEvidence",
        },
        {
            "arc_description": "Closed the door after Lucius Malfoy exited the vault",
            "arc_display_type": "Close",
            "arc_correlation_value": f"{tag}",
        },
        confirm=True,
    )
    print("DOOR_CLOSED:", json_dumps(door_closed, indent=4))

In [9]:
# Compliance policies with related events (eg open/close, order/ship/deliver
# type situations) require events to be linked through a correlation value.
# In many cases this will be obvious (a CVE tag for vulnerability management,
# or a works ticket number for maintenance, or even a timestamp) but here
# we'll just make a UUID to make sure it's unique and this test is repeatable
tag = uuid4()
print(f"Tag for this run: {tag}")

Tag for this run: 81c94ed1-e310-4743-8e39-9ee0a9170bdf


In [10]:
# make a compliance policy that alerts when doors are left open
compliance_policy = create_compliance_policy(arch)
print("compliance_policy", json_dumps(compliance_policy, indent=4))

Refresh token


CURRENT_OUTSTANDING_POLICY: {
    "identity": "compliance_policies/159b05e9-5d9d-4642-b0ed-db6c1274a979",
    "compliance_type": "COMPLIANCE_CURRENT_OUTSTANDING",
    "description": "Vault doors should be closed according to site security policy section Phys.Integ.02",
    "display_name": "Phys.Integ.02",
    "asset_filter": [
        {
            "or": [
                "attributes.arc_display_type=Vault Door"
            ]
        }
    ],
    "event_display_type": "Open",
    "closing_event_display_type": "Close",
    "time_period_seconds": "0",
    "dynamic_window": "0",
    "dynamic_variability": 0,
    "richness_assertions": []
}
compliance_policy {
    "identity": "compliance_policies/159b05e9-5d9d-4642-b0ed-db6c1274a979",
    "compliance_type": "COMPLIANCE_CURRENT_OUTSTANDING",
    "description": "Vault doors should be closed according to site security policy section Phys.Integ.02",
    "display_name": "Phys.Integ.02",
    "asset_filter": [
        {
            "or": [
      

In [11]:
# create an asset that matches the assets_filter field in the
# compliance policy.
gringotts_vault = create_door(arch)
print("gringotts_vault", json_dumps(gringotts_vault, indent=4))

asset with selector {},{'arc_display_name': "Gringott's Vault 2", 'arc_display_type': 'Vault Door'} already exists


DOOR: {
    "identity": "assets/6ef99260-65ae-48b7-9cda-e1c60f3ce79d",
    "behaviours": [
        "Builtin",
        "AssetCreator",
        "Attachments",
        "RecordEvidence"
    ],
    "attributes": {
        "arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank",
        "arc_display_name": "Gringott's Vault 2",
        "arc_display_type": "Vault Door"
    },
    "confirmation_status": "CONFIRMED",
    "tracked": "TRACKED",
    "owner": "0x3E06AcBf002E8F67bDe832b3f3E7d2aDA2BE3DC8",
    "at_time": "2023-01-10T16:19:32Z",
    "storage_integrity": "TENANT_STORAGE",
    "proof_mechanism": "SIMPLE_HASH",
    "chain_id": "99",
    "public": false,
    "tenant_identity": "tenant/f9040e8b-2afa-48f1-bb00-f055d55ade01"
}
gringotts_vault {
    "identity": "assets/6ef99260-65ae-48b7-9cda-e1c60f3ce79d",
    "behaviours": [
        "Builtin",
        "AssetCreator",
        "Attachments",
        "RecordEvidence"
    ],
    "attributes": {
        "ar

In [12]:
# Open the door
open_door(arch, gringotts_vault, tag)

# Check compliance: should fail because the door is open
sleep(5)
compliance_nok = arch.compliance.compliant_at(
    gringotts_vault["identity"],
)
print("COMPLIANCE (should be false):", json_dumps(compliance_nok, indent=4))

DOOR_OPENED: {
    "identity": "assets/6ef99260-65ae-48b7-9cda-e1c60f3ce79d/events/2c5dd3b7-67a6-4f4a-bb68-59b9eb2a70cb",
    "asset_identity": "assets/6ef99260-65ae-48b7-9cda-e1c60f3ce79d",
    "event_attributes": {
        "arc_display_type": "Open",
        "arc_correlation_value": "81c94ed1-e310-4743-8e39-9ee0a9170bdf",
        "arc_description": "Open the door for Lucius Malfoy"
    },
    "asset_attributes": {},
    "operation": "Record",
    "behaviour": "RecordEvidence",
    "timestamp_declared": "2023-01-10T16:19:36Z",
    "timestamp_accepted": "2023-01-10T16:19:36Z",
    "timestamp_committed": "2023-01-10T16:19:36.508165839Z",
    "principal_declared": {
        "issuer": "https://app.dev-paul-0.wild.jitsuin.io/appidpv1",
        "subject": "bd3f85d2-9b92-4ef0-82c9-2bcb7b5d25c6",
        "display_name": "Application display name 05f19055-d2fd-4076-bd02-46a924cbe984",
        "email": ""
    },
    "principal_accepted": {
        "issuer": "https://app.dev-paul-0.wild.jitsuin.

In [13]:
# Now close the door
close_door(arch, gringotts_vault, tag)

# Check compliance - should be OK because the door is now closed
sleep(5)
compliance_ok = arch.compliance.compliant_at(
    gringotts_vault["identity"],
)
print("COMPLIANCE (should be true):", json_dumps(compliance_ok, indent=4))

DOOR_CLOSED: {
    "identity": "assets/6ef99260-65ae-48b7-9cda-e1c60f3ce79d/events/1d9cc38b-3c1d-4cb6-aa7f-42d463fe99d1",
    "asset_identity": "assets/6ef99260-65ae-48b7-9cda-e1c60f3ce79d",
    "event_attributes": {
        "arc_correlation_value": "81c94ed1-e310-4743-8e39-9ee0a9170bdf",
        "arc_description": "Closed the door after Lucius Malfoy exited the vault",
        "arc_display_type": "Close"
    },
    "asset_attributes": {},
    "operation": "Record",
    "behaviour": "RecordEvidence",
    "timestamp_declared": "2023-01-10T16:19:47Z",
    "timestamp_accepted": "2023-01-10T16:19:47Z",
    "timestamp_committed": "2023-01-10T16:19:47.541462429Z",
    "principal_declared": {
        "issuer": "https://app.dev-paul-0.wild.jitsuin.io/appidpv1",
        "subject": "bd3f85d2-9b92-4ef0-82c9-2bcb7b5d25c6",
        "display_name": "Application display name 05f19055-d2fd-4076-bd02-46a924cbe984",
        "email": ""
    },
    "principal_accepted": {
        "issuer": "https://app.de

In [14]:
# However the fact that it is OK *now* is a bit of a red herring. It
# was non-compliant in the past and this may be an issue that needs to
# be verified during an investigation, insurance claim, or other dispute.
# We can check the audit history for compliance *at a point in time* and
# get a verifiable answer to the state of that asset at that time.

# To make sure the example works with such short time frames we grab the
# time from the previous not OK compliance call, but you can choose any
# arbitrary time in a real forensic process
time_of_suspicion = compliance_nok["compliant_at"]
compliance_nok = arch.compliance.compliant_at(
    gringotts_vault["identity"], compliant_at=time_of_suspicion
)
print("HISTORICAL COMPLIANCE (should be false):", json_dumps(compliance_nok, indent=4))

HISTORICAL COMPLIANCE (should be false): {
    "compliant": false,
    "compliance": [
        {
            "compliance_policy_identity": "compliance_policies/159b05e9-5d9d-4642-b0ed-db6c1274a979",
            "compliant": false,
            "reason": "No closing event for Open"
        }
    ],
    "next_page_token": "",
    "compliant_at": "2023-01-10T16:19:42Z"
}


In [15]:
# finally clean up by deleting the compliance_policy
_ = arch.compliance_policies.delete(
    compliance_policy["identity"],
)