forked from ansible/galaxy_ng
/
collection.py
360 lines (296 loc) · 14.7 KB
/
collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
import logging
import requests
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.http import HttpResponseRedirect, StreamingHttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema
from pulp_ansible.app.galaxy.v3 import views as pulp_ansible_views
from pulp_ansible.app.models import AnsibleDistribution
from pulp_ansible.app.models import CollectionImport as PulpCollectionImport
from pulp_ansible.app.models import CollectionVersion
from pulpcore.plugin.models import Content
from pulpcore.plugin.models import SigningService
from pulpcore.plugin.models import Task
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.tasking import add_and_remove, dispatch
from rest_framework import status
from rest_framework.exceptions import APIException, NotFound
from rest_framework.response import Response
from galaxy_ng.app import models
from galaxy_ng.app.access_control import access_policy
from galaxy_ng.app.api import base as api_base
from galaxy_ng.app.api.v3.serializers import CollectionUploadSerializer
from galaxy_ng.app.common import metrics
from galaxy_ng.app.common.parsers import AnsibleGalaxy29MultiPartParser
from galaxy_ng.app.constants import INBOUND_REPO_NAME_FORMAT, DeploymentMode
from galaxy_ng.app.tasks import (
call_move_content_task,
call_sign_and_move_task,
import_and_auto_approve,
import_and_move_to_staging,
)
log = logging.getLogger(__name__)
class CollectionUploadViewSet(api_base.LocalSettingsMixin,
pulp_ansible_views.CollectionUploadViewSet):
permission_classes = [access_policy.CollectionAccessPolicy]
parser_classes = [AnsibleGalaxy29MultiPartParser]
serializer_class = CollectionUploadSerializer
def _dispatch_upload_collection_task(self, args=None, kwargs=None, repository=None):
"""Dispatch a pulp task started on upload of collection version."""
locks = []
context = super().get_serializer_context()
request = context.get("request", None)
kwargs = kwargs or {}
kwargs["general_args"] = args
kwargs["username"] = request.user.username
if repository:
locks.append(repository)
kwargs["repository_pk"] = repository.pk
if settings.GALAXY_REQUIRE_CONTENT_APPROVAL:
return dispatch(import_and_move_to_staging, exclusive_resources=locks, kwargs=kwargs)
return dispatch(import_and_auto_approve, exclusive_resources=locks, kwargs=kwargs)
# Wrap super().create() so we can create a galaxy_ng.app.models.CollectionImport based on the
# the import task and the collection artifact details
def _get_data(self, request):
serializer = CollectionUploadSerializer(data=request.data, context={'request': request})
serializer.is_valid(raise_exception=True)
return serializer.validated_data
@staticmethod
def _get_path(kwargs, filename_ns):
"""Use path from '/content/<path>/v3/' or
if user does not specify distribution base path
then use an inbound distribution based on filename namespace.
"""
# the legacy collection upload views don't get redirected and still have to use the
# old path arg
path = kwargs.get(
'distro_base_path',
kwargs.get('path', settings.ANSIBLE_DEFAULT_DISTRIBUTION_PATH)
)
if path == settings.ANSIBLE_DEFAULT_DISTRIBUTION_PATH:
path = INBOUND_REPO_NAME_FORMAT.format(namespace_name=filename_ns)
return path
@staticmethod
def _check_path_matches_expected_repo(path, filename_ns):
"""Reject if path does not match expected inbound format
containing filename namespace.
Examples:
Reject if path is "staging".
Reject if path does not start with "inbound-".
Reject if path is "inbound-alice" but filename namepace is "bob".
"""
distro = get_object_or_404(AnsibleDistribution, base_path=path)
repo_name = distro.repository.name
if INBOUND_REPO_NAME_FORMAT.format(namespace_name=filename_ns) == repo_name:
return
raise NotFound(
_('Path does not match: "%s"')
% INBOUND_REPO_NAME_FORMAT.format(namespace_name=filename_ns)
)
@extend_schema(
description="Create an artifact and trigger an asynchronous task to create "
"Collection content from it.",
summary="Upload a collection",
request=CollectionUploadSerializer,
responses={202: AsyncOperationResponseSerializer},
)
def create(self, request, *args, **kwargs):
data = self._get_data(request)
filename = data['filename']
path = self._get_path(kwargs, filename_ns=filename.namespace)
try:
namespace = models.Namespace.objects.get(name=filename.namespace)
except models.Namespace.DoesNotExist:
raise ValidationError(
_('Namespace "{0}" does not exist.').format(filename.namespace)
)
self._check_path_matches_expected_repo(path, filename_ns=namespace.name)
self.check_object_permissions(request, namespace)
try:
response = super(CollectionUploadViewSet, self).create(request, path)
except ValidationError:
log.exception('Failed to publish artifact %s (namespace=%s, sha256=%s)', # noqa
data['file'].name, namespace, data.get('sha256'))
raise
task_href = response.data['task']
# icky, have to extract task id from the task_href url
task_id = task_href.strip("/").split("/")[-1]
task_detail = Task.objects.get(pk=task_id)
pulp_collection_import = PulpCollectionImport.objects.get(pk=task_id)
models.CollectionImport.objects.create(
task_id=pulp_collection_import,
created_at=task_detail.pulp_created,
namespace=namespace,
name=data['filename'].name,
version=data['filename'].version,
)
# TODO: CollectionImport.get_absolute_url() should be able to generate this, but
# it needs the repo/distro base_path for the <path> part of url
import_obj_url = reverse("galaxy:api:v3:collection-imports-detail",
kwargs={'pk': str(task_detail.pk),
'path': path})
log.debug('import_obj_url: %s', import_obj_url)
return Response(
data={'task': import_obj_url},
status=response.status_code
)
class CollectionArtifactDownloadView(api_base.APIView):
permission_classes = [access_policy.CollectionAccessPolicy]
action = 'download'
def _get_tcp_response(self, url):
return requests.get(url, stream=True, allow_redirects=False)
def _get_ansible_distribution(self, base_path):
return AnsibleDistribution.objects.get(base_path=base_path)
def get(self, request, *args, **kwargs):
metrics.collection_artifact_download_attempts.inc()
distro_base_path = self.kwargs['distro_base_path']
filename = self.kwargs['filename']
prefix = settings.CONTENT_PATH_PREFIX.strip('/')
distribution = self._get_ansible_distribution(distro_base_path)
if settings.ANSIBLE_COLLECT_DOWNLOAD_LOG:
pulp_ansible_views.CollectionArtifactDownloadView.log_download(
request, filename, distro_base_path
)
if settings.GALAXY_DEPLOYMENT_MODE == DeploymentMode.INSIGHTS.value:
url = 'http://{host}:{port}/{prefix}/{distro_base_path}/{filename}'.format(
host=settings.X_PULP_CONTENT_HOST,
port=settings.X_PULP_CONTENT_PORT,
prefix=prefix,
distro_base_path=distro_base_path,
filename=filename,
)
response = self._get_tcp_response(
distribution.content_guard.cast().preauthenticate_url(url)
)
if response.status_code == requests.codes.not_found:
metrics.collection_artifact_download_failures.labels(
status=requests.codes.not_found
).inc()
raise NotFound()
if response.status_code == requests.codes.found:
return HttpResponseRedirect(response.headers['Location'])
if response.status_code == requests.codes.ok:
metrics.collection_artifact_download_successes.inc()
return StreamingHttpResponse(
response.raw.stream(amt=4096),
content_type=response.headers['Content-Type']
)
metrics.collection_artifact_download_failures.labels(status=response.status_code).inc()
raise APIException(
_('Unexpected response from content app. Code: %s.') % response.status_code
)
elif settings.GALAXY_DEPLOYMENT_MODE == DeploymentMode.STANDALONE.value:
url = '{host}/{prefix}/{distro_base_path}/{filename}'.format(
host=settings.CONTENT_ORIGIN.strip("/"),
prefix=prefix,
distro_base_path=distro_base_path,
filename=filename,
)
return redirect(distribution.content_guard.cast().preauthenticate_url(url))
class CollectionRepositoryMixing:
@property
def version_str(self):
"""Build version_str from request."""
return '-'.join([self.kwargs[key] for key in ('namespace', 'name', 'version')])
def get_collection_version(self):
"""Get collection version entity."""
try:
return CollectionVersion.objects.get(
namespace=self.kwargs['namespace'],
name=self.kwargs['name'],
version=self.kwargs['version'],
)
except ObjectDoesNotExist:
raise NotFound(_('Collection %s not found') % self.version_str)
def get_repos(self):
"""Get src and dest repos."""
try:
src_repo = AnsibleDistribution.objects.get(
base_path=self.kwargs['source_path']).repository
dest_repo = AnsibleDistribution.objects.get(
base_path=self.kwargs['dest_path']).repository
except ObjectDoesNotExist:
raise NotFound(_('Repo(s) for moving collection %s not found') % self.version_str)
return src_repo, dest_repo
class CollectionVersionCopyViewSet(api_base.ViewSet, CollectionRepositoryMixing):
permission_classes = [access_policy.CollectionAccessPolicy]
def copy_content(self, request, *args, **kwargs):
"""Copy collection version from one repository to another."""
collection_version = self.get_collection_version()
src_repo, dest_repo = self.get_repos()
copy_task = dispatch(
add_and_remove,
exclusive_resources=[dest_repo],
shared_resources=[src_repo],
kwargs={
"repository_pk": dest_repo.pk,
"add_content_units": [collection_version.pk],
"remove_content_units": [],
}
)
return Response(data={"task_id": copy_task.pk}, status='202')
class CollectionVersionMoveViewSet(api_base.ViewSet, CollectionRepositoryMixing):
permission_classes = [access_policy.CollectionAccessPolicy]
def move_content(self, request, *args, **kwargs):
"""Remove content from source repo and add to destination repo.
Creates new RepositoryVersion of source repo without content included.
Creates new RepositoryVersion of destination repo with content included.
"""
collection_version = self.get_collection_version()
src_repo, dest_repo = self.get_repos()
content_obj = Content.objects.get(pk=collection_version.pk)
if content_obj not in src_repo.latest_version().content:
raise NotFound(_('Collection %s not found in source repo') % self.version_str)
if content_obj in dest_repo.latest_version().content:
raise NotFound(_('Collection %s already found in destination repo') % self.version_str)
response_data = {
"copy_task_id": None,
"remove_task_id": None,
# Can be removed once all synclist stuff is remove
# and client compat isnt a concern -akl
"curate_all_synclist_repository_task_id": None,
}
golden_repo = settings.get("GALAXY_API_DEFAULT_DISTRIBUTION_BASE_PATH", "published")
auto_sign = settings.get("GALAXY_AUTO_SIGN_COLLECTIONS", False)
move_task_params = {
"collection_version": collection_version,
"source_repo": src_repo,
"dest_repo": dest_repo,
}
if auto_sign and dest_repo.name == golden_repo:
# Assumed that if user has access to modify the repo, they can also sign the content
# so we don't need to check access policies here.
signing_service_name = settings.get(
"GALAXY_COLLECTION_SIGNING_SERVICE", "ansible-default"
)
try:
signing_service = SigningService.objects.get(name=signing_service_name)
except ObjectDoesNotExist:
raise NotFound(_('Signing %s service not found') % signing_service_name)
move_task = call_sign_and_move_task(signing_service, **move_task_params)
else:
require_signatures = settings.get("GALAXY_REQUIRE_SIGNATURE_FOR_APPROVAL", False)
if dest_repo.name == golden_repo and require_signatures:
if collection_version.signatures.count() == 0:
return Response(
{
"detail": _(
"Collection {namespace}.{name} could not be approved "
"because system requires at least a signature for approval."
).format(
namespace=collection_version.namespace,
name=collection_version.name,
)
},
status=status.HTTP_400_BAD_REQUEST,
)
move_task = call_move_content_task(**move_task_params)
response_data['copy_task_id'] = response_data['remove_task_id'] = move_task.pk
if settings.GALAXY_DEPLOYMENT_MODE == DeploymentMode.INSIGHTS.value:
golden_repo = AnsibleDistribution.objects.get(
base_path=settings.GALAXY_API_DEFAULT_DISTRIBUTION_BASE_PATH
).repository
return Response(data=response_data, status='202')