Skip to content

Commit

Permalink
PulsarFunctionsTests - use InstanceUtils.getDefaultSubscriptionName i…
Browse files Browse the repository at this point in the history
…nstead of an hardcoded subscription name
  • Loading branch information
eolivelli committed Jun 8, 2021
1 parent 96afcf9 commit a742d13
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
Expand Down Expand Up @@ -220,7 +221,7 @@ private <T extends GenericContainer> void runSinkTester(SinkTester<T> tester, bo

ensureSubscriptionCreated(
inputTopicName,
String.format("public/default/%s", sinkName),
InstanceUtils.getDefaultSubscriptionName("public", "default", sinkName),
tester.getInputTopicSchema());

// submit the sink connector
Expand Down Expand Up @@ -1762,7 +1763,7 @@ private static <T> void submitFunction(Runtime runtime,
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));

ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
ensureSubscriptionCreated(inputTopicName, InstanceUtils.getDefaultSubscriptionName("public", "default", functionName), inputTopicSchema);
}

private static void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
Expand Down Expand Up @@ -1965,6 +1966,8 @@ private static void checkSubscriptionsCleanup(String topic) throws Exception {
"stats",
topic);
TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
log.info("topicStats {}", result.getStdout());
log.info("topicStats subs {}", topicStats.subscriptions);
assertEquals(topicStats.subscriptions.size(), 0);

} catch (ContainerExecException e) {
Expand Down Expand Up @@ -2731,7 +2734,7 @@ private static void submitJavaLoggingFunction(String inputTopicName,
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));

ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
ensureSubscriptionCreated(inputTopicName, InstanceUtils.getDefaultSubscriptionName("public", "default", functionName), schema);
}

private static void publishAndConsumeMessages(String inputTopic,
Expand Down

0 comments on commit a742d13

Please sign in to comment.