Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
410 changes: 234 additions & 176 deletions pulsar-client-cpp/python/pulsar/__init__.py

Large diffs are not rendered by default.

304 changes: 157 additions & 147 deletions pulsar-client-cpp/python/pulsar/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,151 +42,161 @@
"""
from abc import abstractmethod


class Context(object):
"""Interface defining information available at process time"""
@abstractmethod
def get_message_id(self):
"""Return the messageid of the current message that we are processing"""
pass

@abstractmethod
def get_message_key(self):
"""Return the key of the current message that we are processing"""
pass

@abstractmethod
def get_message_eventtime(self):
"""Return the event time of the current message that we are processing"""
pass

@abstractmethod
def get_message_properties(self):
"""Return the message properties kv map of the current message that we are processing"""
pass

@abstractmethod
def get_current_message_topic_name(self):
"""Returns the topic name of the message that we are processing"""
pass

@abstractmethod
def get_function_tenant(self):
"""Returns the tenant of the message that's being processed"""
pass

@abstractmethod
def get_function_namespace(self):
"""Returns the namespace of the message that's being processed"""

@abstractmethod
def get_function_name(self):
"""Returns the function name that we are a part of"""
pass

@abstractmethod
def get_function_id(self):
"""Returns the function id that we are a part of"""
pass

@abstractmethod
def get_instance_id(self):
"""Returns the instance id that is executing the function"""
pass

@abstractmethod
def get_function_version(self):
"""Returns the version of function that we are executing"""
pass

@abstractmethod
def get_logger(self):
"""Returns the logger object that can be used to do logging"""
pass

@abstractmethod
def get_user_config_value(self, key):
"""Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
pass

@abstractmethod
def get_user_config_map(self):
"""Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
pass

@abstractmethod
def get_secret(self, secret_name):
"""Returns the secret value associated with the name. None if nothing was found"""
pass

@abstractmethod
def get_partition_key(self):
"""Returns partition key of the input message is one exists"""
pass


@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass

@abstractmethod
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any

The available options for message_conf:

properties,
partition_key,
sequence_id,
replication_clusters,
disable_replication,
event_timestamp

"""
pass

@abstractmethod
def get_input_topics(self):
"""Returns the input topics of function"""
pass

@abstractmethod
def get_output_topic(self):
"""Returns the output topic of function"""
pass

@abstractmethod
def get_output_serde_class_name(self):
"""return output Serde class"""
pass

@abstractmethod
def ack(self, msgid, topic):
"""ack this message id"""
pass

@abstractmethod
def incr_counter(self, key, amount):
"""incr the counter of a given key in the managed state"""
pass

@abstractmethod
def get_counter(self, key):
"""get the counter of a given key in the managed state"""
pass

@abstractmethod
def del_counter(self, key):
"""delete the counter of a given key in the managed state"""
pass

@abstractmethod
def put_state(self, key, value):
"""update the value of a given key in the managed state"""
pass

@abstractmethod
def get_state(self, key):
"""get the value of a given key in the managed state"""
pass
"""Interface defining information available at process time"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The re-indent and changes to " vs ' should be in a different PR.

We should have one commit that just fixed the single problem described.


@abstractmethod
def get_message_id(self):
"""Return the messageid of the current message that we are processing"""
pass

@abstractmethod
def get_message_key(self):
"""Return the key of the current message that we are processing"""
pass

@abstractmethod
def get_message_eventtime(self):
"""Return the event time of the current message that we are processing"""
pass

@abstractmethod
def get_message_properties(self):
"""Return the message properties kv map of the current message that we are processing"""
pass

@abstractmethod
def get_current_message_topic_name(self):
"""Returns the topic name of the message that we are processing"""
pass

@abstractmethod
def get_function_tenant(self):
"""Returns the tenant of the message that's being processed"""
pass

@abstractmethod
def get_function_namespace(self):
"""Returns the namespace of the message that's being processed"""

@abstractmethod
def get_function_name(self):
"""Returns the function name that we are a part of"""
pass

@abstractmethod
def get_function_id(self):
"""Returns the function id that we are a part of"""
pass

@abstractmethod
def get_instance_id(self):
"""Returns the instance id that is executing the function"""
pass

@abstractmethod
def get_function_version(self):
"""Returns the version of function that we are executing"""
pass

@abstractmethod
def get_logger(self):
"""Returns the logger object that can be used to do logging"""
pass

@abstractmethod
def get_user_config_value(self, key):
"""Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
pass

@abstractmethod
def get_user_config_map(self):
"""Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
pass

@abstractmethod
def get_secret(self, secret_name):
"""Returns the secret value associated with the name. None if nothing was found"""
pass

@abstractmethod
def get_partition_key(self):
"""Returns partition key of the input message is one exists"""
pass

@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass

@abstractmethod
def publish(
self,
topic_name,
message,
serde_class_name="serde.IdentitySerDe",
properties=None,
compression_type=None,
callback=None,
message_conf=None,
):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any

The available options for message_conf:

properties,
partition_key,
sequence_id,
replication_clusters,
disable_replication,
event_timestamp

"""
pass

@abstractmethod
def get_input_topics(self):
"""Returns the input topics of function"""
pass

@abstractmethod
def get_output_topic(self):
"""Returns the output topic of function"""
pass

@abstractmethod
def get_output_serde_class_name(self):
"""return output Serde class"""
pass

@abstractmethod
def ack(self, msgid, topic):
"""ack this message id"""
pass

@abstractmethod
def incr_counter(self, key, amount):
"""incr the counter of a given key in the managed state"""
pass

@abstractmethod
def get_counter(self, key):
"""get the counter of a given key in the managed state"""
pass

@abstractmethod
def del_counter(self, key):
"""delete the counter of a given key in the managed state"""
pass

@abstractmethod
def put_state(self, key, value):
"""update the value of a given key in the managed state"""
pass

@abstractmethod
def get_state(self, key):
"""get the value of a given key in the managed state"""
pass
12 changes: 7 additions & 5 deletions pulsar-client-cpp/python/pulsar/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
"""
from abc import abstractmethod


class Function(object):
"""Interface for Pulsar Function"""
@abstractmethod
def process(self, input, context):
"""Process input message"""
pass
"""Interface for Pulsar Function"""

@abstractmethod
def process(self, input, context):
"""Process input message"""
pass
Loading