Skip to content

Commit

Permalink
implemented feedback from the review
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 22, 2023
1 parent 20b7256 commit 3305ba6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
2 changes: 1 addition & 1 deletion karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def update_require_validation_for_topic(
topic_name: TopicName,
skip_validation: bool,
) -> None:
key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
key = {"keytype": str(MessageType.schema_validation), "magic": 0}
value = {"skip_validation": skip_validation, "topic": topic_name}
self.producer.send_message(key=key, value=value)

Expand Down
55 changes: 40 additions & 15 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,10 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None:
status=HTTPStatus.SERVICE_UNAVAILABLE,
)
else:
url = f"{master_url}{request.url.path}"
await self._forward_request_remote(
request=request,
body=request.json,
url=url,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=request.get_header("Content-Type"),
method=request.method,
)
Expand Down Expand Up @@ -621,8 +620,13 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")
await self._forward_request_remote(
request=request,
body=body,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="PUT",
)

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

Expand Down Expand Up @@ -692,9 +696,12 @@ async def config_subject_set(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
await self._forward_request_remote(
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
request=request,
body=request.json,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="PUT",
)

self.r({"compatibility": compatibility_level.value}, content_type)
Expand All @@ -717,9 +724,12 @@ async def config_subject_delete(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
await self._forward_request_remote(
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
request=request,
body=request.json,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="PUT",
)

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)
Expand Down Expand Up @@ -791,8 +801,13 @@ async def subject_delete(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
await self._forward_request_remote(
request=request,
body={},
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url) + f"?permanent={permanent}"),
content_type=content_type,
method="DELETE",
)

async def subject_version_get(
self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None
Expand Down Expand Up @@ -894,8 +909,13 @@ async def subject_version_delete(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
await self._forward_request_remote(
request=request,
body={},
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url) + f"?permanent={permanent}"),
content_type=content_type,
method="DELETE",
)

async def subject_version_schema_get(
self, content_type: str, *, subject: str, version: str, user: User | None = None
Expand Down Expand Up @@ -1279,8 +1299,13 @@ async def subject_post(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")
await self._forward_request_remote(
request=request,
body=body,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="POST",
)

async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None:
require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic))
Expand Down Expand Up @@ -1319,7 +1344,7 @@ async def set_topic_require_validation(
await self._forward_request_remote(
request=request,
body=None,
url=compute_forwarded_url(master_url=master_url, request_url=request.url),
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="POST",
)
Expand Down

0 comments on commit 3305ba6

Please sign in to comment.