Skip to content

Commit

Permalink
wip: trash code to show the flow for pull-thru proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
flavianmissi committed Jan 14, 2022
1 parent 6510043 commit 69ac948
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 7 deletions.
82 changes: 82 additions & 0 deletions endpoints/v2/blob.py
@@ -1,5 +1,6 @@
import logging
import re
import requests

from flask import url_for, request, redirect, Response, abort as flask_abort

Expand Down Expand Up @@ -88,6 +89,87 @@ def check_blob_exists(namespace_name, repo_name, digest):
@check_region_blacklisted(BlobDownloadGeoBlocked)
@cache_control(max_age=31536000)
def download_blob(namespace_name, repo_name, digest):
# TODO: quay auth check returns 401 when the repository doesn't exist.
# we shouldn't force customers to create every single repository before
# doing the pull-thru, so this need to be circumvented somehow.

# hard code pull-thru proxy config for proof of concept
PULL_THRU_CONFIG = {
"namespace": "library",
"registry": "https://registry.hub.docker.com",
"auth": "https://auth.docker.io/token",
"repository": "postgres",
}
if namespace_name == PULL_THRU_CONFIG["namespace"]:
session = requests.Session()

# use anonymous auth for now.
auth_url = PULL_THRU_CONFIG["auth"].rstrip("/") + f"?service=registry.docker.io&scope=repository:{namespace_name}/{repo_name}:pull"
token = session.get(auth_url).json()["token"]

url = PULL_THRU_CONFIG["registry"].rstrip("/") + f"/v2/{namespace_name}/{repo_name}/blobs/{digest}"
headers = {
"Authorization": f"Bearer {token}",
}
accept = request.headers.get("Accept", None)
if accept is not None:
headers["Accept"] = accept

# check if blob exists in upstream, this won't count towards quota
resp = session.head(url, headers=headers, allow_redirects=True)
if resp.status_code != 200:
# TODO: make it clear to the client that this error is a 404
# from the upstream registry.
error = {
"errors": [{"message": "fail to fetch upstream stuff"}],
"upstream_error": resp.json(),
}
return Response(
error,
status=resp.status_code,
)

# download blob
resp = session.get(
url,
headers=headers,
allow_redirects=True,
stream=True,
)
if resp.status_code != 200:
# TODO: inspect response and return appropriate error to client
error = {
"error": "fail to fetch upstream stuff",
"errors": [{"message": "fail to fetch upstream stuff"}],
"upstream_error": resp.json(),
}
return Response(
error,
status=resp.status_code,
)

def stream_contents():
chunk = 1024
for chunk in resp.iter_content(chunk_size=chunk):
yield chunk

blob_size = int(resp.headers.get("Content-Length"))
headers = {
"Docker-Content-Digest": digest,
"Content-Type": BLOB_CONTENT_TYPE,
"Content-Length": blob_size,
}
accept_ranges = resp.headers.get("Accept-Ranges", None)
if accept_ranges is not None:
headers["Accept-Ranges"] = accept_ranges

image_pulled_bytes.labels("v2").inc(blob_size)
return Response(
stream_contents(),
headers=headers,
)


# Find the blob.
blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest)
if blob is None:
Expand Down
92 changes: 85 additions & 7 deletions endpoints/v2/manifest.py
Expand Up @@ -4,15 +4,21 @@

from flask import request, url_for, Response

import requests

import features

from app import app, storage
from auth.registry_jwt_auth import process_registry_jwt_auth
from auth.permissions import CreateRepositoryPermission
from auth.auth_context import get_authenticated_user
from digest import digest_tools
from data.database import db_disallow_replica_use
from data.database import db_disallow_replica_use, db_transaction
from data.registry_model.blobuploader import create_blob_upload, complete_when_uploaded
from data.registry_model import registry_model
from data.model.oci.manifest import CreateManifestException
from data.model.oci.tag import RetargetTagException
from endpoints.api.repository_models_pre_oci import pre_oci_model
from endpoints.decorators import (
anon_protect,
disallow_for_account_recovery_mode,
Expand All @@ -21,16 +27,22 @@
)
from endpoints.metrics import image_pulls, image_pushes
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write
from endpoints.v2.blob import _upload_chunk
from endpoints.v2.errors import (
ManifestInvalid,
ManifestUnknown,
NameInvalid,
InvalidRequest,
TagExpired,
NameUnknown,
)
from image.shared import ManifestException
from image.shared.schemas import parse_manifest_from_bytes
from image.docker.schema1 import DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE, DOCKER_SCHEMA1_CONTENT_TYPES
from image.docker.schema1 import (
DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE,
DOCKER_SCHEMA1_CONTENT_TYPES,
DockerSchema1Manifest,
)
from image.docker.schema2 import DOCKER_SCHEMA2_CONTENT_TYPES
from image.oci import OCI_CONTENT_TYPES
from notifications import spawn_notification
Expand All @@ -47,13 +59,70 @@
MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN)


def _proxy_upstream(config, namespace_name, repo_name, manifest_ref):
session = requests.Session()

# use anonymous auth for now.
auth_url = config["auth"].rstrip("/") + f"?service=registry.docker.io&scope=repository:{namespace_name}/{repo_name}:pull"
token = session.get(auth_url).json()["token"]

url = config["registry"].rstrip("/") + f"/v2/{namespace_name}/{repo_name}/manifests/{manifest_ref}"
headers = {
"Authorization": f"Bearer {token}",
}
accept = request.headers.get("Accept", None)
if accept is not None:
headers["Accept"] = accept

# check if manifest exists in upstream, this won't count towards quota
resp = session.head(url, headers=headers)
if resp.status_code != 200:
# TODO: make it clear to the client that this error is a 404
# from the upstream registry.
return Response(
resp.text,
status=resp.status_code,
)

resp = session.get(url, headers=headers)
if resp.status_code != 200:
# TODO: inspect response and return appropriate error to client
return Response(
resp.text,
status=resp.status_code,
)

return Response(
resp.text,
status=200,
headers={
"Content-Type": resp.headers["Content-Type"],
"Docker-Content-Digest": resp.headers["Docker-Content-Digest"],
},
)


@v2_bp.route(MANIFEST_TAGNAME_ROUTE, methods=["GET"])
@disallow_for_account_recovery_mode
@parse_repository_name()
@process_registry_jwt_auth(scopes=["pull"])
@require_repo_read
@anon_protect
def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
# TODO: quay auth check returns 401 when the repository doesn't exist.
# we shouldn't force customers to create every single repository before
# doing the pull-thru, so this need to be circumvented somehow.

# hard code pull-thru proxy config for proof of concept
PULL_THRU_CONFIG = {
"namespace": "library",
"registry": "https://registry.hub.docker.com",
"auth": "https://auth.docker.io/token",
"repository": "postgres",
}
if namespace_name == PULL_THRU_CONFIG["namespace"]:
return _proxy_upstream(PULL_THRU_CONFIG, namespace_name, repo_name, manifest_ref)

repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
image_pulls.labels("v2", "tag", 404).inc()
Expand Down Expand Up @@ -113,6 +182,15 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
@require_repo_read
@anon_protect
def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref):
# hard code pull-thru proxy config for proof of concept
PULL_THRU_CONFIG = {
"namespace": "library",
"registry": "https://registry.hub.docker.com",
"auth": "https://auth.docker.io/token",
}
if namespace_name == PULL_THRU_CONFIG["namespace"]:
return _proxy_upstream(PULL_THRU_CONFIG, namespace_name, repo_name, manifest_ref)

repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
image_pulls.labels("v2", "manifest", 404).inc()
Expand Down Expand Up @@ -228,7 +306,7 @@ def _doesnt_accept_schema_v1():
@anon_protect
@check_readonly
def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
parsed = _parse_manifest()
parsed = _parse_manifest(request.content_type, request.data)
return _write_manifest_and_log(namespace_name, repo_name, manifest_ref, parsed)


Expand All @@ -241,7 +319,7 @@ def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
@anon_protect
@check_readonly
def write_manifest_by_digest(namespace_name, repo_name, manifest_ref):
parsed = _parse_manifest()
parsed = _parse_manifest(request.content_type, request.data)
if parsed.digest != manifest_ref:
image_pushes.labels("v2", 400, "").inc()
raise ManifestInvalid(detail={"message": "manifest digest mismatch"})
Expand Down Expand Up @@ -280,14 +358,14 @@ def write_manifest_by_digest(namespace_name, repo_name, manifest_ref):
)


def _parse_manifest():
content_type = request.content_type or DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE
def _parse_manifest(content_type, request_data):
content_type = content_type or DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE
if content_type == "application/json":
# For back-compat.
content_type = DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE

try:
return parse_manifest_from_bytes(Bytes.for_string_or_unicode(request.data), content_type)
return parse_manifest_from_bytes(Bytes.for_string_or_unicode(request_data), content_type)
except ManifestException as me:
logger.exception("failed to parse manifest when writing by tagname")
raise ManifestInvalid(detail={"message": "failed to parse manifest: %s" % me})
Expand Down

0 comments on commit 69ac948

Please sign in to comment.