From bdbce13104ec299fa7a15bf5a820b88569b647d2 Mon Sep 17 00:00:00 2001 From: Brian Candler Date: Thu, 31 Oct 2019 17:46:08 +0000 Subject: [PATCH] Python API: add get_input_topics() Fixes #5525 --- pulsar-client-cpp/python/pulsar/functions/context.py | 5 +++++ pulsar-functions/instance/src/main/python/contextimpl.py | 3 +++ site2/docs/functions-api.md | 1 + site2/docs/functions-develop.md | 2 ++ 4 files changed, 11 insertions(+) diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py index b1ee3a670cf65..fa9647913cb25 100644 --- a/pulsar-client-cpp/python/pulsar/functions/context.py +++ b/pulsar-client-cpp/python/pulsar/functions/context.py @@ -146,6 +146,11 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p """ pass + @abstractmethod + def get_input_topics(self): + """Returns the input topics of function""" + pass + @abstractmethod def get_output_topic(self): """Returns the output topic of function""" diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 146af2c061366..95afea73eaca3 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -135,6 +135,9 @@ def record_metric(self, metric_name, metric_value): self.user_metrics_map[metric_name].observe(metric_value) + def get_input_topics(self): + return list(self.instance_config.function_details.source.inputSpecs.keys()) + def get_output_topic(self): return self.instance_config.function_details.output diff --git a/site2/docs/functions-api.md b/site2/docs/functions-api.md index d460c1fa7a59c..9abd2ed75745d 100644 --- a/site2/docs/functions-api.md +++ b/site2/docs/functions-api.md @@ -588,6 +588,7 @@ Method | What it provides `get_user_config_map` | Returns the entire user-defined config as a dict `record_metric` | Records a per-key [metric](#python-metrics) `publish` | Publishes a message to the specified Pulsar topic +`get_input_topics` | The name(s) of the input(topics), as a list `get_output_topic` | The name of the output topic `get_output_serde_class_name` | The name of the output [SerDe](#python-serde) class `ack` | [Acks](reference-terminology.md#acknowledgment-ack) the message being processed to Pulsar diff --git a/site2/docs/functions-develop.md b/site2/docs/functions-develop.md index 5308295e2b6b7..24131013d22b4 100644 --- a/site2/docs/functions-develop.md +++ b/site2/docs/functions-develop.md @@ -356,6 +356,8 @@ class ContextImpl(pulsar.Context): ... def record_metric(self, metric_name, metric_value): ... + def get_input_topics(self): + ... def get_output_topic(self): ... def get_output_serde_class_name(self):