forked from pulp/pulp_deb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_download_policies.py
96 lines (82 loc) · 3.63 KB
/
test_download_policies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""Tests for Pulp download policies."""
import pytest
from pulp_smash.pulp3.bindings import delete_orphans
from pulp_smash.pulp3.utils import (
get_added_content_summary,
get_content_summary,
)
from pulp_deb.tests.functional.constants import DEB_FIXTURE_PACKAGE_COUNT, DEB_FIXTURE_SUMMARY
@pytest.mark.parametrize("policy", ["on_demand", "streamed"])
def test_download_policy(
apt_package_api,
deb_get_fixture_server_url,
deb_get_repository_by_href,
deb_publication_factory,
deb_remote_factory,
deb_repository_factory,
deb_sync_repository,
orphans_cleanup_api_client,
policy,
):
"""Test whether lazy synced content can be accessed with different download policies."""
orphans_cleanup_api_client.cleanup({"orphan_protection_time": 0})
# Create repository and remote and verify latest `repository_version` is 0
repo = deb_repository_factory()
url = deb_get_fixture_server_url()
remote = deb_remote_factory(url=url, policy=policy)
assert repo.latest_version_href.endswith("/0/")
# Sync and verify latest `repository_version` is 1
deb_sync_repository(remote, repo)
repo = deb_get_repository_by_href(repo.pulp_href)
assert repo.latest_version_href.endswith("/1/")
# Verify the correct amount of content units are available
assert DEB_FIXTURE_SUMMARY == get_content_summary(repo.to_dict())
assert DEB_FIXTURE_SUMMARY == get_added_content_summary(repo.to_dict())
# Sync again and verify the same amount of content units are available
latest_version_href = repo.latest_version_href
deb_sync_repository(remote, repo)
repo = deb_get_repository_by_href(repo.pulp_href)
assert repo.latest_version_href == latest_version_href
assert DEB_FIXTURE_SUMMARY == get_content_summary(repo.to_dict())
# Create a publication and verify the `repository_version` is not empty
publication = deb_publication_factory(repo, simple=True)
assert publication.repository_version is not None
# Verify the correct amount of packages are available
content = apt_package_api.list()
assert content.count == DEB_FIXTURE_PACKAGE_COUNT
# TODO: Should think of a better approach to test this case.
@pytest.mark.skip(reason="Currently breaking the CI")
@pytest.mark.parametrize("policy", ["on_demand", "streamed"])
def test_lazy_sync_immediate_download_test(
artifacts_api_client,
deb_get_fixture_server_url,
deb_get_remote_by_href,
deb_get_repository_by_href,
deb_patch_remote,
deb_remote_factory,
deb_repository_factory,
deb_sync_repository,
policy,
):
"""Test whether a immediate sync after a lazy one has all artifacts available."""
# Cleanup artifacts
NON_LAZY_ARTIFACT_COUNT = 17
delete_orphans()
# Create repository and remote and sync them
repo = deb_repository_factory()
url = deb_get_fixture_server_url()
remote = deb_remote_factory(url=url, policy=policy)
deb_sync_repository(remote, repo)
repo = deb_get_repository_by_href(repo.pulp_href)
# Verify amount of artifacts are equal to NON_LAZY_ARTIFACT_COUNT
artifacts = artifacts_api_client.list()
assert artifacts.count == NON_LAZY_ARTIFACT_COUNT
# Update remote policy to immediate and verify it is set
deb_patch_remote(remote, {"policy": "immediate"})
remote = deb_get_remote_by_href(remote.pulp_href)
assert remote.policy == "immediate"
# Sync with updated remote and verify the correct amount artifacts
deb_sync_repository(remote, repo)
repo = deb_get_repository_by_href(repo.pulp_href)
artifacts = artifacts_api_client.list()
assert artifacts.count == NON_LAZY_ARTIFACT_COUNT + DEB_FIXTURE_PACKAGE_COUNT