Skip to content

Commit

Permalink
Merge pull request spring-projects#41851 from
Browse files Browse the repository at this point in the history
* spring-projectsgh-41851:
  Polish "Add support for Pulsar default tenant/namespace"
  Add support for Pulsar default tenant/namespace

Closes spring-projectsgh-41851
  • Loading branch information
wilkinsona committed Aug 20, 2024
2 parents 5c76189 + 3bbbef7 commit df89351
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -88,24 +89,31 @@ public class PulsarAutoConfiguration {
@ConditionalOnMissingBean(PulsarProducerFactory.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
customizersProvider);
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
lambdaSafeCustomizers, topicResolver);
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver);
topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder);
return producerFactory;
}

@Bean
@ConditionalOnMissingBean(PulsarProducerFactory.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
customizersProvider);
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver,
cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(),
cacheProperties.getInitialCapacity());
topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder);
return producerFactory;
}

private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
Expand Down Expand Up @@ -138,13 +146,17 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
@Bean
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
lambdaSafeCustomizers);
topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder);
return consumerFactory;
}

@Bean
Expand Down Expand Up @@ -181,13 +193,17 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
@Bean
@ConditionalOnMissingBean(PulsarReaderFactory.class)
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeReaderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient,
lambdaSafeCustomizers);
topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder);
return readerFactory;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.beans.factory.ObjectProvider;
Expand All @@ -34,13 +35,15 @@
import org.springframework.boot.util.LambdaSafe;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
import org.springframework.pulsar.core.PulsarClientFactory;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -176,4 +179,13 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p
properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures());
}

@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.pulsar.defaults.topic.enabled", havingValue = "true", matchIfMissing = true)
PulsarTopicBuilder pulsarTopicBuilder() {
return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(),
this.properties.getDefaults().getTopic().getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ public static class Defaults {
*/
private List<TypeMapping> typeMappings = new ArrayList<>();

private final Topic topic = new Topic();

public List<TypeMapping> getTypeMappings() {
return this.typeMappings;
}
Expand All @@ -265,6 +267,10 @@ public void setTypeMappings(List<TypeMapping> typeMappings) {
this.typeMappings = typeMappings;
}

public Topic getTopic() {
return this.topic;
}

/**
* A mapping from message type to topic and/or schema info to use (at least one of
* {@code topicName} or {@code schemaInfo} must be specified.
Expand Down Expand Up @@ -301,6 +307,40 @@ public record SchemaInfo(SchemaType schemaType, Class<?> messageKeyType) {

}

public static class Topic {

/**
* Default tenant to use when producing or consuming messages against a
* non-fully-qualified topic URL. When not specified Pulsar uses a default
* tenant of 'public'.
*/
private String tenant;

/**
* Default namespace to use when producing or consuming messages against a
* non-fully-qualified topic URL. When not specified Pulsar uses a default
* namespace of 'default'.
*/
private String namespace;

public String getTenant() {
return this.tenant;
}

public void setTenant(String tenant) {
this.tenant = tenant;
}

public String getNamespace() {
return this.namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

}

}

public static class Function {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,13 +41,15 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsar;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory.Builder;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer;
Expand Down Expand Up @@ -112,17 +114,19 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
@ConditionalOnMissingBean(ReactivePulsarSenderFactory.class)
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageSenderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageSenderBuilderCustomizers(customizers, builder));
return DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient)
Builder<Object> senderFactoryBuilder = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient)
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
.withTopicResolver(topicResolver)
.build();
.withTopicResolver(topicResolver);
topicBuilderProvider.ifAvailable(senderFactoryBuilder::withTopicBuilder);
return senderFactoryBuilder.build();
}

@SuppressWarnings("unchecked")
Expand All @@ -136,13 +140,17 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
@ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class)
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
ReactivePulsarClient pulsarReactivePulsarClient,
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
DefaultReactivePulsarConsumerFactory<?> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
pulsarReactivePulsarClient, lambdaSafeCustomizers);
topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder);
return consumerFactory;
}

@SuppressWarnings("unchecked")
Expand All @@ -167,13 +175,17 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
@Bean
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(
reactivePulsarClient, lambdaSafeCustomizers);
topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder);
return readerFactory;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,12 @@
"name": "spring.neo4j.uri",
"defaultValue": "bolt://localhost:7687"
},
{
"name": "spring.pulsar.defaults.topic.enabled",
"type": "java.lang.Boolean",
"description": "Whether to enable default tenant and namespace support for topics.",
"defaultValue": true
},
{
"name": "spring.pulsar.function.enabled",
"type": "java.lang.Boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -126,6 +127,7 @@ void autoConfiguresBeans() {
.hasSingleBean(PulsarConnectionDetails.class)
.hasSingleBean(DefaultPulsarClientFactory.class)
.hasSingleBean(PulsarClient.class)
.hasSingleBean(PulsarTopicBuilder.class)
.hasSingleBean(PulsarAdministration.class)
.hasSingleBean(DefaultSchemaResolver.class)
.hasSingleBean(DefaultTopicResolver.class)
Expand All @@ -141,6 +143,12 @@ void autoConfiguresBeans() {
.hasSingleBean(PulsarReaderEndpointRegistry.class));
}

@Test
void topicDefaultsCanBeDisabled() {
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
.run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class));
}

@Nested
class ProducerFactoryTests {

Expand Down Expand Up @@ -219,7 +227,17 @@ void injectsExpectedBeans() {
"spring.pulsar.producer.cache.enabled=false")
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)));
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
.extracting("topicBuilder")
.isNotNull());
}

@Test
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
.extracting("topicBuilder")
.isNull());
}

@ParameterizedTest
Expand Down Expand Up @@ -375,7 +393,18 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
@Test
void injectsExpectedBeans() {
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.extracting("topicBuilder")
.isNotNull());
}

@Test
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.extracting("topicBuilder")
.isNull());
}

@Test
Expand Down Expand Up @@ -574,7 +603,17 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
@Test
void injectsExpectedBeans() {
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.extracting("topicBuilder")
.isNotNull());
}

@Test
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
.extracting("topicBuilder")
.isNull());
}

@Test
Expand Down
Loading

0 comments on commit df89351

Please sign in to comment.