diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 96a9f6cf80a5b..ea536efe8212f 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -102,7 +102,14 @@ def send_callback(res, msg_id): import logging import _pulsar -from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401 +from _pulsar import ( + Result, + CompressionType, + ConsumerType, + InitialPosition, + PartitionsRoutingMode, + BatchingType, +) # noqa: F401 from pulsar.exceptions import * @@ -110,10 +117,12 @@ def send_callback(res, msg_id): from pulsar.functions.context import Context from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe from pulsar import schema + _schema = schema import re -_retype = type(re.compile('x')) + +_retype = type(re.compile("x")) import certifi from datetime import timedelta @@ -127,10 +136,10 @@ class MessageId: def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) - 'Represents the earliest message stored in a topic' + "Represents the earliest message stored in a topic" earliest = _pulsar.MessageId.earliest - 'Represents the latest message published on a topic' + "Represents the latest message published on a topic" latest = _pulsar.MessageId.latest def ledger_id(self): @@ -237,7 +246,6 @@ def _wrap(_message): class MessageBatch: - def __init__(self): self._msg_batch = _pulsar.MessageBatch() @@ -261,6 +269,7 @@ class Authentication: Authentication provider object. Used to load authentication from an external shared library. """ + def __init__(self, dynamicLibPath, authParamsString): """ Create the authentication provider instance. @@ -272,8 +281,8 @@ def __init__(self, dynamicLibPath, authParamsString): * `authParamsString`: Comma-separated list of provider-specific configuration params """ - _check_type(str, dynamicLibPath, 'dynamicLibPath') - _check_type(str, authParamsString, 'authParamsString') + _check_type(str, dynamicLibPath, "dynamicLibPath") + _check_type(str, authParamsString, "authParamsString") self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString) @@ -281,6 +290,7 @@ class AuthenticationTLS(Authentication): """ TLS Authentication implementation """ + def __init__(self, certificate_path, private_key_path): """ Create the TLS authentication provider instance. @@ -290,8 +300,8 @@ def __init__(self, certificate_path, private_key_path): * `certificatePath`: Path to the public certificate * `privateKeyPath`: Path to private TLS key """ - _check_type(str, certificate_path, 'certificate_path') - _check_type(str, private_key_path, 'private_key_path') + _check_type(str, certificate_path, "certificate_path") + _check_type(str, private_key_path, "private_key_path") self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path) @@ -299,6 +309,7 @@ class AuthenticationToken(Authentication): """ Token based authentication implementation """ + def __init__(self, token): """ Create the token authentication provider instance. @@ -317,6 +328,7 @@ class AuthenticationAthenz(Authentication): """ Athenz Authentication implementation """ + def __init__(self, auth_params_string): """ Create the Athenz authentication provider instance. @@ -325,13 +337,15 @@ def __init__(self, auth_params_string): * `auth_params_string`: JSON encoded configuration for Athenz client """ - _check_type(str, auth_params_string, 'auth_params_string') + _check_type(str, auth_params_string, "auth_params_string") self.auth = _pulsar.AuthenticationAthenz(auth_params_string) + class AuthenticationOauth2(Authentication): """ Oauth2 Authentication implementation """ + def __init__(self, auth_params_string): """ Create the Oauth2 authentication provider instance. @@ -340,9 +354,10 @@ def __init__(self, auth_params_string): * `auth_params_string`: JSON encoded configuration for Oauth2 client """ - _check_type(str, auth_params_string, 'auth_params_string') + _check_type(str, auth_params_string, "auth_params_string") self.auth = _pulsar.AuthenticationOauth2(auth_params_string) + class Client: """ The Pulsar client. A single client instance can be used to create producers @@ -352,20 +367,22 @@ class Client: producers and consumers. """ - def __init__(self, service_url, - authentication=None, - operation_timeout_seconds=30, - io_threads=1, - message_listener_threads=1, - concurrent_lookup_requests=50000, - log_conf_file_path=None, - use_tls=False, - tls_trust_certs_file_path=None, - tls_allow_insecure_connection=False, - tls_validate_hostname=False, - logger=None, - connection_timeout_ms=10000, - ): + def __init__( + self, + service_url, + authentication=None, + operation_timeout_seconds=30, + io_threads=1, + message_listener_threads=1, + concurrent_lookup_requests=50000, + log_conf_file_path=None, + use_tls=False, + tls_trust_certs_file_path=None, + tls_allow_insecure_connection=False, + tls_validate_hostname=False, + logger=None, + connection_timeout_ms=10000, + ): """ Create a new Pulsar client instance. @@ -413,19 +430,19 @@ def __init__(self, service_url, * `connection_timeout_ms`: Set timeout in milliseconds on TCP connections. """ - _check_type(str, service_url, 'service_url') - _check_type_or_none(Authentication, authentication, 'authentication') - _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') - _check_type(int, connection_timeout_ms, 'connection_timeout_ms') - _check_type(int, io_threads, 'io_threads') - _check_type(int, message_listener_threads, 'message_listener_threads') - _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') - _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') - _check_type(bool, use_tls, 'use_tls') - _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') - _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') - _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') - _check_type_or_none(logging.Logger, logger, 'logger') + _check_type(str, service_url, "service_url") + _check_type_or_none(Authentication, authentication, "authentication") + _check_type(int, operation_timeout_seconds, "operation_timeout_seconds") + _check_type(int, connection_timeout_ms, "connection_timeout_ms") + _check_type(int, io_threads, "io_threads") + _check_type(int, message_listener_threads, "message_listener_threads") + _check_type(int, concurrent_lookup_requests, "concurrent_lookup_requests") + _check_type_or_none(str, log_conf_file_path, "log_conf_file_path") + _check_type(bool, use_tls, "use_tls") + _check_type_or_none(str, tls_trust_certs_file_path, "tls_trust_certs_file_path") + _check_type(bool, tls_allow_insecure_connection, "tls_allow_insecure_connection") + _check_type(bool, tls_validate_hostname, "tls_validate_hostname") + _check_type_or_none(logging.Logger, logger, "logger") conf = _pulsar.ClientConfiguration() if authentication: @@ -439,7 +456,7 @@ def __init__(self, service_url, conf.log_conf_file_path(log_conf_file_path) if logger: conf.set_logger(logger) - if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): + if use_tls or service_url.startswith("pulsar+ssl://") or service_url.startswith("https://"): conf.use_tls(True) if tls_trust_certs_file_path: conf.tls_trust_certs_file_path(tls_trust_certs_file_path) @@ -450,26 +467,28 @@ def __init__(self, service_url, self._client = _pulsar.Client(service_url, conf) self._consumers = [] - def create_producer(self, topic, - producer_name=None, - schema=schema.BytesSchema(), - initial_sequence_id=None, - send_timeout_millis=30000, - compression_type=CompressionType.NONE, - max_pending_messages=1000, - max_pending_messages_across_partitions=50000, - block_if_queue_full=False, - batching_enabled=False, - batching_max_messages=1000, - batching_max_allowed_size_in_bytes=128*1024, - batching_max_publish_delay_ms=10, - message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, - lazy_start_partitioned_producers=False, - properties=None, - batching_type=BatchingType.Default, - encryption_key=None, - crypto_key_reader=None - ): + def create_producer( + self, + topic, + producer_name=None, + schema=schema.BytesSchema(), + initial_sequence_id=None, + send_timeout_millis=30000, + compression_type=CompressionType.NONE, + max_pending_messages=1000, + max_pending_messages_across_partitions=50000, + block_if_queue_full=False, + batching_enabled=False, + batching_max_messages=1000, + batching_max_allowed_size_in_bytes=128 * 1024, + batching_max_publish_delay_ms=10, + message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers=False, + properties=None, + batching_type=BatchingType.Default, + encryption_key=None, + crypto_key_reader=None, + ): """ Create a new producer on a given topic. @@ -553,24 +572,24 @@ def create_producer(self, topic, Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer """ - _check_type(str, topic, 'topic') - _check_type_or_none(str, producer_name, 'producer_name') - _check_type(_schema.Schema, schema, 'schema') - _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') - _check_type(int, send_timeout_millis, 'send_timeout_millis') - _check_type(CompressionType, compression_type, 'compression_type') - _check_type(int, max_pending_messages, 'max_pending_messages') - _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') - _check_type(bool, block_if_queue_full, 'block_if_queue_full') - _check_type(bool, batching_enabled, 'batching_enabled') - _check_type(int, batching_max_messages, 'batching_max_messages') - _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes') - _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') - _check_type_or_none(dict, properties, 'properties') - _check_type(BatchingType, batching_type, 'batching_type') - _check_type_or_none(str, encryption_key, 'encryption_key') - _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') - _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') + _check_type(str, topic, "topic") + _check_type_or_none(str, producer_name, "producer_name") + _check_type(_schema.Schema, schema, "schema") + _check_type_or_none(int, initial_sequence_id, "initial_sequence_id") + _check_type(int, send_timeout_millis, "send_timeout_millis") + _check_type(CompressionType, compression_type, "compression_type") + _check_type(int, max_pending_messages, "max_pending_messages") + _check_type(int, max_pending_messages_across_partitions, "max_pending_messages_across_partitions") + _check_type(bool, block_if_queue_full, "block_if_queue_full") + _check_type(bool, batching_enabled, "batching_enabled") + _check_type(int, batching_max_messages, "batching_max_messages") + _check_type(int, batching_max_allowed_size_in_bytes, "batching_max_allowed_size_in_bytes") + _check_type(int, batching_max_publish_delay_ms, "batching_max_publish_delay_ms") + _check_type_or_none(dict, properties, "properties") + _check_type(BatchingType, batching_type, "batching_type") + _check_type_or_none(str, encryption_key, "encryption_key") + _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") + _check_type(bool, lazy_start_partitioned_producers, "lazy_start_partitioned_producers") conf = _pulsar.ProducerConfiguration() conf.send_timeout_millis(send_timeout_millis) @@ -605,23 +624,26 @@ def create_producer(self, topic, p._client = self._client return p - def subscribe(self, topic, subscription_name, - consumer_type=ConsumerType.Exclusive, - schema=schema.BytesSchema(), - message_listener=None, - receiver_queue_size=1000, - max_total_receiver_queue_size_across_partitions=50000, - consumer_name=None, - unacked_messages_timeout_ms=None, - broker_consumer_stats_cache_time_ms=30000, - negative_ack_redelivery_delay_ms=60000, - is_read_compacted=False, - properties=None, - pattern_auto_discovery_period=60, - initial_position=InitialPosition.Latest, - crypto_key_reader=None, - replicate_subscription_state_enabled=False - ): + def subscribe( + self, + topic, + subscription_name, + consumer_type=ConsumerType.Exclusive, + schema=schema.BytesSchema(), + message_listener=None, + receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, + consumer_name=None, + unacked_messages_timeout_ms=None, + broker_consumer_stats_cache_time_ms=30000, + negative_ack_redelivery_delay_ms=60000, + is_read_compacted=False, + properties=None, + pattern_auto_discovery_period=60, + initial_position=InitialPosition.Latest, + crypto_key_reader=None, + replicate_subscription_state_enabled=False, + ): """ Subscribe to the given topic and subscription combination. @@ -700,21 +722,22 @@ def my_listener(consumer, message): Set whether the subscription status should be replicated. Default: `False`. """ - _check_type(str, subscription_name, 'subscription_name') - _check_type(ConsumerType, consumer_type, 'consumer_type') - _check_type(_schema.Schema, schema, 'schema') - _check_type(int, receiver_queue_size, 'receiver_queue_size') - _check_type(int, max_total_receiver_queue_size_across_partitions, - 'max_total_receiver_queue_size_across_partitions') - _check_type_or_none(str, consumer_name, 'consumer_name') - _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') - _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') - _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms') - _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') - _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type_or_none(dict, properties, 'properties') - _check_type(InitialPosition, initial_position, 'initial_position') - _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(str, subscription_name, "subscription_name") + _check_type(ConsumerType, consumer_type, "consumer_type") + _check_type(_schema.Schema, schema, "schema") + _check_type(int, receiver_queue_size, "receiver_queue_size") + _check_type( + int, max_total_receiver_queue_size_across_partitions, "max_total_receiver_queue_size_across_partitions" + ) + _check_type_or_none(str, consumer_name, "consumer_name") + _check_type_or_none(int, unacked_messages_timeout_ms, "unacked_messages_timeout_ms") + _check_type(int, broker_consumer_stats_cache_time_ms, "broker_consumer_stats_cache_time_ms") + _check_type(int, negative_ack_redelivery_delay_ms, "negative_ack_redelivery_delay_ms") + _check_type(int, pattern_auto_discovery_period, "pattern_auto_discovery_period") + _check_type(bool, is_read_compacted, "is_read_compacted") + _check_type_or_none(dict, properties, "properties") + _check_type(InitialPosition, initial_position, "initial_position") + _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -760,15 +783,18 @@ def my_listener(consumer, message): self._consumers.append(c) return c - def create_reader(self, topic, start_message_id, - schema=schema.BytesSchema(), - reader_listener=None, - receiver_queue_size=1000, - reader_name=None, - subscription_role_prefix=None, - is_read_compacted=False, - crypto_key_reader=None - ): + def create_reader( + self, + topic, + start_message_id, + schema=schema.BytesSchema(), + reader_listener=None, + receiver_queue_size=1000, + reader_name=None, + subscription_role_prefix=None, + is_read_compacted=False, + crypto_key_reader=None, + ): """ Create a reader on a particular topic @@ -821,14 +847,14 @@ def my_listener(reader, message): Symmetric encryption class implementation, configuring public key encryption messages for the producer and private key decryption messages for the consumer """ - _check_type(str, topic, 'topic') - _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') - _check_type(_schema.Schema, schema, 'schema') - _check_type(int, receiver_queue_size, 'receiver_queue_size') - _check_type_or_none(str, reader_name, 'reader_name') - _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') - _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(str, topic, "topic") + _check_type(_pulsar.MessageId, start_message_id, "start_message_id") + _check_type(_schema.Schema, schema, "schema") + _check_type(int, receiver_queue_size, "receiver_queue_size") + _check_type_or_none(str, reader_name, "reader_name") + _check_type_or_none(str, subscription_role_prefix, "subscription_role_prefix") + _check_type(bool, is_read_compacted, "is_read_compacted") + _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") conf = _pulsar.ReaderConfiguration() if reader_listener: @@ -862,7 +888,7 @@ def get_topic_partitions(self, topic): :param topic: the topic name to lookup :return: a list of partition name """ - _check_type(str, topic, 'topic') + _check_type(str, topic, "topic") return self._client.get_topic_partitions(topic) def shutdown(self): @@ -912,16 +938,18 @@ def last_sequence_id(self): """ return self._producer.last_sequence_id() - def send(self, content, - properties=None, - partition_key=None, - sequence_id=None, - replication_clusters=None, - disable_replication=False, - event_timestamp=None, - deliver_at=None, - deliver_after=None, - ): + def send( + self, + content, + properties=None, + partition_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ): """ Publish a message on the topic. Blocks until the message is acknowledged @@ -958,21 +986,32 @@ def send(self, content, Specify a delay in timedelta for the delivery of the messages. """ - msg = self._build_msg(content, properties, partition_key, sequence_id, - replication_clusters, disable_replication, event_timestamp, - deliver_at, deliver_after) + msg = self._build_msg( + content, + properties, + partition_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp, + deliver_at, + deliver_after, + ) return MessageId.deserialize(self._producer.send(msg)) - def send_async(self, content, callback, - properties=None, - partition_key=None, - sequence_id=None, - replication_clusters=None, - disable_replication=False, - event_timestamp=None, - deliver_at=None, - deliver_after=None, - ): + def send_async( + self, + content, + callback, + properties=None, + partition_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ): """ Send a message asynchronously. @@ -1020,12 +1059,19 @@ def callback(res, msg_id): * `deliver_after`: Specify a delay in timedelta for the delivery of the messages. """ - msg = self._build_msg(content, properties, partition_key, sequence_id, - replication_clusters, disable_replication, event_timestamp, - deliver_at, deliver_after) + msg = self._build_msg( + content, + properties, + partition_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp, + deliver_at, + deliver_after, + ) self._producer.send_async(msg, callback) - def flush(self): """ Flush all the messages buffered in the client and wait until all messages have been @@ -1033,27 +1079,35 @@ def flush(self): """ self._producer.flush() - def close(self): """ Close the producer. """ self._producer.close() - def _build_msg(self, content, properties, partition_key, sequence_id, - replication_clusters, disable_replication, event_timestamp, - deliver_at, deliver_after): + def _build_msg( + self, + content, + properties, + partition_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp, + deliver_at, + deliver_after, + ): data = self._schema.encode(content) - _check_type(bytes, data, 'data') - _check_type_or_none(dict, properties, 'properties') - _check_type_or_none(str, partition_key, 'partition_key') - _check_type_or_none(int, sequence_id, 'sequence_id') - _check_type_or_none(list, replication_clusters, 'replication_clusters') - _check_type(bool, disable_replication, 'disable_replication') - _check_type_or_none(int, event_timestamp, 'event_timestamp') - _check_type_or_none(int, deliver_at, 'deliver_at') - _check_type_or_none(timedelta, deliver_after, 'deliver_after') + _check_type(bytes, data, "data") + _check_type_or_none(dict, properties, "properties") + _check_type_or_none(str, partition_key, "partition_key") + _check_type_or_none(int, sequence_id, "sequence_id") + _check_type_or_none(list, replication_clusters, "replication_clusters") + _check_type(bool, disable_replication, "disable_replication") + _check_type_or_none(int, event_timestamp, "event_timestamp") + _check_type_or_none(int, deliver_at, "deliver_at") + _check_type_or_none(timedelta, deliver_after, "deliver_after") mb = _pulsar.MessageBuilder() mb.content(data) @@ -1123,7 +1177,7 @@ def receive(self, timeout_millis=None): if timeout_millis is None: msg = self._consumer.receive() else: - _check_type(int, timeout_millis, 'timeout_millis') + _check_type(int, timeout_millis, "timeout_millis") msg = self._consumer.receive(timeout_millis) m = Message() @@ -1261,7 +1315,7 @@ def read_next(self, timeout_millis=None): if timeout_millis is None: msg = self._reader.read_next() else: - _check_type(int, timeout_millis, 'timeout_millis') + _check_type(int, timeout_millis, "timeout_millis") msg = self._reader.read_next(timeout_millis) m = Message() @@ -1273,7 +1327,7 @@ def has_message_available(self): """ Check if there is any message available to read from the current position. """ - return self._reader.has_message_available(); + return self._reader.has_message_available() def seek(self, messageid): """ @@ -1296,10 +1350,12 @@ def close(self): self._reader.close() self._client._consumers.remove(self) + class CryptoKeyReader: """ Default crypto key reader implementation """ + def __init__(self, public_key_path, private_key_path): """ Create crypto key reader. @@ -1309,20 +1365,21 @@ def __init__(self, public_key_path, private_key_path): * `public_key_path`: Path to the public key * `private_key_path`: Path to private key """ - _check_type(str, public_key_path, 'public_key_path') - _check_type(str, private_key_path, 'private_key_path') + _check_type(str, public_key_path, "public_key_path") + _check_type(str, private_key_path, "private_key_path") self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) + def _check_type(var_type, var, name): if not isinstance(var, var_type): - raise ValueError("Argument %s is expected to be of type '%s' and not '%s'" - % (name, var_type.__name__, type(var).__name__)) + raise ValueError( + "Argument %s is expected to be of type '%s' and not '%s'" % (name, var_type.__name__, type(var).__name__) + ) def _check_type_or_none(var_type, var, name): if var is not None and not isinstance(var, var_type): - raise ValueError("Argument %s is expected to be either None or of type '%s'" - % (name, var_type.__name__)) + raise ValueError("Argument %s is expected to be either None or of type '%s'" % (name, var_type.__name__)) def _listener_wrapper(listener, schema): @@ -1333,4 +1390,5 @@ def wrapper(consumer, msg): m._message = msg m._schema = schema listener(c, m) + return wrapper diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py index fa9647913cb25..17441fe35fe12 100644 --- a/pulsar-client-cpp/python/pulsar/functions/context.py +++ b/pulsar-client-cpp/python/pulsar/functions/context.py @@ -42,151 +42,161 @@ """ from abc import abstractmethod + class Context(object): - """Interface defining information available at process time""" - @abstractmethod - def get_message_id(self): - """Return the messageid of the current message that we are processing""" - pass - - @abstractmethod - def get_message_key(self): - """Return the key of the current message that we are processing""" - pass - - @abstractmethod - def get_message_eventtime(self): - """Return the event time of the current message that we are processing""" - pass - - @abstractmethod - def get_message_properties(self): - """Return the message properties kv map of the current message that we are processing""" - pass - - @abstractmethod - def get_current_message_topic_name(self): - """Returns the topic name of the message that we are processing""" - pass - - @abstractmethod - def get_function_tenant(self): - """Returns the tenant of the message that's being processed""" - pass - - @abstractmethod - def get_function_namespace(self): - """Returns the namespace of the message that's being processed""" - - @abstractmethod - def get_function_name(self): - """Returns the function name that we are a part of""" - pass - - @abstractmethod - def get_function_id(self): - """Returns the function id that we are a part of""" - pass - - @abstractmethod - def get_instance_id(self): - """Returns the instance id that is executing the function""" - pass - - @abstractmethod - def get_function_version(self): - """Returns the version of function that we are executing""" - pass - - @abstractmethod - def get_logger(self): - """Returns the logger object that can be used to do logging""" - pass - - @abstractmethod - def get_user_config_value(self, key): - """Returns the value of the user-defined config. If the key doesn't exist, None is returned""" - pass - - @abstractmethod - def get_user_config_map(self): - """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)""" - pass - - @abstractmethod - def get_secret(self, secret_name): - """Returns the secret value associated with the name. None if nothing was found""" - pass - - @abstractmethod - def get_partition_key(self): - """Returns partition key of the input message is one exists""" - pass - - - @abstractmethod - def record_metric(self, metric_name, metric_value): - """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" - pass - - @abstractmethod - def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None): - """Publishes message to topic_name by first serializing the message using serde_class_name serde - The message will have properties specified if any - - The available options for message_conf: - - properties, - partition_key, - sequence_id, - replication_clusters, - disable_replication, - event_timestamp - - """ - pass - - @abstractmethod - def get_input_topics(self): - """Returns the input topics of function""" - pass - - @abstractmethod - def get_output_topic(self): - """Returns the output topic of function""" - pass - - @abstractmethod - def get_output_serde_class_name(self): - """return output Serde class""" - pass - - @abstractmethod - def ack(self, msgid, topic): - """ack this message id""" - pass - - @abstractmethod - def incr_counter(self, key, amount): - """incr the counter of a given key in the managed state""" - pass - - @abstractmethod - def get_counter(self, key): - """get the counter of a given key in the managed state""" - pass - - @abstractmethod - def del_counter(self, key): - """delete the counter of a given key in the managed state""" - pass - - @abstractmethod - def put_state(self, key, value): - """update the value of a given key in the managed state""" - pass - - @abstractmethod - def get_state(self, key): - """get the value of a given key in the managed state""" - pass + """Interface defining information available at process time""" + + @abstractmethod + def get_message_id(self): + """Return the messageid of the current message that we are processing""" + pass + + @abstractmethod + def get_message_key(self): + """Return the key of the current message that we are processing""" + pass + + @abstractmethod + def get_message_eventtime(self): + """Return the event time of the current message that we are processing""" + pass + + @abstractmethod + def get_message_properties(self): + """Return the message properties kv map of the current message that we are processing""" + pass + + @abstractmethod + def get_current_message_topic_name(self): + """Returns the topic name of the message that we are processing""" + pass + + @abstractmethod + def get_function_tenant(self): + """Returns the tenant of the message that's being processed""" + pass + + @abstractmethod + def get_function_namespace(self): + """Returns the namespace of the message that's being processed""" + + @abstractmethod + def get_function_name(self): + """Returns the function name that we are a part of""" + pass + + @abstractmethod + def get_function_id(self): + """Returns the function id that we are a part of""" + pass + + @abstractmethod + def get_instance_id(self): + """Returns the instance id that is executing the function""" + pass + + @abstractmethod + def get_function_version(self): + """Returns the version of function that we are executing""" + pass + + @abstractmethod + def get_logger(self): + """Returns the logger object that can be used to do logging""" + pass + + @abstractmethod + def get_user_config_value(self, key): + """Returns the value of the user-defined config. If the key doesn't exist, None is returned""" + pass + + @abstractmethod + def get_user_config_map(self): + """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)""" + pass + + @abstractmethod + def get_secret(self, secret_name): + """Returns the secret value associated with the name. None if nothing was found""" + pass + + @abstractmethod + def get_partition_key(self): + """Returns partition key of the input message is one exists""" + pass + + @abstractmethod + def record_metric(self, metric_name, metric_value): + """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)""" + pass + + @abstractmethod + def publish( + self, + topic_name, + message, + serde_class_name="serde.IdentitySerDe", + properties=None, + compression_type=None, + callback=None, + message_conf=None, + ): + """Publishes message to topic_name by first serializing the message using serde_class_name serde + The message will have properties specified if any + + The available options for message_conf: + + properties, + partition_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp + + """ + pass + + @abstractmethod + def get_input_topics(self): + """Returns the input topics of function""" + pass + + @abstractmethod + def get_output_topic(self): + """Returns the output topic of function""" + pass + + @abstractmethod + def get_output_serde_class_name(self): + """return output Serde class""" + pass + + @abstractmethod + def ack(self, msgid, topic): + """ack this message id""" + pass + + @abstractmethod + def incr_counter(self, key, amount): + """incr the counter of a given key in the managed state""" + pass + + @abstractmethod + def get_counter(self, key): + """get the counter of a given key in the managed state""" + pass + + @abstractmethod + def del_counter(self, key): + """delete the counter of a given key in the managed state""" + pass + + @abstractmethod + def put_state(self, key, value): + """update the value of a given key in the managed state""" + pass + + @abstractmethod + def get_state(self, key): + """get the value of a given key in the managed state""" + pass diff --git a/pulsar-client-cpp/python/pulsar/functions/function.py b/pulsar-client-cpp/python/pulsar/functions/function.py index dde92b8a55fec..f3ddd8e5487c4 100644 --- a/pulsar-client-cpp/python/pulsar/functions/function.py +++ b/pulsar-client-cpp/python/pulsar/functions/function.py @@ -44,9 +44,11 @@ """ from abc import abstractmethod + class Function(object): - """Interface for Pulsar Function""" - @abstractmethod - def process(self, input, context): - """Process input message""" - pass \ No newline at end of file + """Interface for Pulsar Function""" + + @abstractmethod + def process(self, input, context): + """Process input message""" + pass diff --git a/pulsar-client-cpp/python/pulsar/functions/serde.py b/pulsar-client-cpp/python/pulsar/functions/serde.py index 968c1c9bc3aae..a29a0ccf85148 100644 --- a/pulsar-client-cpp/python/pulsar/functions/serde.py +++ b/pulsar-client-cpp/python/pulsar/functions/serde.py @@ -47,42 +47,48 @@ import pickle + class SerDe(object): - """Interface for Serialization/Deserialization""" - @abstractmethod - def serialize(self, input): - """Serialize input message into bytes""" - pass + """Interface for Serialization/Deserialization""" + + @abstractmethod + def serialize(self, input): + """Serialize input message into bytes""" + pass + + @abstractmethod + def deserialize(self, input_bytes): + """Serialize input_bytes into an object""" + pass - @abstractmethod - def deserialize(self, input_bytes): - """Serialize input_bytes into an object""" - pass class PickleSerDe(SerDe): - """Pickle based serializer""" - def serialize(self, input): - return pickle.dumps(input) + """Pickle based serializer""" + + def serialize(self, input): + return pickle.dumps(input) + + def deserialize(self, input_bytes): + return pickle.loads(input_bytes) - def deserialize(self, input_bytes): - return pickle.loads(input_bytes) class IdentitySerDe(SerDe): - """Simple Serde that just conversion to string and back""" - def __init__(self): - self._types = [int, float, complex, str] - - def serialize(self, input): - if type(input) in self._types: - return str(input).encode('utf-8') - if type(input) == bytes: - return input - raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) - - def deserialize(self, input_bytes): - for typ in self._types: - try: - return typ(input_bytes.decode('utf-8')) - except: - pass - return input_bytes + """Simple Serde that just conversion to string and back""" + + def __init__(self): + self._types = [int, float, complex, str] + + def serialize(self, input): + if type(input) in self._types: + return str(input).encode("utf-8") + if type(input) == bytes: + return input + raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input)) + + def deserialize(self, input_bytes): + for typ in self._types: + try: + return typ(input_bytes.decode("utf-8")) + except: + pass + return input_bytes diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index fd778f3293677..67343dc63c500 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -25,17 +25,16 @@ def _check_record_or_field(x): - if (type(x) is type and not issubclass(x, Record)) \ - and not isinstance(x, Field): - raise Exception('Argument ' + x + ' is not a Record or a Field') + if (type(x) is type and not issubclass(x, Record)) and not isinstance(x, Field): + raise Exception("Argument " + x + " is not a Record or a Field") class RecordMeta(type): def __new__(metacls, name, parents, dct): - if name != 'Record': + if name != "Record": # Do not apply this logic to the base class itself - dct['_fields'] = RecordMeta._get_fields(dct) - dct['_required'] = False + dct["_fields"] = RecordMeta._get_fields(dct) + dct["_required"] = False return type.__new__(metacls, name, parents, dct) @classmethod @@ -75,16 +74,26 @@ def __init__(self, default=None, required_default=False, required=False, *args, copied = copy.copy(value) copied.__init__(**kwargs[k]) self.__setattr__(k, copied) - elif isinstance(value, Array) and isinstance(kwargs[k], list) and len(kwargs[k]) > 0 \ - and isinstance(value.array_type, Record) and isinstance(kwargs[k][0], dict): + elif ( + isinstance(value, Array) + and isinstance(kwargs[k], list) + and len(kwargs[k]) > 0 + and isinstance(value.array_type, Record) + and isinstance(kwargs[k][0], dict) + ): arr = [] for item in kwargs[k]: copied = copy.copy(value.array_type) copied.__init__(**item) arr.append(copied) self.__setattr__(k, arr) - elif isinstance(value, Map) and isinstance(kwargs[k], dict) and len(kwargs[k]) > 0 \ - and isinstance(value.value_type, Record) and isinstance(list(kwargs[k].values())[0], dict): + elif ( + isinstance(value, Map) + and isinstance(kwargs[k], dict) + and len(kwargs[k]) > 0 + and isinstance(value.value_type, Record) + and isinstance(list(kwargs[k].values())[0], dict) + ): dic = {} for mapKey, mapValue in kwargs[k].items(): copied = copy.copy(value.value_type) @@ -107,9 +116,9 @@ def schema(cls): @classmethod def schema_info(cls, defined_names): - namespace_prefix = '' + namespace_prefix = "" if cls._avro_namespace is not None: - namespace_prefix = cls._avro_namespace + '.' + namespace_prefix = cls._avro_namespace + "." namespace_name = namespace_prefix + cls.__name__ if namespace_name in defined_names: @@ -117,13 +126,10 @@ def schema_info(cls, defined_names): defined_names.add(namespace_name) - schema = { - 'type': 'record', - 'name': str(cls.__name__) - } + schema = {"type": "record", "name": str(cls.__name__)} if cls._avro_namespace is not None: - schema['namespace'] = cls._avro_namespace - schema['fields'] = [] + schema["namespace"] = cls._avro_namespace + schema["fields"] = [] if cls._sorted_fields: fields = sorted(cls._fields.keys()) @@ -131,29 +137,30 @@ def schema_info(cls, defined_names): fields = cls._fields.keys() for name in fields: field = cls._fields[name] - field_type = field.schema_info(defined_names) \ - if field._required else ['null', field.schema_info(defined_names)] - schema['fields'].append({ - 'name': name, - 'default': field.default(), - 'type': field_type - }) if field.required_default() else schema['fields'].append({ - 'name': name, - 'type': field_type, - }) + field_type = ( + field.schema_info(defined_names) if field._required else ["null", field.schema_info(defined_names)] + ) + schema["fields"].append( + {"name": name, "default": field.default(), "type": field_type} + ) if field.required_default() else schema["fields"].append( + { + "name": name, + "type": field_type, + } + ) return schema def __setattr__(self, key, value): - if key == '_default': + if key == "_default": super(Record, self).__setattr__(key, value) - elif key == '_required_default': + elif key == "_required_default": super(Record, self).__setattr__(key, value) - elif key == '_required': + elif key == "_required": super(Record, self).__setattr__(key, value) else: if key not in self._fields: - raise AttributeError('Cannot set undeclared field ' + key + ' on record') + raise AttributeError("Cannot set undeclared field " + key + " on record") # Check that type of value matches the field type field = self._fields[key] @@ -179,12 +186,13 @@ def python_type(self): return self.__class__ def validate_type(self, name, val): - if not val and not self._required: + if val is None and not self._required: return self.default() if not isinstance(val, self.__class__): - raise TypeError("Invalid type '%s' for sub-record field '%s'. Expected: %s" % ( - type(val), name, self.__class__)) + raise TypeError( + "Invalid type '%s' for sub-record field '%s'. Expected: %s" % (type(val), name, self.__class__) + ) return val def default(self): @@ -200,7 +208,7 @@ def required_default(self): class Field(object): def __init__(self, default=None, required=False, required_default=False): if default is not None: - default = self.validate_type('default', default) + default = self.validate_type("default", default) self._default = default self._required_default = required_default self._required = required @@ -214,7 +222,7 @@ def python_type(self): pass def validate_type(self, name, val): - if not val and not self._required: + if val is None and not self._required: return self.default() if type(val) != self.python_type(): @@ -240,20 +248,20 @@ def required_default(self): class Null(Field): def type(self): - return 'null' + return "null" def python_type(self): return type(None) def validate_type(self, name, val): if val is not None: - raise TypeError('Field ' + name + ' is set to be None') + raise TypeError("Field " + name + " is set to be None") return val class Boolean(Field): def type(self): - return 'boolean' + return "boolean" def python_type(self): return bool @@ -267,7 +275,7 @@ def default(self): class Integer(Field): def type(self): - return 'int' + return "int" def python_type(self): return int @@ -281,7 +289,7 @@ def default(self): class Long(Field): def type(self): - return 'long' + return "long" def python_type(self): return int @@ -295,7 +303,7 @@ def default(self): class Float(Field): def type(self): - return 'float' + return "float" def python_type(self): return float @@ -309,7 +317,7 @@ def default(self): class Double(Field): def type(self): - return 'double' + return "double" def python_type(self): return float @@ -323,7 +331,7 @@ def default(self): class Bytes(Field): def type(self): - return 'bytes' + return "bytes" def python_type(self): return bytes @@ -337,7 +345,7 @@ def default(self): class String(Field): def type(self): - return 'string' + return "string" def python_type(self): return str @@ -345,10 +353,10 @@ def python_type(self): def validate_type(self, name, val): t = type(val) - if not val and not self._required: + if val is None and not self._required: return self.default() - if not (t is str or t.__name__ == 'unicode'): + if not (t is str or t.__name__ == "unicode"): raise TypeError("Invalid type '%s' for field '%s'. Expected a string" % (t, name)) return val @@ -358,8 +366,10 @@ def default(self): else: return None + # Complex types + class _Enum(Field): def __init__(self, enum_type): if not issubclass(enum_type, Enum): @@ -371,7 +381,7 @@ def __init__(self, enum_type): super(_Enum, self).__init__() def type(self): - return 'enum' + return "enum" def python_type(self): return self.enum_type @@ -386,14 +396,17 @@ def validate_type(self, name, val): return self.enum_type.__members__[val] else: raise TypeError( - "Invalid enum value '%s' for field '%s'. Expected: %s" % (val, name, self.enum_type.__members__.keys())) + "Invalid enum value '%s' for field '%s'. Expected: %s" + % (val, name, self.enum_type.__members__.keys()) + ) elif type(val) is int: # The enum was passed as an int, we need to check it against the possible values if val in self.values: return self.values[val] else: raise TypeError( - "Invalid enum value '%s' for field '%s'. Expected: %s" % (val, name, self.values.keys())) + "Invalid enum value '%s' for field '%s'. Expected: %s" % (val, name, self.values.keys()) + ) elif type(val) != self.python_type(): raise TypeError("Invalid type '%s' for field '%s'. Expected: %s" % (type(val), name, self.python_type())) else: @@ -406,11 +419,7 @@ def schema_info(self, defined_names): if self.enum_type.__name__ in defined_names: return self.enum_type.__name__ defined_names.add(self.enum_type.__name__) - return { - 'type': self.type(), - 'name': self.enum_type.__name__, - 'symbols': [x.name for x in self.enum_type] - } + return {"type": self.type(), "name": self.enum_type.__name__, "symbols": [x.name for x in self.enum_type]} def default(self): if self._default is not None: @@ -426,7 +435,7 @@ def __init__(self, array_type, default=None, required=False, required_default=Fa super(Array, self).__init__(default=default, required=required, required_default=required_default) def type(self): - return 'array' + return "array" def python_type(self): return list @@ -439,8 +448,7 @@ def validate_type(self, name, val): for x in val: if type(x) != self.array_type.python_type(): - raise TypeError('Array field ' + name + ' items should all be of type ' - + self.array_type.python_type()) + raise TypeError("Array field " + name + " items should all be of type " + self.array_type.python_type()) return val def schema(self): @@ -448,9 +456,10 @@ def schema(self): def schema_info(self, defined_names): return { - 'type': self.type(), - 'items': self.array_type.schema_info(defined_names) if isinstance(self.array_type, (Array, Map, Record)) - else self.array_type.type() + "type": self.type(), + "items": self.array_type.schema_info(defined_names) + if isinstance(self.array_type, (Array, Map, Record)) + else self.array_type.type(), } def default(self): @@ -467,7 +476,7 @@ def __init__(self, value_type, default=None, required=False, required_default=Fa super(Map, self).__init__(default=default, required=required, required_default=required_default) def type(self): - return 'map' + return "map" def python_type(self): return dict @@ -480,10 +489,11 @@ def validate_type(self, name, val): for k, v in val.items(): if type(k) != str and not is_unicode(k): - raise TypeError('Map keys for field ' + name + ' should all be strings') + raise TypeError("Map keys for field " + name + " should all be strings") if type(v) != self.value_type.python_type(): - raise TypeError('Map values for field ' + name + ' should all be of type ' - + self.value_type.python_type()) + raise TypeError( + "Map values for field " + name + " should all be of type " + self.value_type.python_type() + ) return val @@ -492,9 +502,10 @@ def schema(self): def schema_info(self, defined_names): return { - 'type': self.type(), - 'values': self.value_type.schema_info(defined_names) if isinstance(self.value_type, (Array, Map, Record)) - else self.value_type.type() + "type": self.type(), + "values": self.value_type.schema_info(defined_names) + if isinstance(self.value_type, (Array, Map, Record)) + else self.value_type.type(), } def default(self): @@ -507,4 +518,4 @@ def default(self): # Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2 # and also make it work well with Python3. def is_unicode(x): - return 'encode' in dir(x) and type(x.encode()) == str + return "encode" in dir(x) and type(x.encode()) == str diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py index 083efc353596b..867668a630b99 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema.py @@ -27,8 +27,7 @@ class Schema(object): def __init__(self, record_cls, schema_type, schema_definition, schema_name): self._record_cls = record_cls - self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, - json.dumps(schema_definition, indent=True)) + self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, json.dumps(schema_definition, indent=True)) @abstractmethod def encode(self, obj): @@ -43,13 +42,14 @@ def schema_info(self): def _validate_object_type(self, obj): if not isinstance(obj, self._record_cls): - raise TypeError('Invalid record obj of type ' + str(type(obj)) - + ' - expected type is ' + str(self._record_cls)) + raise TypeError( + "Invalid record obj of type " + str(type(obj)) + " - expected type is " + str(self._record_cls) + ) class BytesSchema(Schema): def __init__(self): - super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES') + super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, "BYTES") def encode(self, data): self._validate_object_type(data) @@ -61,21 +61,19 @@ def decode(self, data): class StringSchema(Schema): def __init__(self): - super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING') + super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, "STRING") def encode(self, obj): self._validate_object_type(obj) - return obj.encode('utf-8') + return obj.encode("utf-8") def decode(self, data): - return data.decode('utf-8') + return data.decode("utf-8") class JsonSchema(Schema): - def __init__(self, record_cls): - super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON, - record_cls.schema(), 'JSON') + super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON, record_cls.schema(), "JSON") def _get_serialized_value(self, o): if isinstance(o, enum.Enum): @@ -85,11 +83,16 @@ def _get_serialized_value(self, o): def encode(self, obj): self._validate_object_type(obj) - del obj.__dict__['_default'] - del obj.__dict__['_required'] - del obj.__dict__['_required_default'] - - return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8') + # Copy the dict of the object so we don't modify the provided object which was passed by reference + data = obj.__dict__.copy() + if "_default" in data: + del data["_default"] + if "_required" in data: + del data["_required"] + if "_required_default" in data: + del data["_required_default"] + + return json.dumps(data, default=self._get_serialized_value, indent=True).encode("utf-8") def decode(self, data): return self._record_cls(**json.loads(data)) diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py index e76fc51affbe8..8d3e79490d2e1 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py @@ -26,15 +26,16 @@ try: import fastavro + HAS_AVRO = True except ModuleNotFoundError: HAS_AVRO = False if HAS_AVRO: + class AvroSchema(Schema): def __init__(self, record_cls): - super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, - record_cls.schema(), 'AVRO') + super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, record_cls.schema(), "AVRO") self._schema = record_cls.schema() def _get_serialized_value(self, x): @@ -70,11 +71,15 @@ def decode(self, data): d = fastavro.schemaless_reader(buffer, self._schema) return self._record_cls(**d) + else: + class AvroSchema(Schema): def __init__(self, _record_cls): - raise Exception("Avro library support was not found. Make sure to install Pulsar client " + - "with Avro support: pip3 install 'pulsar-client[avro]'") + raise Exception( + "Avro library support was not found. Make sure to install Pulsar client " + + "with Avro support: pip3 install 'pulsar-client[avro]'" + ) def encode(self, obj): pass