diff --git a/README.rst b/README.rst index 293b8e7b..f55076b4 100644 --- a/README.rst +++ b/README.rst @@ -135,14 +135,14 @@ You can then use the examples code to create assets (see examples directory): # of an asset or event creation - the default is 1200 seconds but one can optionally # specify a different value here particularly when creating assets on SIMPLE_HASH # (rather than KHIPU) as confirmation times are much shorter in this case. - arch = Archivist( + with arch = Archivist( "https://app.rkvst.io", (client_id, client_secret), max_time=300, - ) - # Create a new asset - asset = create_asset(arch) - print("Asset", asset) + ) as arch: + # Create a new asset + asset = create_asset(arch) + print("Asset", asset) if __name__ == "__main__": diff --git a/archivist/archivistpublic.py b/archivist/archivistpublic.py index 2142cad8..cf6d530e 100644 --- a/archivist/archivistpublic.py +++ b/archivist/archivistpublic.py @@ -109,6 +109,15 @@ def __getattr__(self, value: str) -> object: super().__setattr__(value, c) return c + def __enter__(self): + """Just return self on entering - the session property will + create the session when needed + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + @property def session(self) -> requests.Session: """creates and returns session""" @@ -117,7 +126,7 @@ def session(self) -> requests.Session: return self._session def close(self): - """closes current session""" + """closes current session if open""" if self._session is not None: self._session.close() self._session = None diff --git a/docs/sbom.xml b/docs/sbom.xml index 15cfbac9..efea19ff 100644 --- a/docs/sbom.xml +++ b/docs/sbom.xml @@ -1,6 +1,6 @@ - + - 2022-10-25T07:52:58.134553+00:00 + 2022-10-27T13:48:21.177994+00:00 CycloneDX @@ -36,87 +36,87 @@ - + backoff 1.11.1 - + certifi 2022.9.24 - + charset-normalizer 2.1.1 - + flatten-dict 0.4.2 - + idna 3.4 - + iso8601 1.1.0 - + jinja2 3.1.2 - + markupsafe 2.1.1 - + pyaml-env 1.1.5 - + pyyaml 5.4.1 - + requests 2.28.1 - + requests-toolbelt 0.10.1 - + rfc3339 6.2 - + six 1.16.0 - + urllib3 1.26.12 - + xmltodict 0.13.0 - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + diff --git a/examples/access_policies_filter.py b/examples/access_policies_filter.py index d2c829e8..60abb976 100644 --- a/examples/access_policies_filter.py +++ b/examples/access_policies_filter.py @@ -34,25 +34,27 @@ def main(): ) # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", auth, - ) - - # count access_policies... - print( - "no.of access_policies", - arch.access_policies.count(display_name="Some display name"), - ) - - # iterate through the generator.... - for access_policy in arch.access_policies.list(display_name="Some display name"): - print("access_policy", access_policy) - - # alternatively one could pull the list for all access policies and cache locally... - access_policies = list(arch.access_policies.list()) - for access_policy in access_policies: - print("access_policy", access_policy) + ) as arch: + + # count access_policies... + print( + "no.of access_policies", + arch.access_policies.count(display_name="Some display name"), + ) + + # iterate through the generator.... + for access_policy in arch.access_policies.list( + display_name="Some display name" + ): + print("access_policy", access_policy) + + # alternatively one could pull the list for all access policies and cache locally... + access_policies = list(arch.access_policies.list()) + for access_policy in access_policies: + print("access_policy", access_policy) if __name__ == "__main__": diff --git a/examples/access_policy_create.py b/examples/access_policy_create.py index 73f466cd..36133743 100644 --- a/examples/access_policy_create.py +++ b/examples/access_policy_create.py @@ -34,59 +34,59 @@ def main(): with open(client_secret_file, mode="r", encoding="utf-8") as tokenfile: client_secret = tokenfile.read().strip() - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) + ) as arch: - props = { - "display_name": "Friendly name of the policy", - "description": "Description of the policy", - } - filters = [ - { - "or": [ - "attributes.arc_home_location_identity=" - "locations/5ea815f0-4de1-4a84-9377-701e880fe8ae", - "attributes.arc_home_location_identity=" - "locations/27eed70b-9e2b-4db1-b8c4-e36505350dcc", - ] - }, - { - "or": [ - "attributes.arc_display_type=Valve", - "attributes.arc_display_type=Pump", - ] - }, - { - "or": [ - "attributes.ext_vendor_name=SynsationIndustries", - ] - }, - ] - access_permissions = [ - { - "asset_attributes_read": ["toner_colour", "toner_type"], - "asset_attributes_write": ["toner_colour"], - "behaviours": ASSET_BEHAVIOURS, - "event_arc_display_type_read": ["toner_type", "toner_colour"], - "event_arc_display_type_write": ["toner_replacement"], - "include_attributes": [ - "arc_display_name", - "arc_display_type", - "arc_firmware_version", - ], - "subjects": [ - "subjects/6a951b62-0a26-4c22-a886-1082297b063b", - "subjects/a24306e5-dc06-41ba-a7d6-2b6b3e1df48d", - ], - "user_attributes": [ - {"or": ["group:maintainers", "group:supervisors"]}, - ], + props = { + "display_name": "Friendly name of the policy", + "description": "Description of the policy", } - ] - access_policy = arch.access_policies.create(props, filters, access_permissions) - print("access Policy", access_policy) + filters = [ + { + "or": [ + "attributes.arc_home_location_identity=" + "locations/5ea815f0-4de1-4a84-9377-701e880fe8ae", + "attributes.arc_home_location_identity=" + "locations/27eed70b-9e2b-4db1-b8c4-e36505350dcc", + ] + }, + { + "or": [ + "attributes.arc_display_type=Valve", + "attributes.arc_display_type=Pump", + ] + }, + { + "or": [ + "attributes.ext_vendor_name=SynsationIndustries", + ] + }, + ] + access_permissions = [ + { + "asset_attributes_read": ["toner_colour", "toner_type"], + "asset_attributes_write": ["toner_colour"], + "behaviours": ASSET_BEHAVIOURS, + "event_arc_display_type_read": ["toner_type", "toner_colour"], + "event_arc_display_type_write": ["toner_replacement"], + "include_attributes": [ + "arc_display_name", + "arc_display_type", + "arc_firmware_version", + ], + "subjects": [ + "subjects/6a951b62-0a26-4c22-a886-1082297b063b", + "subjects/a24306e5-dc06-41ba-a7d6-2b6b3e1df48d", + ], + "user_attributes": [ + {"or": ["group:maintainers", "group:supervisors"]}, + ], + } + ] + access_policy = arch.access_policies.create(props, filters, access_permissions) + print("access Policy", access_policy) if __name__ == "__main__": diff --git a/examples/applications_registration.py b/examples/applications_registration.py index 4af2648a..8b1c2138 100644 --- a/examples/applications_registration.py +++ b/examples/applications_registration.py @@ -29,96 +29,96 @@ def main(): authtoken = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", authtoken, - ) - - # create application - application = arch.applications.create( - f"Application display name {uuid4()}", - { - "serial_number": "TL1000000101", - "has_cyclist_light": "true", - }, - ) - print("create application", json_dumps(application, indent=4)) - - # convert to token - appidp = arch.appidp.token( - application["client_id"], - application["credentials"][0]["secret"], - ) - print("appidp", json_dumps(appidp, indent=4)) - - # regenerate secrets - application = arch.applications.regenerate( - application["identity"], - ) - print("regenerate application", json_dumps(application, indent=4)) - - # convert to token - appidp = arch.appidp.token( - application["client_id"], - application["credentials"][0]["secret"], - ) - print("appidp", json_dumps(appidp, indent=4)) - - # update application - application = arch.applications.update( - application["identity"], - custom_claims={ - "serial_number": "TL2000000202", - "has_cyclist_light": "false", - }, - ) - print("update application", json_dumps(application, indent=4)) - - # convert to token - appidp = arch.appidp.token( - application["client_id"], - application["credentials"][0]["secret"], - ) - print("appidp", json_dumps(appidp, indent=4)) - - # regenerate secrets - application = arch.applications.regenerate( - application["identity"], - ) - print("regenerate application", json_dumps(application, indent=4)) - - # convert to token - appidp = arch.appidp.token( - application["client_id"], - application["credentials"][0]["secret"], - ) - print("appidp", json_dumps(appidp, indent=4)) - - # now create Archivist with automatically refreshed jwt token - # this archivist does not allow app registrations. - arch1 = Archivist( - "https://app.rkvst.io", - (application["client_id"], application["credentials"][0]["secret"]), - ) - # create an asset - asset = arch1.assets.create( - props={ - "proof_mechanism": ProofMechanism.SIMPLE_HASH.name, - }, - attrs={ - "arc_display_name": "display_name", - "arc_description": "display_description", - "arc_display_type": "desplay_type", - "some_custom_attribute": "value", - }, - confirm=True, - ) - print("asset", json_dumps(asset, indent=4)) - - # delete application - application = arch.applications.delete( - application["identity"], - ) + ) as arch: + + # create application + application = arch.applications.create( + f"Application display name {uuid4()}", + { + "serial_number": "TL1000000101", + "has_cyclist_light": "true", + }, + ) + print("create application", json_dumps(application, indent=4)) + + # convert to token + appidp = arch.appidp.token( + application["client_id"], + application["credentials"][0]["secret"], + ) + print("appidp", json_dumps(appidp, indent=4)) + + # regenerate secrets + application = arch.applications.regenerate( + application["identity"], + ) + print("regenerate application", json_dumps(application, indent=4)) + + # convert to token + appidp = arch.appidp.token( + application["client_id"], + application["credentials"][0]["secret"], + ) + print("appidp", json_dumps(appidp, indent=4)) + + # update application + application = arch.applications.update( + application["identity"], + custom_claims={ + "serial_number": "TL2000000202", + "has_cyclist_light": "false", + }, + ) + print("update application", json_dumps(application, indent=4)) + + # convert to token + appidp = arch.appidp.token( + application["client_id"], + application["credentials"][0]["secret"], + ) + print("appidp", json_dumps(appidp, indent=4)) + + # regenerate secrets + application = arch.applications.regenerate( + application["identity"], + ) + print("regenerate application", json_dumps(application, indent=4)) + + # convert to token + appidp = arch.appidp.token( + application["client_id"], + application["credentials"][0]["secret"], + ) + print("appidp", json_dumps(appidp, indent=4)) + + # now create Archivist with automatically refreshed jwt token + # this archivist does not allow app registrations. + with Archivist( + "https://app.rkvst.io", + (application["client_id"], application["credentials"][0]["secret"]), + ) as arch1: + # create an asset + asset = arch1.assets.create( + props={ + "proof_mechanism": ProofMechanism.SIMPLE_HASH.name, + }, + attrs={ + "arc_display_name": "display_name", + "arc_description": "display_description", + "arc_display_type": "desplay_type", + "some_custom_attribute": "value", + }, + confirm=True, + ) + print("asset", json_dumps(asset, indent=4)) + + # delete application + application = arch.applications.delete( + application["identity"], + ) if __name__ == "__main__": diff --git a/examples/compliance_policies_since.py b/examples/compliance_policies_since.py index 0e53fd4f..e6ce2ad9 100644 --- a/examples/compliance_policies_since.py +++ b/examples/compliance_policies_since.py @@ -151,6 +151,7 @@ def main(): arch.compliance_policies.delete( compliance_policy["identity"], ) + arch.close() if __name__ == "__main__": diff --git a/examples/create_asset.py b/examples/create_asset.py index 61ade5c1..50fe220e 100644 --- a/examples/create_asset.py +++ b/examples/create_asset.py @@ -105,14 +105,14 @@ def main(): # of an asset or event creation - the default is 1200 seconds but one can optionally # specify a different value here particularly when creating assets on SIMPLE_HASH # (rather than KHIPU) as confirmation times are much shorter in this case. - arch = Archivist( + with Archivist( "https://app.rkvst.io", auth, max_time=300, - ) - # Create a new asset - asset = create_asset(arch) - print("Asset", asset) + ) as arch: + # Create a new asset + asset = create_asset(arch) + print("Asset", asset) if __name__ == "__main__": diff --git a/examples/create_event.py b/examples/create_event.py index 493916c5..dddf7640 100644 --- a/examples/create_event.py +++ b/examples/create_event.py @@ -152,25 +152,37 @@ def main(): client_secret = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) - - # Create a new asset - asset = create_asset(arch) - print("Asset", json_dumps(asset, sort_keys=True, indent=4)) - print("Verified domain '", get_verified_domain(arch, asset["tenant_identity"]), "'") - - # Create a new event - event = create_event(arch, asset) - print("Event", json_dumps(event, sort_keys=True, indent=4)) - print("Verified domain '", get_verified_domain(arch, event["tenant_identity"]), "'") - - # Fetch the event - event = arch.events.read(event["identity"]) - print("Event", json_dumps(event, sort_keys=True, indent=4)) - print("Verified domain '", get_verified_domain(arch, event["tenant_identity"]), "'") + ) as arch: + + # Create a new asset + asset = create_asset(arch) + print("Asset", json_dumps(asset, sort_keys=True, indent=4)) + print( + "Verified domain '", + get_verified_domain(arch, asset["tenant_identity"]), + "'", + ) + + # Create a new event + event = create_event(arch, asset) + print("Event", json_dumps(event, sort_keys=True, indent=4)) + print( + "Verified domain '", + get_verified_domain(arch, event["tenant_identity"]), + "'", + ) + + # Fetch the event + event = arch.events.read(event["identity"]) + print("Event", json_dumps(event, sort_keys=True, indent=4)) + print( + "Verified domain '", + get_verified_domain(arch, event["tenant_identity"]), + "'", + ) if __name__ == "__main__": diff --git a/examples/filter_assets.py b/examples/filter_assets.py index 66323f49..1e32f151 100644 --- a/examples/filter_assets.py +++ b/examples/filter_assets.py @@ -35,23 +35,23 @@ def main(): client_secret = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) + ) as arch: - # list all assets with required attributes and properties - props = {"confirmation_status": "CONFIRMED"} - attrs = {"arc_display_type": "Traffic light"} + # list all assets with required attributes and properties + props = {"confirmation_status": "CONFIRMED"} + attrs = {"arc_display_type": "Traffic light"} - # iterate through the generator.... - for asset in arch.assets.list(props=props, attrs=attrs): - print("asset", asset) + # iterate through the generator.... + for asset in arch.assets.list(props=props, attrs=attrs): + print("asset", asset) - # alternatively one could pull the list and cache locally... - assets = list(arch.assets.list(props=props, attrs=attrs)) - for asset in assets: - print("asset", asset) + # alternatively one could pull the list and cache locally... + assets = list(arch.assets.list(props=props, attrs=attrs)) + for asset in assets: + print("asset", asset) if __name__ == "__main__": diff --git a/examples/filter_events.py b/examples/filter_events.py index cb8227c6..f1a3004b 100644 --- a/examples/filter_events.py +++ b/examples/filter_events.py @@ -34,20 +34,20 @@ def main(): client_secret = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) - # Get all events with required attributes and properties - props = {"confirmation_status": "CONFIRMED"} - attrs = {"arc_display_type": "Traffic light"} - for event in arch.events.list(asset_id="assets/-", props=props, attrs=attrs): - print("event", event) - - # alternatively one could pull the list and cache locally... - events = arch.events.list(asset_id="assets/-", props=props, attrs=attrs) - for event in events: - print("event", event) + ) as arch: + # Get all events with required attributes and properties + props = {"confirmation_status": "CONFIRMED"} + attrs = {"arc_display_type": "Traffic light"} + for event in arch.events.list(asset_id="assets/-", props=props, attrs=attrs): + print("event", event) + + # alternatively one could pull the list and cache locally... + events = arch.events.list(asset_id="assets/-", props=props, attrs=attrs) + for event in events: + print("event", event) if __name__ == "__main__": diff --git a/examples/filter_publicevents.py b/examples/filter_publicevents.py index bc1439c4..e71738b5 100644 --- a/examples/filter_publicevents.py +++ b/examples/filter_publicevents.py @@ -14,21 +14,21 @@ def main(): """ # Initialize connection to ArchivistPublic - public = ArchivistPublic() - - # Get all public events with required attributes and properties - props = {"confirmation_status": "CONFIRMED"} - attrs = {"arc_display_type": "Traffic light"} - - for event in public.events.list( - asset_id=( - "https://app.rkvst.io/archivist/" - "publicassets/87b1a84c-1c6f-442b-923e-a97516f4d275" - ), - props=props, - attrs=attrs, - ): - print("event", event) + with ArchivistPublic() as public: + + # Get all public events with required attributes and properties + props = {"confirmation_status": "CONFIRMED"} + attrs = {"arc_display_type": "Traffic light"} + + for event in public.events.list( + asset_id=( + "https://app.rkvst.io/archivist/" + "publicassets/87b1a84c-1c6f-442b-923e-a97516f4d275" + ), + props=props, + attrs=attrs, + ): + print("event", event) if __name__ == "__main__": diff --git a/examples/get_asset.py b/examples/get_asset.py index b0e892a6..496c5ec7 100644 --- a/examples/get_asset.py +++ b/examples/get_asset.py @@ -33,25 +33,25 @@ def main(): client_secret = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) - try: - asset = arch.assets.read_by_signature( - props={"tracked": "TRACKED"}, - attrs={"arc_display_type": "door"}, - ) - except (ArchivistNotFoundError, ArchivistDuplicateError) as ex: - print("Unable to get asset", ex) - - else: + ) as arch: + try: + asset = arch.assets.read_by_signature( + props={"tracked": "TRACKED"}, + attrs={"arc_display_type": "door"}, + ) + except (ArchivistNotFoundError, ArchivistDuplicateError) as ex: + print("Unable to get asset", ex) + + else: + print("Asset", asset["identity"]) + + # alternatively get by identity + asset = arch.assets.read(asset["identity"]) print("Asset", asset["identity"]) - # alternatively get by identity - asset = arch.assets.read(asset["identity"]) - print("Asset", asset["identity"]) - if __name__ == "__main__": main() diff --git a/examples/get_publicasset.py b/examples/get_publicasset.py index 3862fa82..6948dbb1 100644 --- a/examples/get_publicasset.py +++ b/examples/get_publicasset.py @@ -11,11 +11,11 @@ def main(): """Main function of get_publicasset.""" # Initialize connection to Archivist - no auth required - public = ArchivistPublic() - asset = public.assets.read( - "https://app.rkvst.io/archivist/publicassets/dc0dfc17-1d93-4b7a-8636-f740f40f7f52" - ) - print("Asset", asset) + with ArchivistPublic() as public: + asset = public.assets.read( + "https://app.rkvst.io/archivist/publicassets/dc0dfc17-1d93-4b7a-8636-f740f40f7f52" + ) + print("Asset", asset) if __name__ == "__main__": diff --git a/examples/get_publicevent.py b/examples/get_publicevent.py index 31d97956..c6298060 100644 --- a/examples/get_publicevent.py +++ b/examples/get_publicevent.py @@ -12,15 +12,15 @@ def main(): """Main function of getting public events.""" # Initialize connection to ArchivistPublic - public = ArchivistPublic() - - # URL is the fully-attested URL returned by archivist - event = public.events.read( - "https://app.rkvst.io/archivist/" - "publicassets/87b1a84c-1c6f-442b-923e-a97516f4d275" - "events/abcdef4c-1c6f-442b-923e-a97516f4d275" - ) - print("event", event) + with ArchivistPublic() as public: + + # URL is the fully-attested URL returned by archivist + event = public.events.read( + "https://app.rkvst.io/archivist/" + "publicassets/87b1a84c-1c6f-442b-923e-a97516f4d275" + "events/abcdef4c-1c6f-442b-923e-a97516f4d275" + ) + print("event", event) if __name__ == "__main__": diff --git a/examples/sbom_release.py b/examples/sbom_release.py index 62b0b32a..13f5c214 100644 --- a/examples/sbom_release.py +++ b/examples/sbom_release.py @@ -98,21 +98,21 @@ def main(): client_secret_filename=getenv("CLIENT_SECRET_FILENAME"), ) - arch = Archivist(rkvst_url, auth, verify=False, max_time=300) + with Archivist(rkvst_url, auth, verify=False, max_time=300) as arch: - asset, event = sbom_release( - arch, getenv("BUILD_BUILDNUMBER"), getenv("SBOM_FILEPATH") - ) + asset, event = sbom_release( + arch, getenv("BUILD_BUILDNUMBER"), getenv("SBOM_FILEPATH") + ) - rkvst_path = "archivist/v2" + rkvst_path = "archivist/v2" - asset_url = f"{rkvst_url}/{rkvst_path}/{asset['identity']}" - event_url = f"{rkvst_url}/{rkvst_path}/{event['identity']}" + asset_url = f"{rkvst_url}/{rkvst_path}/{asset['identity']}" + event_url = f"{rkvst_url}/{rkvst_path}/{event['identity']}" - print(f"##vso[task.setvariable variable=RKVST_ASSET_URL]{asset_url}") - print(f"##vso[task.setvariable variable=RKVST_EVENT_URL]{event_url}") - print(f"##[debug]Asset url: {asset_url}") - print(f"##[debug]Event url: {event_url}") + print(f"##vso[task.setvariable variable=RKVST_ASSET_URL]{asset_url}") + print(f"##vso[task.setvariable variable=RKVST_EVENT_URL]{event_url}") + print(f"##[debug]Asset url: {asset_url}") + print(f"##[debug]Event url: {event_url}") if __name__ == "__main__": diff --git a/examples/scan_test.py b/examples/scan_test.py index d46d3464..9831991d 100644 --- a/examples/scan_test.py +++ b/examples/scan_test.py @@ -135,18 +135,18 @@ def main(): client_secret_filename=getenv("TEST_CLIENT_SECRET_FILENAME"), ) - arch = Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False, max_time=300) - - print("##[group]Today") - today = date.today() - scan_test(arch, today.strftime("%Y-%m-%d")) - - # currently scans run mon-fri - # so if today is mon, previous day is fri, otherwise previous day is yesterday - print("##[group]Previous day") - days_delta = 3 if today.strftime("%a") == "Mon" else 1 - previous_day = datetime.now() - timedelta(days=days_delta) - scan_test(arch, previous_day.strftime("%Y-%m-%d"), scanned_expected=True) + with Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False, max_time=300) as arch: + + print("##[group]Today") + today = date.today() + scan_test(arch, today.strftime("%Y-%m-%d")) + + # currently scans run mon-fri + # so if today is mon, previous day is fri, otherwise previous day is yesterday + print("##[group]Previous day") + days_delta = 3 if today.strftime("%a") == "Mon" else 1 + previous_day = datetime.now() - timedelta(days=days_delta) + scan_test(arch, previous_day.strftime("%Y-%m-%d"), scanned_expected=True) if __name__ == "__main__": diff --git a/examples/sharing_asset.py b/examples/sharing_asset.py index 146ecc7a..8890ee0b 100644 --- a/examples/sharing_asset.py +++ b/examples/sharing_asset.py @@ -189,6 +189,9 @@ def main(): weyland_asset = weyland.assets.read(acme_asset["identity"]) print("asset read from weyland", json_dumps(weyland_asset, indent=4)) + acme.close() + weyland.close() + if __name__ == "__main__": main() diff --git a/examples/subject_create.py b/examples/subject_create.py index f11c57ae..e958a42b 100644 --- a/examples/subject_create.py +++ b/examples/subject_create.py @@ -34,33 +34,33 @@ def main(): client_secret = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) + ) as arch: - # subject_string is the base64 encoding of the self subject of the other organization - subject_string = ( - "eyJpZGVudGl0eSI6ICJzdWJqZWN0cy8wMDAwMDAwMC0wMDAwLTAwMDAtMDA" - "wMC0wMDAwMDAwMDAwMDAiLCAiZGlzcGxheV9uYW1lIjogIlNlbGYiLCAid2" - "FsbGV0X3B1Yl9rZXkiOiBbIjA0YzExNzNiZjc4NDRiZjFjNjA3Yjc5YzE4Z" - "GIwOTFiOTU1OGZmZTU4MWJmMTMyYjhjZjNiMzc2NTcyMzBmYTMyMWEwODgw" - "YjU0YTc5YTg4YjI4YmM3MTBlZGU2ZGNmM2Q4MjcyYzUyMTBiZmQ0MWVhODM" - "xODhlMzg1ZDEyYzE4OWMiXSwgIndhbGxldF9hZGRyZXNzIjogWyIweDk5Rm" - "E0QUFCMEFGMkI1M2YxNTgwODNEOGYyNDRiYjQ1MjMzODgxOTciXSwgInRlc" - "3NlcmFfcHViX2tleSI6IFsiZWZkZzlKMFFoU0IyZzRJeEtjYVhnSm1OS2J6" - "cHhzMDNGRllJaVlZdWVraz0iXSwgInRlbmFudCI6ICIiLCAiY29uZmlybWF" - "0aW9uX3N0YXR1cyI6ICJDT05GSVJNQVRJT05fU1RBVFVTX1VOU1BFQ0lGSU" - "VEIn0=" - ) + # subject_string is the base64 encoding of the self subject of the other organization + subject_string = ( + "eyJpZGVudGl0eSI6ICJzdWJqZWN0cy8wMDAwMDAwMC0wMDAwLTAwMDAtMDA" + "wMC0wMDAwMDAwMDAwMDAiLCAiZGlzcGxheV9uYW1lIjogIlNlbGYiLCAid2" + "FsbGV0X3B1Yl9rZXkiOiBbIjA0YzExNzNiZjc4NDRiZjFjNjA3Yjc5YzE4Z" + "GIwOTFiOTU1OGZmZTU4MWJmMTMyYjhjZjNiMzc2NTcyMzBmYTMyMWEwODgw" + "YjU0YTc5YTg4YjI4YmM3MTBlZGU2ZGNmM2Q4MjcyYzUyMTBiZmQ0MWVhODM" + "xODhlMzg1ZDEyYzE4OWMiXSwgIndhbGxldF9hZGRyZXNzIjogWyIweDk5Rm" + "E0QUFCMEFGMkI1M2YxNTgwODNEOGYyNDRiYjQ1MjMzODgxOTciXSwgInRlc" + "3NlcmFfcHViX2tleSI6IFsiZWZkZzlKMFFoU0IyZzRJeEtjYVhnSm1OS2J6" + "cHhzMDNGRllJaVlZdWVraz0iXSwgInRlbmFudCI6ICIiLCAiY29uZmlybWF" + "0aW9uX3N0YXR1cyI6ICJDT05GSVJNQVRJT05fU1RBVFVTX1VOU1BFQ0lGSU" + "VEIn0=" + ) - subject = arch.subjects.create_from_b64( - { - "display_name": "Some display name", - "subject_string": subject_string, - } - ) - print("Subject", subject) + subject = arch.subjects.create_from_b64( + { + "display_name": "Some display name", + "subject_string": subject_string, + } + ) + print("Subject", subject) if __name__ == "__main__": diff --git a/examples/subjects_filter.py b/examples/subjects_filter.py index bfcdeb07..4b1dd852 100644 --- a/examples/subjects_filter.py +++ b/examples/subjects_filter.py @@ -28,22 +28,22 @@ def main(): client_secret = tokenfile.read().strip() # Initialize connection to Archivist - arch = Archivist( + with Archivist( "https://app.rkvst.io", (client_id, client_secret), - ) + ) as arch: - # count subjects... - print("no.of subjects", arch.subjects.count(display_name="Some display name")) + # count subjects... + print("no.of subjects", arch.subjects.count(display_name="Some display name")) - # iterate through the generator.... - for subject in arch.subjects.list(display_name="Some display name"): - print("subject", subject) + # iterate through the generator.... + for subject in arch.subjects.list(display_name="Some display name"): + print("subject", subject) - # alternatively one could pull the list for all subjects and cache locally... - subjects = list(arch.subjects.list()) - for subject in subjects: - print("subject", subject) + # alternatively one could pull the list for all subjects and cache locally... + subjects = list(arch.subjects.list()) + for subject in subjects: + print("subject", subject) if __name__ == "__main__": diff --git a/functests/execaccess_policies.py b/functests/execaccess_policies.py index d2b91310..a2c12cd4 100644 --- a/functests/execaccess_policies.py +++ b/functests/execaccess_policies.py @@ -122,7 +122,7 @@ def setUp(self): def tearDown(self): self.ac_access_permissions = None self.ac_props = None - self.arch = None + self.arch.close() class TestAccessPoliciesSimple(TestAccessPoliciesBase): @@ -305,6 +305,10 @@ def setUp(self): print("Org1: subject_1", json_dumps(self.subject_1, indent=4)) print("Org2: subject_2", json_dumps(self.subject_2, indent=4)) + def tearDown(self): + super().tearDown() + self.arch_2.close() + def _create_asset(self, label, arch, uuid): asset_data = deepcopy(REQUEST_EXISTS_ATTACHMENTS) asset_data["attributes"]["arc_namespace"] = uuid diff --git a/functests/execapplications.py b/functests/execapplications.py index 87343c15..aa7f5ff1 100644 --- a/functests/execapplications.py +++ b/functests/execapplications.py @@ -53,6 +53,9 @@ def setUp(self): self.arch = Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False) self.display_name = f"{DISPLAY_NAME} {uuid4()}" + def tearDown(self): + self.arch.close() + def test_applications_create(self): """ Test application creation @@ -220,44 +223,44 @@ def test_archivist_token(self): # archivist using app registration print("New Arch") - new_arch = Archivist( + with Archivist( getenv("TEST_ARCHIVIST"), (application["client_id"], application["credentials"][0]["secret"]), verify=False, - ) + ) as new_arch: - # now we create an asset and add events 10 times with a 60s sleep - # this should trigger a token refresh - traffic_light = deepcopy(ATTRS) - traffic_light["arc_display_type"] = "Traffic light with violation camera" - asset = new_arch.assets.create( - props={ - "proof_mechanism": ProofMechanism.SIMPLE_HASH.name, - }, - attrs=traffic_light, - confirm=True, - ) - print("create asset", json_dumps(asset, indent=4)) - self.assertEqual( - asset["proof_mechanism"], - ProofMechanism.SIMPLE_HASH.name, - msg="Incorrect asset proof mechanism", - ) - identity = asset["identity"] - props = { - "operation": "Record", - "behaviour": "RecordEvidence", - } - - # should cause at least 2 refreshes of token - for i in range(25): - sleep(60) - event = new_arch.events.create( - identity, - props=props, - attrs={ - "arc_description": f"Safety conformance approved for version {i}", + # now we create an asset and add events 10 times with a 60s sleep + # this should trigger a token refresh + traffic_light = deepcopy(ATTRS) + traffic_light["arc_display_type"] = "Traffic light with violation camera" + asset = new_arch.assets.create( + props={ + "proof_mechanism": ProofMechanism.SIMPLE_HASH.name, }, + attrs=traffic_light, confirm=True, ) - print(i, "create event", json_dumps(event, indent=4)) + print("create asset", json_dumps(asset, indent=4)) + self.assertEqual( + asset["proof_mechanism"], + ProofMechanism.SIMPLE_HASH.name, + msg="Incorrect asset proof mechanism", + ) + identity = asset["identity"] + props = { + "operation": "Record", + "behaviour": "RecordEvidence", + } + + # should cause at least 2 refreshes of token + for i in range(25): + sleep(60) + event = new_arch.events.create( + identity, + props=props, + attrs={ + "arc_description": f"Safety conformance approved for version {i}", + }, + confirm=True, + ) + print(i, "create event", json_dumps(event, indent=4)) diff --git a/functests/execattachments.py b/functests/execattachments.py index ed6d7192..d895bfba 100644 --- a/functests/execattachments.py +++ b/functests/execattachments.py @@ -51,6 +51,7 @@ def setUp(self): def tearDown(self) -> None: """Remove the downloaded image for subsequent test runs""" + self.arch.close() with suppress(FileNotFoundError): remove(self.TEST_IMAGE_DOWNLOAD_PATH) @@ -192,6 +193,9 @@ def setUp(self): ) self.arch = Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False) + def tearDown(self): + self.arch.close() + def test_attachment_malware_scan1(self): """ Test file upload through the SDK diff --git a/functests/execcompliance_policies.py b/functests/execcompliance_policies.py index c0531ae9..09c7197e 100644 --- a/functests/execcompliance_policies.py +++ b/functests/execcompliance_policies.py @@ -102,6 +102,9 @@ def setUp(self): self.arch = Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False) print() + def tearDown(self): + self.arch.close() + class TestCompliancePolicies(TestCompliancePoliciesBase): def test_compliancepolicies_create_since(self): diff --git a/functests/execpublicassets.py b/functests/execpublicassets.py index ef8da26e..de3e5294 100644 --- a/functests/execpublicassets.py +++ b/functests/execpublicassets.py @@ -87,7 +87,7 @@ def setUp(self): self.traffic_light["arc_display_type"] = "Traffic light with violation camera" def tearDown(self): - self.arch = None + self.arch.close() self.attrs = None self.traffic_light = None diff --git a/functests/execrunner.py b/functests/execrunner.py index a9480d90..4032977c 100644 --- a/functests/execrunner.py +++ b/functests/execrunner.py @@ -45,7 +45,7 @@ def setUp(self): ) def tearDown(self): - self.arch = None + self.arch.close() def test_runner_dynamic_tolerance(self): """ diff --git a/functests/execsboms.py b/functests/execsboms.py index f8fec41d..ae708ee7 100644 --- a/functests/execsboms.py +++ b/functests/execsboms.py @@ -57,7 +57,7 @@ def setUp(self): def tearDown(self) -> None: """Remove the downloaded sbom for subsequent test runs""" - self.arch = None + self.arch.close() with suppress(FileNotFoundError): remove(TEST_SBOM_DOWNLOAD_PATH) @@ -278,21 +278,3 @@ def test_sbom_upload_and_download(self): ) for i, m in enumerate(metadatas): print(i, ":", json_dumps(m.dict(), indent=4)) - - -class TestSBOMWithApplication(TestSBOM): - """ - Test Archivist SBOM upload/download - """ - - maxDiff = None - - def setUp(self): - super().setUp() - self.title = "TestSBOMWithApplication" - application = self.arch.applications.create( - DISPLAY_NAME, - CUSTOM_CLAIMS, - ) - auth = (application["client_id"], application["credentials"][0]["secret"]) - self.arch = Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False) diff --git a/functests/execsubjects.py b/functests/execsubjects.py index 875b94c0..d130f450 100644 --- a/functests/execsubjects.py +++ b/functests/execsubjects.py @@ -65,6 +65,9 @@ def setUp(self): self.arch = Archivist(getenv("TEST_ARCHIVIST"), auth, verify=False) self.display_name = f"{DISPLAY_NAME} {uuid4()}" + def tearDown(self): + self.arch.close() + def test_subjects_create(self): """ Test subject creation diff --git a/unittests/testappidp.py b/unittests/testappidp.py index 35d9009e..943ad9a2 100644 --- a/unittests/testappidp.py +++ b/unittests/testappidp.py @@ -46,6 +46,9 @@ class TestAppIDP(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_appidp_str(self): """ Test appidp str diff --git a/unittests/testapplications.py b/unittests/testapplications.py index 150c366d..d713c858 100644 --- a/unittests/testapplications.py +++ b/unittests/testapplications.py @@ -59,6 +59,9 @@ class TestApplications(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_applications_str(self): """ Test applications str diff --git a/unittests/testarchivist.py b/unittests/testarchivist.py index 2d21dd40..194aac41 100644 --- a/unittests/testarchivist.py +++ b/unittests/testarchivist.py @@ -64,169 +64,169 @@ def test_archivist(self): """ Test default archivist creation """ - arch = Archivist("https://app.rkvst.io", "authauthauth") - self.assertEqual( - str(arch), - "Archivist(https://app.rkvst.io)", - msg="Incorrect str", - ) - self.assertEqual( - str(arch.access_policies), - "AccessPoliciesClient(https://app.rkvst.io)", - msg="Incorrect access_policies", - ) - self.assertEqual( - str(arch.appidp), - "AppIDPClient(https://app.rkvst.io)", - msg="Incorrect appidp", - ) - self.assertEqual( - str(arch.applications), - "ApplicationsClient(https://app.rkvst.io)", - msg="Incorrect applications", - ) - self.assertEqual( - str(arch.assets), - "AssetsRestricted(https://app.rkvst.io)", - msg="Incorrect assets", - ) - self.assertEqual( - str(arch.assetattachments), - "AssetAttachmentsClient(https://app.rkvst.io)", - msg="Incorrect assets", - ) - self.assertEqual( - str(arch.attachments), - "AttachmentsClient(https://app.rkvst.io)", - msg="Incorrect attachments", - ) - self.assertEqual( - str(arch.compliance), - "ComplianceClient(https://app.rkvst.io)", - msg="Incorrect compliance", - ) - self.assertEqual( - str(arch.compliance_policies), - "CompliancePoliciesClient(https://app.rkvst.io)", - msg="Incorrect compliance_policies", - ) - self.assertEqual( - str(arch.events), - "EventsRestricted(https://app.rkvst.io)", - msg="Incorrect events", - ) - self.assertEqual( - str(arch.locations), - "LocationsClient(https://app.rkvst.io)", - msg="Incorrect locations", - ) - self.assertEqual( - str(arch.runner), - "Runner(https://app.rkvst.io)", - msg="Incorrect runner", - ) - self.assertEqual( - str(arch.sboms), - "SBOMSClient(https://app.rkvst.io)", - msg="Incorrect sboms", - ) - self.assertEqual( - str(arch.subjects), - "SubjectsClient(https://app.rkvst.io)", - msg="Incorrect subjects", - ) - self.assertEqual( - str(arch.Public), - "ArchivistPublic()", - msg="Incorrect Public", - ) - self.assertEqual( - arch.url, - "https://app.rkvst.io", - msg="Incorrect url", - ) - self.assertEqual( - arch.auth, - "authauthauth", - msg="Incorrect auth", - ) - self.assertEqual( - arch.root, - "https://app.rkvst.io/archivist", - msg="Incorrect root", - ) - self.assertEqual( - arch.verify, - True, - msg="verify must be True", - ) - with self.assertRaises(AttributeError): - e = arch.Illegal_endpoint + with Archivist("https://app.rkvst.io", "authauthauth") as arch: + self.assertEqual( + str(arch), + "Archivist(https://app.rkvst.io)", + msg="Incorrect str", + ) + self.assertEqual( + str(arch.access_policies), + "AccessPoliciesClient(https://app.rkvst.io)", + msg="Incorrect access_policies", + ) + self.assertEqual( + str(arch.appidp), + "AppIDPClient(https://app.rkvst.io)", + msg="Incorrect appidp", + ) + self.assertEqual( + str(arch.applications), + "ApplicationsClient(https://app.rkvst.io)", + msg="Incorrect applications", + ) + self.assertEqual( + str(arch.assets), + "AssetsRestricted(https://app.rkvst.io)", + msg="Incorrect assets", + ) + self.assertEqual( + str(arch.assetattachments), + "AssetAttachmentsClient(https://app.rkvst.io)", + msg="Incorrect assets", + ) + self.assertEqual( + str(arch.attachments), + "AttachmentsClient(https://app.rkvst.io)", + msg="Incorrect attachments", + ) + self.assertEqual( + str(arch.compliance), + "ComplianceClient(https://app.rkvst.io)", + msg="Incorrect compliance", + ) + self.assertEqual( + str(arch.compliance_policies), + "CompliancePoliciesClient(https://app.rkvst.io)", + msg="Incorrect compliance_policies", + ) + self.assertEqual( + str(arch.events), + "EventsRestricted(https://app.rkvst.io)", + msg="Incorrect events", + ) + self.assertEqual( + str(arch.locations), + "LocationsClient(https://app.rkvst.io)", + msg="Incorrect locations", + ) + self.assertEqual( + str(arch.runner), + "Runner(https://app.rkvst.io)", + msg="Incorrect runner", + ) + self.assertEqual( + str(arch.sboms), + "SBOMSClient(https://app.rkvst.io)", + msg="Incorrect sboms", + ) + self.assertEqual( + str(arch.subjects), + "SubjectsClient(https://app.rkvst.io)", + msg="Incorrect subjects", + ) + self.assertEqual( + str(arch.Public), + "ArchivistPublic()", + msg="Incorrect Public", + ) + self.assertEqual( + arch.url, + "https://app.rkvst.io", + msg="Incorrect url", + ) + self.assertEqual( + arch.auth, + "authauthauth", + msg="Incorrect auth", + ) + self.assertEqual( + arch.root, + "https://app.rkvst.io/archivist", + msg="Incorrect root", + ) + self.assertEqual( + arch.verify, + True, + msg="verify must be True", + ) + with self.assertRaises(AttributeError): + e = arch.Illegal_endpoint def test_archivist_token(self): """ Test archivist creation with app registration """ - arch = Archivist("https://app.rkvst.io", (CLIENT_ID, CLIENT_SECRET)) - with mock.patch.object(arch.appidp, "token") as mock_token: - mock_token.return_value = RESPONSE - self.assertEqual( - arch.auth, - ACCESS_TOKEN, - msg="Incorrect auth", - ) + with Archivist("https://app.rkvst.io", (CLIENT_ID, CLIENT_SECRET)) as arch: + with mock.patch.object(arch.appidp, "token") as mock_token: + mock_token.return_value = RESPONSE + self.assertEqual( + arch.auth, + ACCESS_TOKEN, + msg="Incorrect auth", + ) def test_archivist_none_token(self): """ Test archivist creation with no token """ - arch = Archivist("https://app.rkvst.io", None) - self.assertIsNone( - arch.auth, - msg="Incorrect auth", - ) + with Archivist("https://app.rkvst.io", None) as arch: + self.assertIsNone( + arch.auth, + msg="Incorrect auth", + ) def test_archivist_appidp_token(self): """ Test archivist creation with appidp token """ - arch = Archivist("https://app.rkvst.io", (CLIENT_ID, CLIENT_SECRET)) - with mock.patch.object(arch.appidp, "token") as mock_token: - mock_token.return_value = NONE_RESPONSE - with self.assertRaises(ArchivistError): - _ = arch.auth + with Archivist("https://app.rkvst.io", (CLIENT_ID, CLIENT_SECRET)) as arch: + with mock.patch.object(arch.appidp, "token") as mock_token: + mock_token.return_value = NONE_RESPONSE + with self.assertRaises(ArchivistError): + _ = arch.auth def test_archivist_copy(self): """ Test archivist copy """ - arch = Archivist("https://app.rkvst.io", "authauthauth", verify=False) - arch1 = copy(arch) - self.assertEqual( - arch.url, - arch1.url, - msg="Incorrect url", - ) - self.assertEqual( - arch.verify, - arch1.verify, - msg="Incorrect verify", - ) - self.assertEqual( - arch.fixtures, - arch1.fixtures, - msg="Incorrect fixtures", - ) + with Archivist("https://app.rkvst.io", "authauthauth", verify=False) as arch: + arch1 = copy(arch) + self.assertEqual( + arch.url, + arch1.url, + msg="Incorrect url", + ) + self.assertEqual( + arch.verify, + arch1.verify, + msg="Incorrect verify", + ) + self.assertEqual( + arch.fixtures, + arch1.fixtures, + msg="Incorrect fixtures", + ) def test_archivist_no_verify(self): """ Test archivist creation with no verify """ - arch = Archivist("https://app.rkvst.io", "authauthauth", verify=False) - self.assertFalse( - arch.verify, - msg="verify must be False", - ) + with Archivist("https://app.rkvst.io", "authauthauth", verify=False) as arch: + self.assertFalse( + arch.verify, + msg="verify must be False", + ) class TestArchivistMethods(TestCase): @@ -237,6 +237,9 @@ class TestArchivistMethods(TestCase): def setUp(self): self.arch = Archivist("https://app.rkvst.io", "authauthauth") + def tearDown(self): + self.arch.close() + class TestArchivistPatch(TestArchivistMethods): """ diff --git a/unittests/testarchivistdelete.py b/unittests/testarchivistdelete.py index 13691027..f3746c77 100644 --- a/unittests/testarchivistdelete.py +++ b/unittests/testarchivistdelete.py @@ -27,6 +27,9 @@ class TestArchivistMethods(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + class TestArchivistDelete(TestArchivistMethods): """ diff --git a/unittests/testarchivistget.py b/unittests/testarchivistget.py index c882813b..0250db73 100644 --- a/unittests/testarchivistget.py +++ b/unittests/testarchivistget.py @@ -28,6 +28,9 @@ class TestArchivistMethods(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + class TestArchivistGet(TestArchivistMethods): """ diff --git a/unittests/testarchivistpost.py b/unittests/testarchivistpost.py index 5272d598..c02ba808 100644 --- a/unittests/testarchivistpost.py +++ b/unittests/testarchivistpost.py @@ -28,6 +28,9 @@ class TestArchivistMethods(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + class TestArchivistPost(TestArchivistMethods): """ diff --git a/unittests/testassetattachments.py b/unittests/testassetattachments.py index 68a774da..f9b993b9 100644 --- a/unittests/testassetattachments.py +++ b/unittests/testassetattachments.py @@ -64,6 +64,9 @@ class TestAssetAttachmentsBase(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_assetattachments_str(self): """ Test attachments str diff --git a/unittests/testassetsconstants.py b/unittests/testassetsconstants.py index a6ac0d7e..d4cb7744 100644 --- a/unittests/testassetsconstants.py +++ b/unittests/testassetsconstants.py @@ -473,7 +473,7 @@ def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=1) def tearDown(self): - self.arch = None + self.arch.close() class TestAssetsBaseConfirm(TestCase): @@ -487,4 +487,4 @@ def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=100) def tearDown(self): - self.arch = None + self.arch.close() diff --git a/unittests/testattachments.py b/unittests/testattachments.py index 231de24d..327d97c1 100644 --- a/unittests/testattachments.py +++ b/unittests/testattachments.py @@ -64,6 +64,9 @@ class TestAttachmentsBase(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_attachments_str(self): """ Test attachments str diff --git a/unittests/testcompliance.py b/unittests/testcompliance.py index 9a5fdb1a..6a3b6c46 100644 --- a/unittests/testcompliance.py +++ b/unittests/testcompliance.py @@ -72,6 +72,9 @@ class TestCompliance(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_compliance_str(self): """ Test compliance str diff --git a/unittests/testcompliance_policies.py b/unittests/testcompliance_policies.py index db079c24..9145bc9b 100644 --- a/unittests/testcompliance_policies.py +++ b/unittests/testcompliance_policies.py @@ -89,6 +89,9 @@ class TestCompliancePolicies(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_compliance_policies_str(self): """ Test compliance_policy str diff --git a/unittests/testeventsconfirm.py b/unittests/testeventsconfirm.py index b7684ad6..81052e93 100644 --- a/unittests/testeventsconfirm.py +++ b/unittests/testeventsconfirm.py @@ -32,7 +32,7 @@ def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=100) def tearDown(self): - self.arch = None + self.arch.close() def test_events_create_with_confirmation(self): """ diff --git a/unittests/testeventsconstants.py b/unittests/testeventsconstants.py index 0afe01fd..26a4f794 100644 --- a/unittests/testeventsconstants.py +++ b/unittests/testeventsconstants.py @@ -420,4 +420,4 @@ def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=1) def tearDown(self): - self.arch = None + self.arch.close() diff --git a/unittests/testfixtures.py b/unittests/testfixtures.py index 5581d6de..9bdac707 100644 --- a/unittests/testfixtures.py +++ b/unittests/testfixtures.py @@ -67,32 +67,32 @@ def test_fixtures(self): """ Test default archivist creation """ - arch = Archivist("url", "authauthauth") - self.assertEqual( - arch.fixtures, - {}, - msg="Incorrect fixtures", - ) - arch.fixtures = FIXTURE1 - self.assertEqual( - arch.fixtures, - FIXTURE1, - msg="Incorrect fixtures", - ) + with Archivist("url", "authauthauth") as arch: + self.assertEqual( + arch.fixtures, + {}, + msg="Incorrect fixtures", + ) + arch.fixtures = FIXTURE1 + self.assertEqual( + arch.fixtures, + FIXTURE1, + msg="Incorrect fixtures", + ) - # prove that copy recreates all underlying fixtures - newarch = copy(arch) + # prove that copy recreates all underlying fixtures + newarch = copy(arch) - newarch.fixtures = FIXTURE2 - self.assertEqual( - newarch.fixtures, - FIXTURE3, - msg="Incorrect fixtures", - ) + newarch.fixtures = FIXTURE2 + self.assertEqual( + newarch.fixtures, + FIXTURE3, + msg="Incorrect fixtures", + ) - # but original arch is untouched - self.assertEqual( - arch.fixtures, - FIXTURE1, - msg="Incorrect fixtures", - ) + # but original arch is untouched + self.assertEqual( + arch.fixtures, + FIXTURE1, + msg="Incorrect fixtures", + ) diff --git a/unittests/testlocations.py b/unittests/testlocations.py index a5fed80b..e49f5c8e 100644 --- a/unittests/testlocations.py +++ b/unittests/testlocations.py @@ -155,6 +155,9 @@ class TestLocations(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_locations_str(self): """ Test locations str diff --git a/unittests/testpublic.py b/unittests/testpublic.py index fdb60cc5..aaff77cb 100644 --- a/unittests/testpublic.py +++ b/unittests/testpublic.py @@ -35,66 +35,66 @@ def test_public(self): """ Test default public creation """ - public = ArchivistPublic() - self.assertEqual( - str(public), - "ArchivistPublic()", - msg="Incorrect str", - ) - self.assertEqual( - str(public.assets), - "AssetsPublic()", - msg="Incorrect assets", - ) - self.assertEqual( - str(public.events), - "EventsPublic()", - msg="Incorrect events", - ) - self.assertEqual( - public.verify, - True, - msg="verify must be True", - ) - self.assertEqual( - public.public, - True, - msg="verify must be True", - ) - with self.assertRaises(AttributeError): - e = public.Illegal_endpoint + with ArchivistPublic() as public: + self.assertEqual( + str(public), + "ArchivistPublic()", + msg="Incorrect str", + ) + self.assertEqual( + str(public.assets), + "AssetsPublic()", + msg="Incorrect assets", + ) + self.assertEqual( + str(public.events), + "EventsPublic()", + msg="Incorrect events", + ) + self.assertEqual( + public.verify, + True, + msg="verify must be True", + ) + self.assertEqual( + public.public, + True, + msg="verify must be True", + ) + with self.assertRaises(AttributeError): + e = public.Illegal_endpoint def test_public_copy(self): """ Test public copy """ - public = ArchivistPublic(verify=False) - public1 = copy(public) - self.assertEqual( - public.verify, - public1.verify, - msg="Incorrect verify", - ) - self.assertEqual( - public.fixtures, - public1.fixtures, - msg="Incorrect fixtures", - ) - self.assertEqual( - public.public, - public1.public, - msg="Incorrect public", - ) + with ArchivistPublic(verify=False) as public: + public1 = copy(public) + self.assertEqual( + public.verify, + public1.verify, + msg="Incorrect verify", + ) + self.assertEqual( + public.fixtures, + public1.fixtures, + msg="Incorrect fixtures", + ) + self.assertEqual( + public.public, + public1.public, + msg="Incorrect public", + ) def test_public_no_verify(self): """ Test public creation with no verify """ - public = ArchivistPublic(verify=False) - self.assertFalse( - public.verify, - msg="verify must be False", - ) + with ArchivistPublic(verify=False) as public: + self.assertFalse( + public.verify, + msg="verify must be False", + ) class TestPublicMethods(TestCase): @@ -105,6 +105,9 @@ class TestPublicMethods(TestCase): def setUp(self): self.public = ArchivistPublic() + def tearDown(self): + self.public.close() + class TestPublicCount(TestPublicMethods): """ diff --git a/unittests/testpublicassets.py b/unittests/testpublicassets.py index e83cdbc7..a6f36e35 100644 --- a/unittests/testpublicassets.py +++ b/unittests/testpublicassets.py @@ -31,7 +31,6 @@ def setUp(self): def tearDown(self): self.public.close() - self.public = None class TestPublicAssetsUtil(TestPublicAssetsBase): diff --git a/unittests/testrunner.py b/unittests/testrunner.py index 0de2a0c0..1a8bd2ea 100644 --- a/unittests/testrunner.py +++ b/unittests/testrunner.py @@ -58,6 +58,9 @@ class TestRunner(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_runner_str(self): """ Test runner str diff --git a/unittests/testrunneractiomap.py b/unittests/testrunneractiomap.py index d39e7919..6c87e2e3 100644 --- a/unittests/testrunneractiomap.py +++ b/unittests/testrunneractiomap.py @@ -32,6 +32,9 @@ def setUp(self): self.arch = Archivist("url", "authauthauth") self.actionmap = _ActionMap(self.arch) + def tearDown(self): + self.arch.close() + def test_runner_actionmap_action(self): """ Test runner action map diff --git a/unittests/testrunnerlocation.py b/unittests/testrunnerlocation.py index 903febde..3b2d4ba9 100644 --- a/unittests/testrunnerlocation.py +++ b/unittests/testrunnerlocation.py @@ -67,6 +67,9 @@ class TestRunnerLocationsCreate(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + @mock.patch("archivist.runner.time_sleep") def test_runner_locations_create(self, mock_sleep): """ diff --git a/unittests/testrunnerstep.py b/unittests/testrunnerstep.py index 802d60e8..43cfef35 100644 --- a/unittests/testrunnerstep.py +++ b/unittests/testrunnerstep.py @@ -41,6 +41,9 @@ def location_id_method(unused_label): def setUp(self): self.arch = Archivist("url", "authauthauth") + def tearDown(self): + self.arch.close() + def test_runner_step_with_delete_method(self): """ Test runner step diff --git a/unittests/testsboms.py b/unittests/testsboms.py index bcb29e6e..b83068dd 100644 --- a/unittests/testsboms.py +++ b/unittests/testsboms.py @@ -85,7 +85,7 @@ def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=1) def tearDown(self): - self.arch = None + self.arch.close() class TestSBOMS(TestSBOMSBase): diff --git a/unittests/testsubjects.py b/unittests/testsubjects.py index c07ae807..819e68d1 100644 --- a/unittests/testsubjects.py +++ b/unittests/testsubjects.py @@ -558,6 +558,9 @@ class TestSubjectsConfirm(TestCase): def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=100) + def tearDown(self): + self.arch.close() + def test_subjects_create_with_confirmation(self): """ Test subjects creation diff --git a/unittests/testtenancies.py b/unittests/testtenancies.py index 826878b5..7ca846d1 100644 --- a/unittests/testtenancies.py +++ b/unittests/testtenancies.py @@ -46,7 +46,7 @@ def setUp(self): self.arch = Archivist("url", "authauthauth", max_time=1) def tearDown(self): - self.arch = None + self.arch.close() class TestTenanciesRead(TestTenanciesBase):