diff --git a/notebooks/Check Asset Compliance using CURRENT OUTSTANDING Policy.ipynb b/notebooks/Check Asset Compliance using CURRENT OUTSTANDING Policy.ipynb new file mode 100644 index 00000000..d1f4d7ce --- /dev/null +++ b/notebooks/Check Asset Compliance using CURRENT OUTSTANDING Policy.ipynb @@ -0,0 +1,335 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "c85b9e96", + "metadata": {}, + "outputs": [], + "source": [ + "# Define a compliance policy that alerts when an asset spends too long in a bad state.\n", + "\n", + "# Main function establishes a connection to RKVST using an App Registration then uses that\n", + "# to create an access policy, test it in good and bad states, then cleans up." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "001b25e3", + "metadata": {}, + "outputs": [], + "source": [ + "from json import dumps as json_dumps\n", + "from os import getenv\n", + "from time import sleep\n", + "from uuid import uuid4\n", + "from warnings import filterwarnings\n", + "\n", + "from archivist.archivist import Archivist\n", + "from archivist.compliance_policy_requests import CompliancePolicyCurrentOutstanding\n", + "from archivist.constants import ASSET_BEHAVIOURS\n", + "from archivist.proof_mechanism import ProofMechanism\n", + "from archivist.logger import set_logger\n", + "\n", + "filterwarnings(\"ignore\", message=\"Unverified HTTPS request\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a877ffed", + "metadata": {}, + "outputs": [], + "source": [ + "# Connection parameters. Fill in with client ID and secret from an appropriately\n", + "# permissioned Application Registration\n", + "RKVST_URL=\"https://app.rkvst.io\"\n", + "APPREG_CLIENT=\"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\"\n", + "APPREG_SECRET=\"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "acdf240c", + "metadata": {}, + "outputs": [], + "source": [ + "def create_compliance_policy(arch):\n", + " \"\"\"Compliance policy which notices when process steps are\n", + " not executed - eg 'you must close the door after you open it'\n", + " or 'candidate software build must be approved before release'\n", + "\n", + " This example creates a policy that requires doors to be closed\n", + " after they are opened.\n", + " \"\"\"\n", + " compliance_policy = arch.compliance_policies.create(\n", + " CompliancePolicyCurrentOutstanding(\n", + " description=\"Vault doors should be closed according to site security policy section Phys.Integ.02\",\n", + " display_name=\"Phys.Integ.02\",\n", + " asset_filter=[\n", + " [\"attributes.arc_display_type=Vault Door\"],\n", + " ],\n", + " event_display_type=\"Open\",\n", + " closing_event_display_type=\"Close\",\n", + " )\n", + " )\n", + " print(\"CURRENT_OUTSTANDING_POLICY:\", json_dumps(compliance_policy, indent=4))\n", + " return compliance_policy" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8889c68d", + "metadata": {}, + "outputs": [], + "source": [ + "def create_door(arch):\n", + " \"\"\"\n", + " Creates an Asset record to track a particular door.\n", + " \"\"\"\n", + "\n", + " door, _ = arch.assets.create_if_not_exists(\n", + " {\n", + " \"selector\": [\n", + " {\n", + " \"attributes\": [\n", + " \"arc_display_name\",\n", + " \"arc_display_type\",\n", + " ]\n", + " },\n", + " ],\n", + " \"behaviours\": ASSET_BEHAVIOURS,\n", + " \"proof_mechanism\": ProofMechanism.SIMPLE_HASH.name,\n", + " \"attributes\": {\n", + " \"arc_display_name\": \"Gringott's Vault 2\",\n", + " \"arc_description\": \"Main door to the second level security vault in Gringott's Wizarding Bank\",\n", + " \"arc_display_type\": \"Vault Door\",\n", + " },\n", + " },\n", + " confirm=True,\n", + " )\n", + " print(\"DOOR:\", json_dumps(door, indent=4))\n", + " return door" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ba24d143", + "metadata": {}, + "outputs": [], + "source": [ + "def open_door(arch, door, tag):\n", + " \"\"\"\n", + " Open the vault door\n", + " \"\"\"\n", + " door_opened = arch.events.create(\n", + " door[\"identity\"],\n", + " {\n", + " \"operation\": \"Record\",\n", + " \"behaviour\": \"RecordEvidence\",\n", + " },\n", + " {\n", + " \"arc_description\": \"Open the door for Lucius Malfoy\",\n", + " \"arc_display_type\": \"Open\",\n", + " \"arc_correlation_value\": f\"{tag}\"\n", + " },\n", + " confirm=True,\n", + " )\n", + " print(\"DOOR_OPENED:\", json_dumps(door_opened, indent=4))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bde8fc72", + "metadata": {}, + "outputs": [], + "source": [ + "def close_door(arch, door, tag):\n", + " \"\"\"\n", + " Close the vault door\n", + " \"\"\"\n", + " door_closed = arch.events.create(\n", + " door[\"identity\"],\n", + " {\n", + " \"operation\": \"Record\",\n", + " \"behaviour\": \"RecordEvidence\",\n", + " },\n", + " {\n", + " \"arc_description\": \"Closed the door after Lucius Malfoy exited the vault\",\n", + " \"arc_display_type\": \"Close\",\n", + " \"arc_correlation_value\": f\"{tag}\"\n", + " },\n", + " confirm=True,\n", + " )\n", + " print(\"DOOR_CLOSED:\", json_dumps(door_closed, indent=4))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ec1909a6", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"Main function of compliance sample test.\n", + "\n", + " * Connect to RKVST with client ID and secret credential\n", + " * Create minimal test objects to demonstrate the feature\n", + " * Test interactions\n", + " * Clean up\n", + "\"\"\"\n", + "# optional call to set the logger level for all subsystems. The argument can\n", + "# be either \"INFO\" or \"DEBUG\". For more sophisticated logging control see the\n", + "# documentation.\n", + "set_logger(\"INFO\")\n", + "\n", + "# Initialize connection to RKVST\n", + "print(\"Connecting to RKVST\")\n", + "arch = Archivist(\n", + " RKVST_URL,\n", + " (APPREG_CLIENT, APPREG_SECRET)\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b7482420", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# Compliance policies with related events (eg open/close, order/ship/deliver\n", + "# type situations) require events to be linked through a correlation value.\n", + "# In many cases this will be obvious (a CVE tag for vulnerability management,\n", + "# or a works ticket number for maintenance, or even a timestamp) but here\n", + "# we'll just make a UUID to make sure it's unique and this test is repeatable\n", + "tag = uuid4()\n", + "print(f\"Tag for this run: {tag}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bb5b0651", + "metadata": {}, + "outputs": [], + "source": [ + "# make a compliance policy that alerts when doors are left open\n", + "compliance_policy = create_compliance_policy(arch)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f8ac6dda", + "metadata": {}, + "outputs": [], + "source": [ + "# create an asset that matches the assets_filter field in the\n", + "# compliance policy.\n", + "gringotts_vault = create_door(arch)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d93be01f", + "metadata": {}, + "outputs": [], + "source": [ + "# Open the door\n", + "open_door(arch, gringotts_vault, tag)\n", + "\n", + "# Check compliance: should fail because the door is open\n", + "sleep(5)\n", + "compliance_nok = arch.compliance.compliant_at(\n", + " gringotts_vault[\"identity\"],\n", + ")\n", + "print(\"COMPLIANCE (should be false):\", json_dumps(compliance_nok, indent=4))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6e304daa", + "metadata": {}, + "outputs": [], + "source": [ + "# Now close the door\n", + "close_door(arch, gringotts_vault, tag)\n", + "\n", + "# Check compliance - should be OK because the door is now closed\n", + "sleep(5)\n", + "compliance_ok = arch.compliance.compliant_at(\n", + " gringotts_vault[\"identity\"],\n", + ")\n", + "print(\"COMPLIANCE (should be true):\", json_dumps(compliance_ok, indent=4))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2edac120", + "metadata": {}, + "outputs": [], + "source": [ + "# However the fact that it is OK *now* is a bit of a red herring. It\n", + "# was non-compliant in the past and this may be an issue that needs to\n", + "# be verified during an investigation, insurance claim, or other dispute.\n", + "# We can check the audit history for compliance *at a point in time* and\n", + "# get a verifiable answer to the state of that asset at that time.\n", + "\n", + "# To make sure the example works with such short time frames we grab the\n", + "# time from the previous not OK compliance call, but you can choose any\n", + "# arbitrary time in a real forensic process\n", + "time_of_suspicion = compliance_nok[\"compliant_at\"]\n", + "compliance_nok = arch.compliance.compliant_at(\n", + " gringotts_vault[\"identity\"],\n", + " compliant_at=time_of_suspicion\n", + ")\n", + "print(\"HISTORICAL COMPLIANCE (should be false):\", json_dumps(compliance_nok, indent=4))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40ffc716", + "metadata": {}, + "outputs": [], + "source": [ + "# finally clean up by deleting the compliance_policy\n", + "_ = arch.compliance_policies.delete(\n", + " compliance_policy[\"identity\"],\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}