Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "c85b9e96",
"metadata": {},
"outputs": [],
"source": [
"# Define a compliance policy that alerts when an asset spends too long in a bad state.\n",
"\n",
"# Main function establishes a connection to RKVST using an App Registration then uses that\n",
"# to create an access policy, test it in good and bad states, then cleans up."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "001b25e3",
"metadata": {},
"outputs": [],
"source": [
"from json import dumps as json_dumps\n",
"from os import getenv\n",
"from time import sleep\n",
"from uuid import uuid4\n",
"from warnings import filterwarnings\n",
"\n",
"from archivist.archivist import Archivist\n",
"from archivist.compliance_policy_requests import CompliancePolicyCurrentOutstanding\n",
"from archivist.constants import ASSET_BEHAVIOURS\n",
"from archivist.proof_mechanism import ProofMechanism\n",
"from archivist.logger import set_logger\n",
"\n",
"filterwarnings(\"ignore\", message=\"Unverified HTTPS request\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a877ffed",
"metadata": {},
"outputs": [],
"source": [
"# Connection parameters. Fill in with client ID and secret from an appropriately\n",
"# permissioned Application Registration\n",
"RKVST_URL=\"https://app.rkvst.io\"\n",
"APPREG_CLIENT=\"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\"\n",
"APPREG_SECRET=\"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "acdf240c",
"metadata": {},
"outputs": [],
"source": [
"def create_compliance_policy(arch):\n",
" \"\"\"Compliance policy which notices when process steps are\n",
" not executed - eg 'you must close the door after you open it'\n",
" or 'candidate software build must be approved before release'\n",
"\n",
" This example creates a policy that requires doors to be closed\n",
" after they are opened.\n",
" \"\"\"\n",
" compliance_policy = arch.compliance_policies.create(\n",
" CompliancePolicyCurrentOutstanding(\n",
" description=\"Vault doors should be closed according to site security policy section Phys.Integ.02\",\n",
" display_name=\"Phys.Integ.02\",\n",
" asset_filter=[\n",
" [\"attributes.arc_display_type=Vault Door\"],\n",
" ],\n",
" event_display_type=\"Open\",\n",
" closing_event_display_type=\"Close\",\n",
" )\n",
" )\n",
" print(\"CURRENT_OUTSTANDING_POLICY:\", json_dumps(compliance_policy, indent=4))\n",
" return compliance_policy"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8889c68d",
"metadata": {},
"outputs": [],
"source": [
"def create_door(arch):\n",
" \"\"\"\n",
" Creates an Asset record to track a particular door.\n",
" \"\"\"\n",
"\n",
" door, _ = arch.assets.create_if_not_exists(\n",
" {\n",
" \"selector\": [\n",
" {\n",
" \"attributes\": [\n",
" \"arc_display_name\",\n",
" \"arc_display_type\",\n",
" ]\n",
" },\n",
" ],\n",
" \"behaviours\": ASSET_BEHAVIOURS,\n",
" \"proof_mechanism\": ProofMechanism.SIMPLE_HASH.name,\n",
" \"attributes\": {\n",
" \"arc_display_name\": \"Gringott's Vault 2\",\n",
" \"arc_description\": \"Main door to the second level security vault in Gringott's Wizarding Bank\",\n",
" \"arc_display_type\": \"Vault Door\",\n",
" },\n",
" },\n",
" confirm=True,\n",
" )\n",
" print(\"DOOR:\", json_dumps(door, indent=4))\n",
" return door"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ba24d143",
"metadata": {},
"outputs": [],
"source": [
"def open_door(arch, door, tag):\n",
" \"\"\"\n",
" Open the vault door\n",
" \"\"\"\n",
" door_opened = arch.events.create(\n",
" door[\"identity\"],\n",
" {\n",
" \"operation\": \"Record\",\n",
" \"behaviour\": \"RecordEvidence\",\n",
" },\n",
" {\n",
" \"arc_description\": \"Open the door for Lucius Malfoy\",\n",
" \"arc_display_type\": \"Open\",\n",
" \"arc_correlation_value\": f\"{tag}\"\n",
" },\n",
" confirm=True,\n",
" )\n",
" print(\"DOOR_OPENED:\", json_dumps(door_opened, indent=4))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bde8fc72",
"metadata": {},
"outputs": [],
"source": [
"def close_door(arch, door, tag):\n",
" \"\"\"\n",
" Close the vault door\n",
" \"\"\"\n",
" door_closed = arch.events.create(\n",
" door[\"identity\"],\n",
" {\n",
" \"operation\": \"Record\",\n",
" \"behaviour\": \"RecordEvidence\",\n",
" },\n",
" {\n",
" \"arc_description\": \"Closed the door after Lucius Malfoy exited the vault\",\n",
" \"arc_display_type\": \"Close\",\n",
" \"arc_correlation_value\": f\"{tag}\"\n",
" },\n",
" confirm=True,\n",
" )\n",
" print(\"DOOR_CLOSED:\", json_dumps(door_closed, indent=4))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ec1909a6",
"metadata": {},
"outputs": [],
"source": [
"\"\"\"Main function of compliance sample test.\n",
"\n",
" * Connect to RKVST with client ID and secret credential\n",
" * Create minimal test objects to demonstrate the feature\n",
" * Test interactions\n",
" * Clean up\n",
"\"\"\"\n",
"# optional call to set the logger level for all subsystems. The argument can\n",
"# be either \"INFO\" or \"DEBUG\". For more sophisticated logging control see the\n",
"# documentation.\n",
"set_logger(\"INFO\")\n",
"\n",
"# Initialize connection to RKVST\n",
"print(\"Connecting to RKVST\")\n",
"arch = Archivist(\n",
" RKVST_URL,\n",
" (APPREG_CLIENT, APPREG_SECRET)\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b7482420",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# Compliance policies with related events (eg open/close, order/ship/deliver\n",
"# type situations) require events to be linked through a correlation value.\n",
"# In many cases this will be obvious (a CVE tag for vulnerability management,\n",
"# or a works ticket number for maintenance, or even a timestamp) but here\n",
"# we'll just make a UUID to make sure it's unique and this test is repeatable\n",
"tag = uuid4()\n",
"print(f\"Tag for this run: {tag}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bb5b0651",
"metadata": {},
"outputs": [],
"source": [
"# make a compliance policy that alerts when doors are left open\n",
"compliance_policy = create_compliance_policy(arch)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8ac6dda",
"metadata": {},
"outputs": [],
"source": [
"# create an asset that matches the assets_filter field in the\n",
"# compliance policy.\n",
"gringotts_vault = create_door(arch)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d93be01f",
"metadata": {},
"outputs": [],
"source": [
"# Open the door\n",
"open_door(arch, gringotts_vault, tag)\n",
"\n",
"# Check compliance: should fail because the door is open\n",
"sleep(5)\n",
"compliance_nok = arch.compliance.compliant_at(\n",
" gringotts_vault[\"identity\"],\n",
")\n",
"print(\"COMPLIANCE (should be false):\", json_dumps(compliance_nok, indent=4))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6e304daa",
"metadata": {},
"outputs": [],
"source": [
"# Now close the door\n",
"close_door(arch, gringotts_vault, tag)\n",
"\n",
"# Check compliance - should be OK because the door is now closed\n",
"sleep(5)\n",
"compliance_ok = arch.compliance.compliant_at(\n",
" gringotts_vault[\"identity\"],\n",
")\n",
"print(\"COMPLIANCE (should be true):\", json_dumps(compliance_ok, indent=4))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2edac120",
"metadata": {},
"outputs": [],
"source": [
"# However the fact that it is OK *now* is a bit of a red herring. It\n",
"# was non-compliant in the past and this may be an issue that needs to\n",
"# be verified during an investigation, insurance claim, or other dispute.\n",
"# We can check the audit history for compliance *at a point in time* and\n",
"# get a verifiable answer to the state of that asset at that time.\n",
"\n",
"# To make sure the example works with such short time frames we grab the\n",
"# time from the previous not OK compliance call, but you can choose any\n",
"# arbitrary time in a real forensic process\n",
"time_of_suspicion = compliance_nok[\"compliant_at\"]\n",
"compliance_nok = arch.compliance.compliant_at(\n",
" gringotts_vault[\"identity\"],\n",
" compliant_at=time_of_suspicion\n",
")\n",
"print(\"HISTORICAL COMPLIANCE (should be false):\", json_dumps(compliance_nok, indent=4))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40ffc716",
"metadata": {},
"outputs": [],
"source": [
"# finally clean up by deleting the compliance_policy\n",
"_ = arch.compliance_policies.delete(\n",
" compliance_policy[\"identity\"],\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}