Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pulsar-client-cpp/python/pulsar/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p
"""
pass

@abstractmethod
def get_input_topics(self):
"""Returns the input topics of function"""
pass

@abstractmethod
def get_output_topic(self):
"""Returns the output topic of function"""
Expand Down
3 changes: 3 additions & 0 deletions pulsar-functions/instance/src/main/python/contextimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def record_metric(self, metric_name, metric_value):

self.user_metrics_map[metric_name].observe(metric_value)

def get_input_topics(self):
return list(self.instance_config.function_details.source.inputSpecs.keys())

def get_output_topic(self):
return self.instance_config.function_details.output

Expand Down
1 change: 1 addition & 0 deletions site2/docs/functions-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ Method | What it provides
`get_user_config_map` | Returns the entire user-defined config as a dict
`record_metric` | Records a per-key [metric](#python-metrics)
`publish` | Publishes a message to the specified Pulsar topic
`get_input_topics` | The name(s) of the input(topics), as a list
`get_output_topic` | The name of the output topic
`get_output_serde_class_name` | The name of the output [SerDe](#python-serde) class
`ack` | [Acks](reference-terminology.md#acknowledgment-ack) the message being processed to Pulsar
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/functions-develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ class ContextImpl(pulsar.Context):
...
def record_metric(self, metric_name, metric_value):
...
def get_input_topics(self):
...
def get_output_topic(self):
...
def get_output_serde_class_name(self):
Expand Down