From 873d258a400a24ee06cf1cef5218c59f38cea284 Mon Sep 17 00:00:00 2001 From: Christopher Hoogeboom Date: Mon, 10 Aug 2020 13:59:53 -0400 Subject: [PATCH 1/3] Use the MappingProxyEncoder to encode the JSON --- confluent_kafka/avro/cached_schema_registry_client.py | 9 +++++---- tests/avro/mock_registry.py | 11 ++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py index 6fee6151d..ac8b8afec 100644 --- a/confluent_kafka/avro/cached_schema_registry_client.py +++ b/confluent_kafka/avro/cached_schema_registry_client.py @@ -24,6 +24,7 @@ import warnings from collections import defaultdict +from avro.schema import MappingProxyEncoder from requests import Session, utils from .error import ClientError @@ -213,7 +214,7 @@ def register(self, subject, avro_schema): url = '/'.join([self.url, 'subjects', subject, 'versions']) # body is { schema : json_string } - body = {'schema': json.dumps(avro_schema.to_json())} + body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} result, code = self._send_request(url, method='POST', body=body) if (code == 401 or code == 403): raise ClientError("Unauthorized access. Error code:" + str(code)) @@ -253,7 +254,7 @@ def check_registration(self, subject, avro_schema): url = '/'.join([self.url, 'subjects', subject]) # body is { schema : json_string } - body = {'schema': json.dumps(avro_schema.to_json())} + body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} result, code = self._send_request(url, method='POST', body=body) if code == 401 or code == 403: raise ClientError("Unauthorized access. Error code:" + str(code)) @@ -374,7 +375,7 @@ def get_version(self, subject, avro_schema): return version url = '/'.join([self.url, 'subjects', subject]) - body = {'schema': json.dumps(avro_schema.to_json())} + body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} result, code = self._send_request(url, method='POST', body=body) if code == 404: @@ -402,7 +403,7 @@ def test_compatibility(self, subject, avro_schema, version='latest'): """ url = '/'.join([self.url, 'compatibility', 'subjects', subject, 'versions', str(version)]) - body = {'schema': json.dumps(avro_schema.to_json())} + body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} try: result, code = self._send_request(url, method='POST', body=body) if code == 404: diff --git a/tests/avro/mock_registry.py b/tests/avro/mock_registry.py index f40e82f25..395444b09 100644 --- a/tests/avro/mock_registry.py +++ b/tests/avro/mock_registry.py @@ -23,9 +23,10 @@ import sys import json import re - from threading import Thread, Event +from avro.schema import MappingProxyEncoder + from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient from confluent_kafka import avro from confluent_kafka.avro.error import ClientError @@ -100,13 +101,13 @@ def get_schema_by_id(self, req, groups): if not schema: return self._create_error("schema not found", 404) result = { - "schema": json.dumps(schema.to_json()) + "schema": json.dumps(schema.to_json(), cls=MappingProxyEncoder) } return (200, result) def _get_identity_schema(self, avro_schema): # normalized - schema_str = json.dumps(avro_schema.to_json()) + schema_str = json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder) if schema_str in self.schema_cache: return self.schema_cache[schema_str] self.schema_cache[schema_str] = avro_schema @@ -150,7 +151,7 @@ def get_version(self, req, groups): schema_id = self.registry.get_id_for_schema(subject, avro_schema) result = { - "schema": json.dumps(avro_schema.to_json()), + "schema": json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder), "subject": subject, "id": schema_id, "version": version @@ -163,7 +164,7 @@ def get_latest(self, req, groups): if schema_id is None: return self._create_error("Not found", 404) result = { - "schema": json.dumps(avro_schema.to_json()), + "schema": json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder), "subject": subject, "id": schema_id, "version": version From 033171eb6f005284560d3d0db030988725fb5bd2 Mon Sep 17 00:00:00 2001 From: Christopher Hoogeboom Date: Mon, 10 Aug 2020 14:19:28 -0400 Subject: [PATCH 2/3] Use str instead --- confluent_kafka/avro/cached_schema_registry_client.py | 9 ++++----- tests/avro/mock_registry.py | 11 +++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py index ac8b8afec..4b376ef6b 100644 --- a/confluent_kafka/avro/cached_schema_registry_client.py +++ b/confluent_kafka/avro/cached_schema_registry_client.py @@ -24,7 +24,6 @@ import warnings from collections import defaultdict -from avro.schema import MappingProxyEncoder from requests import Session, utils from .error import ClientError @@ -214,7 +213,7 @@ def register(self, subject, avro_schema): url = '/'.join([self.url, 'subjects', subject, 'versions']) # body is { schema : json_string } - body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} + body = {'schema': str(avro_schema)} result, code = self._send_request(url, method='POST', body=body) if (code == 401 or code == 403): raise ClientError("Unauthorized access. Error code:" + str(code)) @@ -254,7 +253,7 @@ def check_registration(self, subject, avro_schema): url = '/'.join([self.url, 'subjects', subject]) # body is { schema : json_string } - body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} + body = {'schema': str(avro_schema)} result, code = self._send_request(url, method='POST', body=body) if code == 401 or code == 403: raise ClientError("Unauthorized access. Error code:" + str(code)) @@ -375,7 +374,7 @@ def get_version(self, subject, avro_schema): return version url = '/'.join([self.url, 'subjects', subject]) - body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} + body = {'schema': str(avro_schema)} result, code = self._send_request(url, method='POST', body=body) if code == 404: @@ -403,7 +402,7 @@ def test_compatibility(self, subject, avro_schema, version='latest'): """ url = '/'.join([self.url, 'compatibility', 'subjects', subject, 'versions', str(version)]) - body = {'schema': json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder)} + body = {'schema': str(avro_schema)} try: result, code = self._send_request(url, method='POST', body=body) if code == 404: diff --git a/tests/avro/mock_registry.py b/tests/avro/mock_registry.py index 395444b09..95c74f9c7 100644 --- a/tests/avro/mock_registry.py +++ b/tests/avro/mock_registry.py @@ -23,9 +23,8 @@ import sys import json import re -from threading import Thread, Event -from avro.schema import MappingProxyEncoder +from threading import Thread, Event from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient from confluent_kafka import avro @@ -101,13 +100,13 @@ def get_schema_by_id(self, req, groups): if not schema: return self._create_error("schema not found", 404) result = { - "schema": json.dumps(schema.to_json(), cls=MappingProxyEncoder) + "schema": str(schema) } return (200, result) def _get_identity_schema(self, avro_schema): # normalized - schema_str = json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder) + schema_str = str(avro_schema) if schema_str in self.schema_cache: return self.schema_cache[schema_str] self.schema_cache[schema_str] = avro_schema @@ -151,7 +150,7 @@ def get_version(self, req, groups): schema_id = self.registry.get_id_for_schema(subject, avro_schema) result = { - "schema": json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder), + "schema": str(avro_schema), "subject": subject, "id": schema_id, "version": version @@ -164,7 +163,7 @@ def get_latest(self, req, groups): if schema_id is None: return self._create_error("Not found", 404) result = { - "schema": json.dumps(avro_schema.to_json(), cls=MappingProxyEncoder), + "schema": str(avro_schema), "subject": subject, "id": schema_id, "version": version From 67b1b9469e9271055c554056c00d7ddc93489ef9 Mon Sep 17 00:00:00 2001 From: Christopher Hoogeboom Date: Mon, 10 Aug 2020 14:26:25 -0400 Subject: [PATCH 3/3] Remove unused import --- confluent_kafka/avro/cached_schema_registry_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py index 4b376ef6b..bdbe380e2 100644 --- a/confluent_kafka/avro/cached_schema_registry_client.py +++ b/confluent_kafka/avro/cached_schema_registry_client.py @@ -19,7 +19,6 @@ # # derived from https://github.com/verisign/python-confluent-schemaregistry.git # -import json import logging import warnings from collections import defaultdict