Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion archivist/archivist.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,13 @@ def post(

@retry_429
def post_file(
self, path: str, fd: BinaryIO, mtype: str, *, form: Optional[str] = "file"
self,
path: str,
fd: BinaryIO,
mtype: str,
*,
form: Optional[str] = "file",
params: Optional[Dict] = None,
) -> Dict:
"""POST method (REST) - upload binary

Expand All @@ -365,6 +371,7 @@ def post_file(
path (str): e.g. v2/assets
fd : iterable representing the contents of a file.
mtype (str): mime type e.g. image/jpg
params (dict): dictionary of optional query params

Returns:
dict representing the response body (entity).
Expand All @@ -379,6 +386,10 @@ def post_file(
headers = {
"content-type": multipart.content_type,
}
if params:
qry = "&".join(sorted(f"{k}={v}" for k, v in _dotstring(params)))
path = "?".join((path, qry))

response = self._session.post(
SEP.join((self.url, ROOT, path)),
data=multipart, # type: ignore https://github.com/requests/toolbelt/issues/312
Expand Down
4 changes: 2 additions & 2 deletions archivist/confirmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _wait_for_confirmation(
on_giveup=__on_giveup_confirmation,
)
def _wait_for_confirmation(self, identity):
"""docstring"""
"""Return None until entity is confirmed"""
entity = self.read(identity)

if CONFIRMATION_STATUS not in entity:
Expand Down Expand Up @@ -115,7 +115,7 @@ def __on_giveup_confirmed(details):
on_giveup=__on_giveup_confirmed,
)
def _wait_for_confirmed(self, *, props=None, **kwargs) -> bool:
"""docstring"""
"""Return False until all entities are confirmed"""

# look for unconfirmed entities
newprops = deepcopy(props) if props else {}
Expand Down
17 changes: 1 addition & 16 deletions archivist/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import logging

from typing import overload

import backoff

from .errors import ArchivistUnpublishedError
Expand Down Expand Up @@ -43,19 +41,6 @@ def __on_giveup_publication(details):
)


# These overloads are used for type hinting, if self is sboms client then
# an SBOM metadata will be returned.
# Overloads are evaluated at startup but not at runtime, therefore
# no test coverage be done directly.


@overload
def _wait_for_publication(
self: "sboms._SbomsClient", identity: str
) -> "sbommetadata.SBOM":
... # pragma: no cover


@backoff.on_predicate(
backoff.expo,
logger=LOGGER,
Expand All @@ -64,7 +49,7 @@ def _wait_for_publication(
on_giveup=__on_giveup_publication,
)
def _wait_for_publication(self, identity):
"""docstring"""
"""Return None until published date is set"""
entity = self.read(identity)

if entity.published_date:
Expand Down
41 changes: 36 additions & 5 deletions archivist/sboms.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
SBOMS_WITHDRAW,
SBOMS_PUBLISH,
)
from . import publisher, withdrawer
from . import publisher, uploader, withdrawer
from .dictmerge import _deepmerge
from .sbommetadata import SBOM

Expand All @@ -66,29 +66,60 @@ class _SBOMSClient:
def __init__(self, archivist: "type_helper.Archivist"):
self._archivist = archivist

def upload(
    self,
    fd: BinaryIO,
    *,
    confirm: bool = False,
    mtype: str = "text/xml",
    params: Optional[Dict] = None,
) -> SBOM:
    """Create SBOM

    Creates SBOM from opened file or other data source.

    Args:
        fd (file): opened file descriptor or other file-type iterable.
        confirm (bool): if True wait for sbom to be uploaded.
        mtype (str): mimetype of data.
        params (dict): optional e.g. {"sbomType": "cyclonedx-xml", "privacy": "PUBLIC" }

    Returns:
        :class:`SBOM` instance

    """

    LOGGER.debug("Upload SBOM")

    sbom = SBOM(
        **self._archivist.post_file(
            f"{SBOMS_SUBPATH}/{SBOMS_LABEL}",
            fd,
            mtype,
            form="sbom",
            params=params,
        )
    )
    if not confirm:
        return sbom

    # block until the service can read the uploaded SBOM back
    return self.wait_for_uploading(sbom.identity)

def wait_for_uploading(self, identity: str) -> SBOM:
    """Wait for sbom to be uploaded.

    Waits for sbom to be uploaded.

    Args:
        identity (str): identity of sbom

    Returns:
        :class:`SBOM` instance once the sbom is readable.

    """
    # propagate the client-configured timeout to the uploader's backoff loop
    uploader.MAX_TIME = self._archivist.max_time
    # pylint: disable=protected-access
    return uploader._wait_for_uploading(self, identity)  # type: ignore

def download(self, identity: str, fd: BinaryIO) -> Response:
"""Read SBOM
Expand Down Expand Up @@ -212,7 +243,7 @@ def wait_for_publication(self, identity: str) -> SBOM:
"""
publisher.MAX_TIME = self._archivist.max_time
# pylint: disable=protected-access
return publisher._wait_for_publication(self, identity)
return publisher._wait_for_publication(self, identity) # type: ignore

def withdraw(self, identity: str, confirm: bool = False) -> SBOM:
"""Withdraw SBOM
Expand Down Expand Up @@ -254,4 +285,4 @@ def wait_for_withdrawn(self, identity: str) -> SBOM:
"""
withdrawer.MAX_TIME = self._archivist.max_time
# pylint: disable=protected-access
return withdrawer._wait_for_withdrawn(self, identity)
return withdrawer._wait_for_withdrawn(self, identity) # type: ignore
57 changes: 57 additions & 0 deletions archivist/uploader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""uploader interface
"""

import logging

import backoff

from .errors import ArchivistNotFoundError


# pylint:disable=unused-import # To prevent cyclical import errors forward referencing is used
# pylint:disable=cyclic-import # but pylint doesn't understand this feature
from . import sboms

MAX_TIME = 1200

LOGGER = logging.getLogger(__name__)


def __lookup_max_time():
    """Return MAX_TIME lazily so callers (e.g. the sboms client) can override it at runtime."""
    return MAX_TIME


# pylint: disable=consider-using-f-string
def __backoff_handler(details):
    """Log each backoff retry; ``details`` is supplied by the backoff library."""
    LOGGER.debug("MAX_TIME %s", MAX_TIME)
    # fix typo: "afters" -> "after" in the log message
    LOGGER.debug(
        "Backing off {wait:0.1f} seconds after {tries} tries "
        "calling function {target} with args {args} and kwargs "
        "{kwargs}".format(**details)
    )


def __on_giveup_uploading(details):
    """Invoked by backoff when retries are exhausted - raise a timeout error."""
    # details["args"] holds the (self, identity) passed to _wait_for_uploading
    identity = details["args"][1]
    raise ArchivistNotFoundError(
        f"uploading for {identity} timed out after {details['elapsed']} seconds"
    )


@backoff.on_predicate(
    backoff.expo,
    logger=LOGGER,
    max_time=__lookup_max_time,
    on_backoff=__backoff_handler,
    on_giveup=__on_giveup_uploading,
)
def _wait_for_uploading(self, identity):
    """Return the entity once it can be read; None while still unavailable"""
    try:
        LOGGER.debug("Uploader Read %s", identity)
        return self.read(identity)
    except ArchivistNotFoundError:
        # not visible yet - a falsy return makes backoff retry
        return None
17 changes: 1 addition & 16 deletions archivist/withdrawer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import logging

from typing import overload

import backoff

from .errors import ArchivistUnwithdrawnError
Expand Down Expand Up @@ -43,19 +41,6 @@ def __on_giveup_withdrawn(details):
)


# These overloads are used for type hinting, if self is sboms client then
# an SBOM metadata will be returned.
# Overloads are evaluated at startup but not at runtime, therefore
# no test coverage be done directly.


@overload
def _wait_for_publication(
self: "sboms._SbomsClient", identity: str
) -> "sbommetadata.SBOM":
... # pragma: no cover


@backoff.on_predicate(
backoff.expo,
logger=LOGGER,
Expand All @@ -64,7 +49,7 @@ def _wait_for_publication(
on_giveup=__on_giveup_withdrawn,
)
def _wait_for_withdrawn(self, identity):
"""docstring"""
"""Return None until withdrawn date is set"""
entity = self.read(identity)

if entity.withdrawn_date:
Expand Down
6 changes: 6 additions & 0 deletions functests/execapplications.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ def test_appidp_token(self):
def test_archivist_token(self):
"""
Test archivist with client id/secret
WARN: this test takes over 10 minutes
"""
print("This test takes over 10 minutes...")
application = self.arch.applications.create(
self.display_name,
CUSTOM_CLAIMS,
Expand All @@ -187,6 +189,7 @@ def test_archivist_token(self):
)

# archivist using app registration
print("New Arch")
new_arch = Archivist(
environ["TEST_ARCHIVIST"],
(application["client_id"], application["credentials"][0]["secret"]),
Expand All @@ -198,6 +201,9 @@ def test_archivist_token(self):
traffic_light = deepcopy(ATTRS)
traffic_light["arc_display_type"] = "Traffic light with violation camera"
asset = new_arch.assets.create(
props={
"proof_mechanism": ProofMechanism.SIMPLE_HASH.name,
},
attrs=traffic_light,
confirm=True,
)
Expand Down
59 changes: 53 additions & 6 deletions functests/execsboms.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,60 @@ def tearDown(cls) -> None:
with suppress(FileNotFoundError):
remove(TEST_SBOM_DOWNLOAD_PATH)

def test_sbom_upload_with_public_privacy(self):
    """
    Test sbom upload with privacy
    """
    now = now_timestamp()
    print("Title:", self.title, now)
    # upload with an explicit privacy query param and wait for availability
    with open(TEST_SBOM_PATH, "rb") as fd:
        metadata = self.arch.sboms.upload(
            fd, confirm=True, params={"privacy": "PUBLIC"}
        )
    print("first upload", json_dumps(metadata.dict(), indent=4))
    identity = metadata.identity

    # reading back must return exactly the metadata reported at upload time
    metadata1 = self.arch.sboms.read(identity)
    print("read", json_dumps(metadata1.dict(), indent=4))
    self.assertEqual(
        metadata,
        metadata1,
        msg="Metadata not correct",
    )

def test_sbom_upload_with_confirmation(self):
    """
    Test sbom upload with confirmation
    """
    now = now_timestamp()
    print("Title:", self.title, now)
    # confirm=True blocks until the SBOM is readable
    with open(TEST_SBOM_PATH, "rb") as fd:
        metadata = self.arch.sboms.upload(fd, confirm=True)
    print("first upload", json_dumps(metadata.dict(), indent=4))
    identity = metadata.identity

    # reading back must return exactly the metadata reported at upload time
    metadata1 = self.arch.sboms.read(identity)
    print("read", json_dumps(metadata1.dict(), indent=4))
    self.assertEqual(
        metadata,
        metadata1,
        msg="Metadata not correct",
    )

    sleep(1)  # the data may have not reached cogsearch
    metadatas = list(self.arch.sboms.list(metadata={"uploaded_since": now}))
    self.assertEqual(
        len(metadatas),
        1,
        msg="No. of SBOMS should be 1",
    )

def test_sbom_upload_and_download(self):
"""
Test sbom upload and download through the SDK
"""
print("Title:", self.title)
now = now_timestamp()
print("Title:", self.title, now)
with open(TEST_SBOM_PATH, "rb") as fd:
metadata = self.arch.sboms.upload(fd)

Expand All @@ -83,11 +131,8 @@ def test_sbom_upload_and_download(self):
msg="Metadata not correct",
)

sleep(1) # otherwise test fails
sleep(1) # the data may have not reached cogsearch
metadatas = list(self.arch.sboms.list(metadata={"uploaded_since": now}))
for i, m in enumerate(metadatas):
print(i, ":", json_dumps(m.dict(), indent=4))

self.assertEqual(
len(metadatas),
1,
Expand All @@ -99,6 +144,9 @@ def test_sbom_upload_and_download(self):
msg="Metadata not correct",
)

for i, m in enumerate(metadatas):
print(i, ":", json_dumps(m.dict(), indent=4))

metadata2 = self.arch.sboms.publish(identity, confirm=True)
print("publish", json_dumps(metadata2.dict(), indent=4))
self.assertNotEqual(
Expand Down Expand Up @@ -135,7 +183,6 @@ def test_sbom_upload_and_download(self):
msg="Withdrawn_date not correct",
)

sleep(1) # otherwise test fails
metadatas = list(
self.arch.sboms.list(
page_size=50,
Expand Down
Loading