Skip to content

Commit

Permalink
fix: record state after timestamp update (#588)
Browse files Browse the repository at this point in the history
fixing a bug where state was not recorded after updating timestamp

Signed-off-by: Arvind Somya <arvind.somya@anchore.com>
  • Loading branch information
asomya committed May 24, 2024
1 parent 746854a commit 012417f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/vunnel/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def _update(self) -> None:
else:
urls, count = self.update(last_updated=last_updated)

if count > 0:
if count > 0 or stale:
self.workspace.record_state(
stale=stale,
version=self.version(),
Expand All @@ -193,17 +193,19 @@ def _fetch_or_use_results_archive(self) -> tuple[list[str], int, datetime.dateti
latest_entry = listing_doc.latest_entry(schema_version=self.distribution_version())
if not latest_entry:
raise RuntimeError("no listing entry found")

timestamp = None
if self.runtime_cfg.skip_newer_archive_check or self._has_newer_archive(latest_entry=latest_entry):
self.logger.info("fetching latest listing")
self._prep_workspace_from_listing_entry(entry=latest_entry)
else:
# Update the timestamp of the state to the latest entry's built time
self.logger.info("using existing listing and updating timestamp")
self.workspace.state().timestamp = datetime.datetime.fromisoformat(latest_entry.built)
timestamp = datetime.datetime.fromisoformat(latest_entry.built)

state = self.workspace.state()
return state.urls, state.result_count(self.workspace.path), state.timestamp
if not timestamp:
timestamp = state.timestamp
return state.urls, state.result_count(self.workspace.path), timestamp

def _fetch_listing_document(self) -> distribution.ListingDocument:
url = self.runtime_cfg.import_url(provider_name=self.name())
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def update(self, *args, **kwargs):
return ["http://localhost:8000/dummy-input-1.json"], 1


class DummyProviderWithZeroCountOnUpdate(DummyProvider):
def _fetch_or_use_results_archive(self):
urls, _ = self.update()
return urls, 0, datetime.datetime(2021, 1, 1, 0, 0, 0)


def get_random_string(length=10):
characters = string.ascii_letters + string.digits
return "".join(random.choice(characters) for _ in range(length))
Expand Down Expand Up @@ -96,6 +102,30 @@ def apply(populate=True, use_dir=None, **kwargs) -> provider.Provider:
return apply


@pytest.fixture()
def dummy_provider_with_zero_count_on_update(tmpdir):
def apply(populate=True, use_dir=None, **kwargs) -> provider.Provider:
if not use_dir:
use_dir = tmpdir + get_random_string()
# create a dummy provider
subject = DummyProviderWithZeroCountOnUpdate(root=use_dir, **kwargs)

if populate:
# update the provider
subject.run()

# check that the input and results are populated
assert os.path.exists(subject.input_file)
existing_results = os.listdir(subject.workspace.results_path)
assert len(existing_results) > 0
else:
subject.workspace.create()

return subject

return apply


def test_clear_existing_state(dummy_provider):
policy = provider.RuntimeConfig(
existing_input=provider.InputStatePolicy.DELETE,
Expand Down Expand Up @@ -813,6 +843,30 @@ def test_has_newer_archive_false(dummy_provider):
assert not subject._has_newer_archive(entry)


def test_timestamp_updated_on_fetch_or_use_results_archive(tmpdir, dummy_provider):
subject = dummy_provider(populate=True)
subject.runtime_cfg.import_results_enabled = True
subject.runtime_cfg.import_results_host = "http://localhost"
subject.runtime_cfg.import_results_path = "{provider_name}/listing.json"
current_state = subject.workspace.state()
# fetch the results archive
urls, count, timestamp = subject._fetch_or_use_results_archive()
assert current_state.timestamp != timestamp
assert timestamp == datetime.datetime(2021, 1, 1, 0, 0, 0)


def test_state_update_on_stale(tmpdir, dummy_provider_with_zero_count_on_update):
subject = dummy_provider_with_zero_count_on_update(populate=True)
current_state = subject.workspace.state()
subject.runtime_cfg.import_results_enabled = True
subject.runtime_cfg.import_results_host = "http://localhost"
subject.runtime_cfg.import_results_path = "{provider_name}/listing.json"
subject._update()
new_state = subject.workspace.state()
assert new_state.timestamp is not None
assert new_state.timestamp == datetime.datetime(2021, 1, 1, 0, 0, 0)


@pytest.mark.parametrize(
"host,path,want",
[
Expand Down

0 comments on commit 012417f

Please sign in to comment.