Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pulsar-client-cpp/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}")
ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc
src/config.cc src/enums.cc src/client.cc
src/message.cc src/authentication.cc
src/reader.cc)
src/reader.cc src/schema.cc)
SET(CMAKE_SHARED_LIBRARY_PREFIX )
SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)

Expand Down
77 changes: 67 additions & 10 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def send_callback(res, msg):
from pulsar.functions.function import Function
from pulsar.functions.context import Context
from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
from pulsar import schema
_schema = schema

import re
_retype = type(re.compile('x'))
Expand Down Expand Up @@ -143,10 +145,16 @@ class Message:

def data(self):
"""
Returns object typed bytes with the content of the message.
Returns object typed bytes with the payload of the message.
"""
return self._message.data()

def value(self):
"""
Returns object with the de-serialized version of the message content
"""
return self._schema.decode(self._message.data())

def properties(self):
"""
Return the properties attached to the message. Properties are
Expand Down Expand Up @@ -206,6 +214,7 @@ def __init__(self, dynamicLibPath, authParamsString):
_check_type(str, authParamsString, 'authParamsString')
self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)


class AuthenticationTLS(Authentication):
"""
TLS Authentication implementation
Expand Down Expand Up @@ -241,6 +250,7 @@ def __init__(self, token):
raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
self.auth = _pulsar.AuthenticationToken(token)


class AuthenticationAthenz(Authentication):
"""
Athenz Authentication implementation
Expand Down Expand Up @@ -345,6 +355,7 @@ def __init__(self, service_url,

def create_producer(self, topic,
producer_name=None,
schema=schema.BytesSchema(),
initial_sequence_id=None,
send_timeout_millis=30000,
compression_type=CompressionType.NONE,
Expand Down Expand Up @@ -374,6 +385,12 @@ def create_producer(self, topic,
with `Producer.producer_name()`. When specifying a name, it is app to
the user to ensure that, for a given topic, the producer name is unique
across all Pulsar's clusters.
* `schema`:
Define the schema of the data that will be published by this producer.
The schema will be used for two purposes:
- Validate the data format against the topic defined schema
- Perform serialization/deserialization between data and objects
An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
* `initial_sequence_id`:
Set the baseline for the sequence ids for messages
published by the producer. First message will be using
Expand Down Expand Up @@ -405,6 +422,7 @@ def create_producer(self, topic,
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
_check_type(_schema.Schema, schema, 'schema')
_check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
_check_type(int, send_timeout_millis, 'send_timeout_millis')
_check_type(CompressionType, compression_type, 'compression_type')
Expand Down Expand Up @@ -436,12 +454,16 @@ def create_producer(self, topic,
for k, v in properties.items():
conf.property(k, v)

conf.schema(schema.schema_info())

p = Producer()
p._producer = self._client.create_producer(topic, conf)
p._schema = schema
return p

def subscribe(self, topic, subscription_name,
consumer_type=ConsumerType.Exclusive,
schema=schema.BytesSchema(),
message_listener=None,
receiver_queue_size=1000,
max_total_receiver_queue_size_across_partitions=50000,
Expand All @@ -468,6 +490,8 @@ def subscribe(self, topic, subscription_name,

* `consumer_type`:
Select the subscription type to be used when subscribing to the topic.
* `schema`:
Define the schema of the data that will be received by this consumer.
* `message_listener`:
Sets a message listener for the consumer. When the listener is set,
the application will receive messages through it. Calls to
Expand Down Expand Up @@ -515,6 +539,7 @@ def my_listener(consumer, message):
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type(int, max_total_receiver_queue_size_across_partitions,
'max_total_receiver_queue_size_across_partitions')
Expand All @@ -528,7 +553,7 @@ def my_listener(consumer, message):
conf.consumer_type(consumer_type)
conf.read_compacted(is_read_compacted)
if message_listener:
conf.message_listener(message_listener)
conf.message_listener(_listener_wrapper(message_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
if consumer_name:
Expand All @@ -540,6 +565,8 @@ def my_listener(consumer, message):
for k, v in properties.items():
conf.property(k, v)

conf.schema(schema.schema_info())

c = Consumer()
if isinstance(topic, str):
# Single topic
Expand All @@ -554,10 +581,12 @@ def my_listener(consumer, message):
raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")

c._client = self
c._schema = schema
self._consumers.append(c)
return c

def create_reader(self, topic, start_message_id,
schema=schema.BytesSchema(),
reader_listener=None,
receiver_queue_size=1000,
reader_name=None,
Expand Down Expand Up @@ -587,6 +616,8 @@ def create_reader(self, topic, start_message_id,

**Options**

* `schema`:
Define the schema of the data that will be received by this reader.
* `reader_listener`:
Sets a message listener for the reader. When the listener is set,
the application will receive messages through it. Calls to
Expand All @@ -610,21 +641,25 @@ def my_listener(reader, message):
"""
_check_type(str, topic, 'topic')
_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type_or_none(str, reader_name, 'reader_name')
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')

conf = _pulsar.ReaderConfiguration()
if reader_listener:
conf.reader_listener(reader_listener)
conf.reader_listener(_listener_wrapper(reader_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
if reader_name:
conf.reader_name(reader_name)
if subscription_role_prefix:
conf.subscription_role_prefix(subscription_role_prefix)
conf.schema(schema.schema_info())

c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
c._client = self
c._schema = schema
self._consumers.append(c)
return c

Expand Down Expand Up @@ -781,7 +816,9 @@ def close(self):

def _build_msg(self, content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp):
_check_type(bytes, content, 'content')
data = self._schema.encode(content)

_check_type(bytes, data, 'data')
_check_type_or_none(dict, properties, 'properties')
_check_type_or_none(str, partition_key, 'partition_key')
_check_type_or_none(int, sequence_id, 'sequence_id')
Expand All @@ -790,7 +827,7 @@ def _build_msg(self, content, properties, partition_key, sequence_id,
_check_type_or_none(int, event_timestamp, 'event_timestamp')

mb = _pulsar.MessageBuilder()
mb.content(content)
mb.content(data)
if properties:
for k, v in properties.items():
mb.property(k, v)
Expand Down Expand Up @@ -850,10 +887,15 @@ def receive(self, timeout_millis=None):
available within the timeout.
"""
if timeout_millis is None:
return self._consumer.receive()
msg = self._consumer.receive()
else:
_check_type(int, timeout_millis, 'timeout_millis')
return self._consumer.receive(timeout_millis)
msg = self._consumer.receive(timeout_millis)

m = Message()
m._message = msg
m._schema = self._schema
return m

def acknowledge(self, message):
"""
Expand Down Expand Up @@ -957,10 +999,15 @@ def read_next(self, timeout_millis=None):
available within the timeout.
"""
if timeout_millis is None:
return self._reader.read_next()
msg = self._reader.read_next()
else:
_check_type(int, timeout_millis, 'timeout_millis')
return self._reader.read_next(timeout_millis)
msg = self._reader.read_next(timeout_millis)

m = Message()
m._message = msg
m._schema = self._schema
return m

def has_message_available(self):
"""
Expand All @@ -978,10 +1025,20 @@ def close(self):

def _check_type(var_type, var, name):
if not isinstance(var, var_type):
raise ValueError("Argument %s is expected to be of type '%s'" % (name, var_type.__name__))
raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
% (name, var_type.__name__, type(var).__name__))


def _check_type_or_none(var_type, var, name):
if var is not None and not isinstance(var, var_type):
raise ValueError("Argument %s is expected to be either None or of type '%s'"
% (name, var_type.__name__))


def _listener_wrapper(listener, schema):
def wrapper(c, msg):
m = Message()
m._message = msg
m._schema = schema
listener(c, m)
return wrapper
24 changes: 24 additions & 0 deletions pulsar-client-cpp/python/pulsar/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


from .definition import *
from .schema import *


Loading