From 717aafd454aff0108135625ade99338a01f172c8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 20 Jan 2019 11:42:36 -0800 Subject: [PATCH 1/5] Support defining and validating schema on Python client --- pulsar-client-cpp/python/CMakeLists.txt | 2 +- pulsar-client-cpp/python/pulsar/__init__.py | 77 +++- .../python/pulsar/schema/__init__.py | 5 + .../python/pulsar/schema/definition.py | 205 +++++++++ .../python/pulsar/schema/schema.py | 85 ++++ pulsar-client-cpp/python/pulsar_test.py | 2 + pulsar-client-cpp/python/schema_test.py | 395 ++++++++++++++++++ pulsar-client-cpp/python/setup.py | 2 +- pulsar-client-cpp/python/src/config.cc | 6 + pulsar-client-cpp/python/src/enums.cc | 18 + pulsar-client-cpp/python/src/pulsar.cc | 2 + pulsar-client-cpp/python/src/schema.cc | 30 ++ 12 files changed, 817 insertions(+), 12 deletions(-) create mode 100644 pulsar-client-cpp/python/pulsar/schema/__init__.py create mode 100644 pulsar-client-cpp/python/pulsar/schema/definition.py create mode 100644 pulsar-client-cpp/python/pulsar/schema/schema.py create mode 100755 pulsar-client-cpp/python/schema_test.py create mode 100644 pulsar-client-cpp/python/src/schema.cc diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt index 2c51f6d0b02c5..e57f58330bb7e 100644 --- a/pulsar-client-cpp/python/CMakeLists.txt +++ b/pulsar-client-cpp/python/CMakeLists.txt @@ -22,7 +22,7 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}") ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc src/config.cc src/enums.cc src/client.cc src/message.cc src/authentication.cc - src/reader.cc) + src/reader.cc src/schema.cc) SET(CMAKE_SHARED_LIBRARY_PREFIX ) SET(CMAKE_SHARED_LIBRARY_SUFFIX .so) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index ab2cd3fa3d580..14fa00886620b 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -103,6 +103,8 @@ def send_callback(res, msg): from pulsar.functions.function import Function from pulsar.functions.context import Context from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe +from pulsar import schema +_schema = schema import re _retype = type(re.compile('x')) @@ -143,10 +145,16 @@ class Message: def data(self): """ - Returns object typed bytes with the content of the message. + Returns object typed bytes with the payload of the message. """ return self._message.data() + def value(self): + """ + Returns object with the de-serialized version of the message content + """ + return self._schema.decode(self._message.data()) + def properties(self): """ Return the properties attached to the message. Properties are @@ -206,6 +214,7 @@ def __init__(self, dynamicLibPath, authParamsString): _check_type(str, authParamsString, 'authParamsString') self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString) + class AuthenticationTLS(Authentication): """ TLS Authentication implementation @@ -241,6 +250,7 @@ def __init__(self, token): raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") self.auth = _pulsar.AuthenticationToken(token) + class AuthenticationAthenz(Authentication): """ Athenz Authentication implementation @@ -345,6 +355,7 @@ def __init__(self, service_url, def create_producer(self, topic, producer_name=None, + schema=schema.BytesSchema(), initial_sequence_id=None, send_timeout_millis=30000, compression_type=CompressionType.NONE, @@ -374,6 +385,12 @@ def create_producer(self, topic, with `Producer.producer_name()`. When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. + * `schema`: + Define the schema of the data that will be published by this producer. + The schema will be used for two purposes: + - Validate the data format against the topic defined schema + - Perform serialization/deserialization between data and objects + An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`. * `initial_sequence_id`: Set the baseline for the sequence ids for messages published by the producer. First message will be using @@ -403,6 +420,7 @@ def create_producer(self, topic, """ _check_type(str, topic, 'topic') _check_type_or_none(str, producer_name, 'producer_name') + _check_type(_schema.Schema, schema, 'schema') _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') _check_type(int, send_timeout_millis, 'send_timeout_millis') _check_type(CompressionType, compression_type, 'compression_type') @@ -434,12 +452,16 @@ def create_producer(self, topic, for k, v in properties.items(): conf.property(k, v) + conf.schema(schema.schema_info()) + p = Producer() p._producer = self._client.create_producer(topic, conf) + p._schema = schema return p def subscribe(self, topic, subscription_name, consumer_type=ConsumerType.Exclusive, + schema=schema.BytesSchema(), message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, @@ -466,6 +488,8 @@ def subscribe(self, topic, subscription_name, * `consumer_type`: Select the subscription type to be used when subscribing to the topic. + * `schema`: + Define the schema of the data that will be received by this consumer. * `message_listener`: Sets a message listener for the consumer. When the listener is set, the application will receive messages through it. Calls to @@ -513,6 +537,7 @@ def my_listener(consumer, message): """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') + _check_type(_schema.Schema, schema, 'schema') _check_type(int, receiver_queue_size, 'receiver_queue_size') _check_type(int, max_total_receiver_queue_size_across_partitions, 'max_total_receiver_queue_size_across_partitions') @@ -526,7 +551,7 @@ def my_listener(consumer, message): conf.consumer_type(consumer_type) conf.read_compacted(is_read_compacted) if message_listener: - conf.message_listener(message_listener) + conf.message_listener(_listener_wrapper(message_listener, schema)) conf.receiver_queue_size(receiver_queue_size) conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) if consumer_name: @@ -538,6 +563,8 @@ def my_listener(consumer, message): for k, v in properties.items(): conf.property(k, v) + conf.schema(schema.schema_info()) + c = Consumer() if isinstance(topic, str): # Single topic @@ -552,10 +579,12 @@ def my_listener(consumer, message): raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") c._client = self + c._schema = schema self._consumers.append(c) return c def create_reader(self, topic, start_message_id, + schema=schema.BytesSchema(), reader_listener=None, receiver_queue_size=1000, reader_name=None, @@ -585,6 +614,8 @@ def create_reader(self, topic, start_message_id, **Options** + * `schema`: + Define the schema of the data that will be received by this reader. * `reader_listener`: Sets a message listener for the reader. When the listener is set, the application will receive messages through it. Calls to @@ -608,21 +639,25 @@ def my_listener(reader, message): """ _check_type(str, topic, 'topic') _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') + _check_type(_schema.Schema, schema, 'schema') _check_type(int, receiver_queue_size, 'receiver_queue_size') _check_type_or_none(str, reader_name, 'reader_name') _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') conf = _pulsar.ReaderConfiguration() if reader_listener: - conf.reader_listener(reader_listener) + conf.reader_listener(_listener_wrapper(reader_listener, schema)) conf.receiver_queue_size(receiver_queue_size) if reader_name: conf.reader_name(reader_name) if subscription_role_prefix: conf.subscription_role_prefix(subscription_role_prefix) + conf.schema(schema.schema_info()) + c = Reader() c._reader = self._client.create_reader(topic, start_message_id, conf) c._client = self + c._schema = schema self._consumers.append(c) return c @@ -779,7 +814,9 @@ def close(self): def _build_msg(self, content, properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp): - _check_type(bytes, content, 'content') + data = self._schema.encode(content) + + _check_type(bytes, data, 'data') _check_type_or_none(dict, properties, 'properties') _check_type_or_none(str, partition_key, 'partition_key') _check_type_or_none(int, sequence_id, 'sequence_id') @@ -788,7 +825,7 @@ def _build_msg(self, content, properties, partition_key, sequence_id, _check_type_or_none(int, event_timestamp, 'event_timestamp') mb = _pulsar.MessageBuilder() - mb.content(content) + mb.content(data) if properties: for k, v in properties.items(): mb.property(k, v) @@ -848,10 +885,15 @@ def receive(self, timeout_millis=None): available within the timeout. """ if timeout_millis is None: - return self._consumer.receive() + msg = self._consumer.receive() else: _check_type(int, timeout_millis, 'timeout_millis') - return self._consumer.receive(timeout_millis) + msg = self._consumer.receive(timeout_millis) + + m = Message() + m._message = msg + m._schema = self._schema + return m def acknowledge(self, message): """ @@ -955,10 +997,15 @@ def read_next(self, timeout_millis=None): available within the timeout. """ if timeout_millis is None: - return self._reader.read_next() + msg = self._reader.read_next() else: _check_type(int, timeout_millis, 'timeout_millis') - return self._reader.read_next(timeout_millis) + msg = self._reader.read_next(timeout_millis) + + m = Message() + m._message = msg + m._schema = self._schema + return m def has_message_available(self): """ @@ -976,10 +1023,20 @@ def close(self): def _check_type(var_type, var, name): if not isinstance(var, var_type): - raise ValueError("Argument %s is expected to be of type '%s'" % (name, var_type.__name__)) + raise ValueError("Argument %s is expected to be of type '%s' and not '%s'" + % (name, var_type.__name__, type(var).__name__)) def _check_type_or_none(var_type, var, name): if var is not None and not isinstance(var, var_type): raise ValueError("Argument %s is expected to be either None or of type '%s'" % (name, var_type.__name__)) + + +def _listener_wrapper(listener, schema): + def wrapper(c, msg): + m = Message() + m._message = msg + m._schema = schema + listener(c, m) + return wrapper diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py new file mode 100644 index 0000000000000..dd5eb1e88afa5 --- /dev/null +++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py @@ -0,0 +1,5 @@ + +from .definition import * +from .schema import * + + diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py new file mode 100644 index 0000000000000..f86ebdbd9b7e2 --- /dev/null +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -0,0 +1,205 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from abc import abstractmethod, ABCMeta +from enum import Enum, EnumMeta +from collections import OrderedDict + + +def _check_record_or_field(x): + if (type(x) is type and not issubclass(x, Record)) \ + and not isinstance(x, Field): + raise Exception('Argument ' + x + ' is not a Record or a Field') + + +class RecordMeta(type): + def __new__(metacls, name, parents, dct): + if name != 'Record': + # Do not apply this logic to the base class itself + dct['_fields'] = RecordMeta._get_fields(dct) + return super().__new__(metacls, name, parents, dct) + + @classmethod + def _get_fields(cls, dct): + # Build a set of valid fields for this record + fields = OrderedDict() + for name, value in dct.items(): + if issubclass(type(value), EnumMeta): + # Wrap Python enums + value = _Enum(value) + elif type(value) == RecordMeta: + # We expect an instance of a record rather than the class itself + value = value() + + if isinstance(value, Record) or isinstance(value, Field): + fields[name] = value + return fields + + +class Record(metaclass=RecordMeta): + + def __init__(self, *args, **kwargs): + if args: + # Only allow keyword args + raise TypeError('Non-keyword arguments not allowed when initializing Records') + + for k, value in self._fields.items(): + if k in kwargs: + # Value was overridden at constructor + self.__setattr__(k, kwargs[k]) + else: + # Set field to default value + self.__setattr__(k, value.default()) + + @classmethod + def schema(cls): + schema = { + 'name': str(cls.__name__), + 'type': 'record', + 'fields': [] + } + + for name, value in cls._fields.items(): + schema['fields'].append({ + 'name': name, + 'type': value.schema() + }) + return schema + + def __setattr__(self, key, value): + if key not in self._fields: + raise AttributeError('Cannot set undeclared field ' + key + ' on record') + super(Record, self).__setattr__(key, value) + + def __eq__(self, other): + for field in self._fields: + if self.__getattribute__(field) != other.__getattribute__(field): + return False + return True + + def __str__(self): + return str(self.__dict__) + + +class Field(object): + def __init__(self, default=None): + self._default = default + + @abstractmethod + def type(self): + pass + + def schema(self): + # For primitive types, the schema would just be the type itself + return self.type() + + def default(self): + return self._default + +# All types + + +class Null(Field): + def type(self): + return 'null' + + +class Boolean(Field): + def type(self): + return 'boolean' + + +class Integer(Field): + def type(self): + return 'int' + + +class Long(Field): + def type(self): + return 'long' + + +class Float(Field): + def type(self): + return 'float' + + +class Double(Field): + def type(self): + return 'double' + + +class Bytes(Field): + def type(self): + return 'bytes' + + +class String(Field): + def type(self): + return 'string' + + +# Complex types + +class _Enum(Field): + def __init__(self, enum_type): + if not issubclass(enum_type, Enum): + raise Exception(enum_type + " is not a valid Enum type") + self.enum_type = enum_type + + def type(self): + return 'enum' + + def schema(self): + return { + 'type': self.type(), + 'name': self.enum_type.__name__, + 'symbols': [x.name for x in self.enum_type] + } + + +class Array(Field): + def __init__(self, array_type): + _check_record_or_field(array_type) + self.array_type = array_type + + def type(self): + return 'array' + + def schema(self): + return { + 'type': self.type(), + 'items': self.array_type.schema() + } + + +class Map(Field): + def __init__(self, value_type): + _check_record_or_field(value_type) + self.value_type = value_type + + def type(self): + return 'map' + + def schema(self): + return { + 'type': self.type(), + 'values': self.value_type.schema() + } + diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py new file mode 100644 index 0000000000000..d9ee74062f4a7 --- /dev/null +++ b/pulsar-client-cpp/python/pulsar/schema/schema.py @@ -0,0 +1,85 @@ + +from abc import abstractmethod +import json +import fastavro +import _pulsar +import io + + +class Schema(object): + def __init__(self, record_cls, schema_type, schema_definition, schema_name): + self._record_cls = record_cls + self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, + json.dumps(schema_definition, indent=True)) + + @abstractmethod + def encode(self, obj): + pass + + @abstractmethod + def decode(self, data): + pass + + def schema_info(self): + return self._schema_info + + def _validate_object_type(self, obj): + if not isinstance(obj, self._record_cls): + raise TypeError('Invalid record obj of type ' + str(type(obj)) + + ' - expected type is ' + str(self._record_cls)) + + +class BytesSchema(Schema): + def __init__(self): + super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES') + + def encode(self, data): + self._validate_object_type(data) + return data + + def decode(self, data): + return data + + +class StringSchema(Schema): + def __init__(self): + super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING') + + def encode(self, obj): + self._validate_object_type(obj) + return obj.encode('utf-8') + + def decode(self, data): + return data.decode('utf-8') + + +class JsonSchema(Schema): + + def __init__(self, record_cls): + super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON, + record_cls.schema(), 'JSON') + + def encode(self, obj): + self._validate_object_type(obj) + return json.dumps(obj.__dict__, indent=True).encode('utf-8') + + def decode(self, data): + return self._record_cls(**json.loads(data)) + + +class AvroSchema(Schema): + def __init__(self, record_cls): + super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, + record_cls.schema(), 'AVRO') + self._schema = record_cls.schema() + + def encode(self, obj): + self._validate_object_type(obj) + buffer = io.BytesIO() + fastavro.schemaless_writer(buffer, self._schema, obj.__dict__) + return buffer.getvalue() + + def decode(self, data): + buffer = io.BytesIO(data) + d = fastavro.schemaless_reader(buffer, self._schema) + return self._record_cls(**d) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 2df171b22e2c2..72c47c84238fa 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -28,6 +28,8 @@ from _pulsar import ProducerConfiguration, ConsumerConfiguration +from schema_test import * + try: # For Python 3.0 and later from urllib.request import urlopen, Request diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py new file mode 100755 index 0000000000000..5bbd077571b98 --- /dev/null +++ b/pulsar-client-cpp/python/schema_test.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from unittest import TestCase, main +import pulsar +from pulsar.schema import * +from enum import Enum +import json + + +class SchemaTest(TestCase): + + serviceUrl = 'pulsar://localhost:6650' + + def test_simple(self): + class Color(Enum): + red = 1 + green = 2 + blue = 3 + + class Example(Record): + a = String() + b = Integer() + c = Array(String()) + d = Color + e = Boolean() + f = Float() + g = Double() + h = Bytes() + i = Map(String()) + + self.assertEqual(Example.schema(), { + "name": "Example", + "type": "record", + "fields": [ + {"name": "a", "type": "string"}, + {"name": "b", "type": "int"}, + {"name": "c", "type": { + "type": "array", + "items": "string"} + }, + {"name": "d", + "type": { + "type": "enum", + "name": "Color", + "symbols": ["red", "green", "blue"]} + }, + {"name": "e", "type": "boolean"}, + {"name": "f", "type": "float"}, + {"name": "g", "type": "double"}, + {"name": "h", "type": "bytes"}, + {"name": "i", "type": { + "type": "map", + "values": "string"} + }, + ] + }) + + def test_complex(self): + class MySubRecord(Record): + x = Integer() + y = Long() + z = String() + + class Example(Record): + a = String() + sub = MySubRecord # Test with class + sub2 = MySubRecord() # Test with instance + + self.assertEqual(Example.schema(), { + "name": "Example", + "type": "record", + "fields": [ + {"name": "a", "type": "string"}, + {"name": "sub", + "type": { + "name": "MySubRecord", + "type": "record", + "fields": [{"name": "x", "type": "int"}, + {"name": "y", "type": "long"}, + {"name": "z", "type": "string"}] + } + }, + {"name": "sub2", + "type": { + "name": "MySubRecord", + "type": "record", + "fields": [{"name": "x", "type": "int"}, + {"name": "y", "type": "long"}, + {"name": "z", "type": "string"}] + } + } + ] + }) + + def test_invalid_enum(self): + class Color: + red = 1 + green = 2 + blue = 3 + + class InvalidEnum(Record): + a = Integer() + b = Color + + # Enum will be ignored + self.assertEqual(InvalidEnum.schema(), + {'name': 'InvalidEnum', 'type': 'record', 'fields': [{'name': 'a', 'type': 'int'}]}) + + def test_initialization(self): + class Example(Record): + a = Integer() + b = Integer() + + r = Example(a=1, b=2) + self.assertEqual(r.a, 1) + self.assertEqual(r.b, 2) + + r.b = 5 + + self.assertEqual(r.b, 5) + + # Setting non-declared field should fail + try: + r.c = 3 + self.fail('Should have failed') + except AttributeError: + # Expected + pass + + try: + Record(a=1, c=8) + self.fail('Should have failed') + except AttributeError: + # Expected + pass + + try: + Record('xyz', a=1, b=2) + self.fail('Should have failed') + except TypeError: + # Expected + pass + + def test_serialize_json(self): + class Example(Record): + a = Integer() + b = Integer() + + self.assertEqual(Example.schema(), { + "name": "Example", + "type": "record", + "fields": [ + {"name": "a", "type": "int"}, + {"name": "b", "type": "int"}, + ] + }) + + s = JsonSchema(Example) + r = Example(a=1, b=2) + data = s.encode(r) + self.assertEqual(json.loads(data), {'a': 1, 'b': 2}) + + r2 = s.decode(data) + self.assertEqual(r2.__class__.__name__, 'Example') + self.assertEqual(r2, r) + + def test_serialize_avro(self): + class Example(Record): + a = Integer() + b = Integer() + + self.assertEqual(Example.schema(), { + "name": "Example", + "type": "record", + "fields": [ + {"name": "a", "type": "int"}, + {"name": "b", "type": "int"}, + ] + }) + + s = AvroSchema(Example) + r = Example(a=1, b=2) + data = s.encode(r) + + r2 = s.decode(data) + self.assertEqual(r2.__class__.__name__, 'Example') + self.assertEqual(r2, r) + + def test_serialize_wrong_types(self): + class Example(Record): + a = Integer() + b = Integer() + + class Foo(Record): + x = Integer() + y = Integer() + + s = JsonSchema(Example) + try: + data = s.encode(Foo(x=1, y=2)) + self.fail('Should have failed') + except TypeError: + pass # expected + + try: + data = s.encode('hello') + self.fail('Should have failed') + except TypeError: + pass # expected + + def test_defaults(self): + class Example(Record): + a = Integer(default=5) + b = Integer() + c = String(default='hello') + + r = Example() + self.assertEqual(r.a, 5) + self.assertEqual(r.b, None) + self.assertEqual(r.c, 'hello') + + #### + + def test_json_schema(self): + + class Example(Record): + a = Integer() + b = Integer() + + # Incompatible variation of the class + class BadExample(Record): + a = String() + b = Integer() + + client = pulsar.Client(self.serviceUrl) + producer = client.create_producer( + 'my-json-python-topic', + schema=JsonSchema(Example)) + + + # Validate that incompatible schema is rejected + try: + client.subscribe('my-json-python-topic', 'sub-1', + schema=JsonSchema(BadExample)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + try: + client.subscribe('my-json-python-topic', 'sub-1', + schema=StringSchema(BadExample)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + try: + client.subscribe('my-json-python-topic', 'sub-1', + schema=AvroSchema(BadExample)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + consumer = client.subscribe('my-json-python-topic', 'sub-1', + schema=JsonSchema(Example)) + + r = Example(a=1, b=2) + producer.send(r) + + msg = consumer.receive() + + self.assertEqual(r, msg.value()) + client.close() + + def test_string_schema(self): + client = pulsar.Client(self.serviceUrl) + producer = client.create_producer( + 'my-string-python-topic', + schema=StringSchema()) + + + # Validate that incompatible schema is rejected + try: + class Example(Record): + a = Integer() + b = Integer() + + client.create_producer('my-string-python-topic', + schema=JsonSchema(Example)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + consumer = client.subscribe('my-string-python-topic', 'sub-1', + schema=StringSchema()) + + producer.send("Hello") + + msg = consumer.receive() + + self.assertEqual("Hello", msg.value()) + self.assertEqual(b"Hello", msg.data()) + client.close() + + + def test_bytes_schema(self): + client = pulsar.Client(self.serviceUrl) + producer = client.create_producer( + 'my-bytes-python-topic', + schema=BytesSchema()) + + + # Validate that incompatible schema is rejected + try: + class Example(Record): + a = Integer() + b = Integer() + + client.create_producer('my-bytes-python-topic', + schema=JsonSchema(Example)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + consumer = client.subscribe('my-bytes-python-topic', 'sub-1', + schema=BytesSchema()) + + producer.send(b"Hello") + + msg = consumer.receive() + + self.assertEqual(b"Hello", msg.value()) + client.close() + + def test_avro_schema(self): + + class Example(Record): + a = Integer() + b = Integer() + + # Incompatible variation of the class + class BadExample(Record): + a = String() + b = Integer() + + client = pulsar.Client(self.serviceUrl) + producer = client.create_producer( + 'my-avro-python-topic', + schema=AvroSchema(Example)) + + # Validate that incompatible schema is rejected + try: + client.subscribe('my-avro-python-topic', 'sub-1', + schema=AvroSchema(BadExample)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + try: + client.subscribe('my-avro-python-topic', 'sub-2', + schema=JsonSchema(Example)) + self.fail('Should have failed') + except Exception as e: + pass # Expected + + consumer = client.subscribe('my-avro-python-topic', 'sub-3', + schema=AvroSchema(Example)) + + r = Example(a=1, b=2) + producer.send(r) + + msg = consumer.receive() + + self.assertEqual(r, msg.value()) + client.close() + +if __name__ == '__main__': + main() diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py index ce8d2594cf953..e2e876e5c66ee 100644 --- a/pulsar-client-cpp/python/setup.py +++ b/pulsar-client-cpp/python/setup.py @@ -60,7 +60,7 @@ def build_extension(self, ext): setup( name="pulsar-client", version=VERSION, - packages=['pulsar', 'pulsar.functions'], + packages=['pulsar', 'pulsar.schema', 'pulsar.functions'], cmdclass={'build_ext': my_build_ext}, ext_modules=[Extension('_pulsar', [])], diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index c1bfeef9c8ee4..9b1d6d04e5da4 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -100,6 +100,8 @@ void export_config() { class_("ProducerConfiguration") .def("producer_name", &ProducerConfiguration::getProducerName, return_value_policy()) .def("producer_name", &ProducerConfiguration::setProducerName, return_self<>()) + .def("schema", &ProducerConfiguration::getSchema, return_value_policy()) + .def("schema", &ProducerConfiguration::setSchema, return_self<>()) .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout) .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>()) .def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId) @@ -128,6 +130,8 @@ void export_config() { class_("ConsumerConfiguration") .def("consumer_type", &ConsumerConfiguration::getConsumerType) .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>()) + .def("schema", &ConsumerConfiguration::getSchema, return_value_policy()) + .def("schema", &ConsumerConfiguration::setSchema, return_self<>()) .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>()) .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize) @@ -148,6 +152,8 @@ void export_config() { class_("ReaderConfiguration") .def("message_listener", &ReaderConfiguration_setReaderListener, return_self<>()) + .def("schema", &ReaderConfiguration::getSchema, return_value_policy()) + .def("schema", &ReaderConfiguration::setSchema, return_self<>()) .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize) .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize) .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy()) diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc index aeb5c0c73aae9..aa60ab41bfc37 100644 --- a/pulsar-client-cpp/python/src/enums.cc +++ b/pulsar-client-cpp/python/src/enums.cc @@ -75,4 +75,22 @@ void export_enums() { .value("UnsupportedVersionError", ResultUnsupportedVersionError) ; + enum_("SchemaType", "Supported schema types") + .value("NONE", NONE) + .value("STRING", STRING) + .value("INT8", INT8) + .value("INT16", INT16) + .value("INT32", INT32) + .value("INT64", INT64) + .value("FLOAT", FLOAT) + .value("DOUBLE", DOUBLE) + .value("BYTES", BYTES) + .value("JSON", JSON) + .value("PROTOBUF", PROTOBUF) + .value("AVRO", AVRO) + .value("AUTO_CONSUME", AUTO_CONSUME) + .value("AUTO_PUBLISH", AUTO_PUBLISH) + .value("KEY_VALUE", KEY_VALUE) + ; + } diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc index f3ceefd1cc0e5..b26a25208b16c 100644 --- a/pulsar-client-cpp/python/src/pulsar.cc +++ b/pulsar-client-cpp/python/src/pulsar.cc @@ -26,6 +26,7 @@ void export_reader(); void export_config(); void export_enums(); void export_authentication(); +void export_schema(); static void translateException(const PulsarException& ex) { @@ -51,4 +52,5 @@ BOOST_PYTHON_MODULE(_pulsar) export_config(); export_enums(); export_authentication(); + export_schema(); } diff --git a/pulsar-client-cpp/python/src/schema.cc b/pulsar-client-cpp/python/src/schema.cc new file mode 100644 index 0000000000000..397ec658d23da --- /dev/null +++ b/pulsar-client-cpp/python/src/schema.cc @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "utils.h" + +void export_schema() { + using namespace boost::python; + + class_("SchemaInfo", + init()) + .def("schema_type", &SchemaInfo::getSchemaType) + .def("name", &SchemaInfo::getName, return_value_policy()) + .def("schema", &SchemaInfo::getSchema, return_value_policy()) + ; +} From f03a0d77ba4837787dd8be29cb3c196f46009058 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 21 Jan 2019 07:17:17 -0800 Subject: [PATCH 2/5] Added missing license headers --- .../python/pulsar/schema/__init__.py | 19 +++++++++++++++++++ .../python/pulsar/schema/schema.py | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py index dd5eb1e88afa5..096e64a7d9a69 100644 --- a/pulsar-client-cpp/python/pulsar/schema/__init__.py +++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + from .definition import * from .schema import * diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py index d9ee74062f4a7..91250a8492289 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema.py @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + from abc import abstractmethod import json From 081588ab2eacdc6e9a8741ffaebb08ae2daddc47 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 24 Jan 2019 12:24:59 -0800 Subject: [PATCH 3/5] Fixed metaclass declaration to work for py2 and py3 --- pulsar-client-cpp/python/pulsar/schema/definition.py | 3 ++- pulsar-client-cpp/python/setup.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index f86ebdbd9b7e2..47bc9c856a8ee 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -20,6 +20,7 @@ from abc import abstractmethod, ABCMeta from enum import Enum, EnumMeta from collections import OrderedDict +from six import with_metaclass def _check_record_or_field(x): @@ -52,7 +53,7 @@ def _get_fields(cls, dct): return fields -class Record(metaclass=RecordMeta): +class Record(with_metaclass(RecordMeta, object)): def __init__(self, *args, **kwargs): if args: diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py index e2e876e5c66ee..637ee663c1120 100644 --- a/pulsar-client-cpp/python/setup.py +++ b/pulsar-client-cpp/python/setup.py @@ -71,6 +71,7 @@ def build_extension(self, ext): url="http://pulsar.apache.org/", install_requires=[ 'grpcio', 'protobuf', + 'six', # functions dependencies "prometheus_client", "ratelimit" From ef7126deb1943a1a7321da8058fd7a03bf28df43 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 24 Jan 2019 16:54:04 -0800 Subject: [PATCH 4/5] More python2 compat fixes --- .../python/pulsar/schema/definition.py | 2 +- pulsar-client-cpp/python/setup.py | 28 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index 47bc9c856a8ee..4658c03c0dd61 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -34,7 +34,7 @@ def __new__(metacls, name, parents, dct): if name != 'Record': # Do not apply this logic to the base class itself dct['_fields'] = RecordMeta._get_fields(dct) - return super().__new__(metacls, name, parents, dct) + return type.__new__(metacls, name, parents, dct) @classmethod def _get_fields(cls, dct): diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py index 637ee663c1120..becc9f2d3c41b 100644 --- a/pulsar-client-cpp/python/setup.py +++ b/pulsar-client-cpp/python/setup.py @@ -20,6 +20,7 @@ from setuptools import setup from distutils.core import Extension import subprocess +import sys from distutils.command import build_ext @@ -41,6 +42,10 @@ def get_version(): VERSION = get_version() +if sys.version_info[0] == 2: + PY2 = True +else: + PY2 = False # This is a workaround to have setuptools to include # the already compiled _pulsar.so library @@ -57,6 +62,19 @@ def build_extension(self, ext): shutil.copyfile('_pulsar.so', self.get_ext_fullpath(ext.name)) +dependencies = [ + 'grpcio', 'protobuf', + 'six', + + # functions dependencies + "prometheus_client", + "ratelimit" +] + +if PY2: + # Python 2 compat dependencies + dependencies += ['enum34'] + setup( name="pulsar-client", version=VERSION, @@ -68,12 +86,6 @@ def build_extension(self, ext): author_email="dev@pulsar.apache.org", description="Apache Pulsar Python client library", license="Apache License v2.0", - url="http://pulsar.apache.org/", - install_requires=[ - 'grpcio', 'protobuf', - 'six', - # functions dependencies - "prometheus_client", - "ratelimit" - ], + url="https://pulsar.apache.org/", + install_requires=dependencies, ) From 9c41975362afad404e942d24394a0b600612bb39 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 25 Jan 2019 11:32:45 -0800 Subject: [PATCH 5/5] Added missing pip dependency on fastavro --- pulsar-client-cpp/python/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client-cpp/python/setup.py b/pulsar-client-cpp/python/setup.py index becc9f2d3c41b..db165aeb6ca41 100644 --- a/pulsar-client-cpp/python/setup.py +++ b/pulsar-client-cpp/python/setup.py @@ -65,6 +65,7 @@ def build_extension(self, ext): dependencies = [ 'grpcio', 'protobuf', 'six', + 'fastavro', # functions dependencies "prometheus_client",