Skip to content

Commit 643864f

Browse files
committed
Add topic example
1 parent b0480c6 commit 643864f

File tree

6 files changed

+223
-0
lines changed

6 files changed

+223
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.devdevx.examples.springbootrabbitmq.topic.config;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.Binding;
6+
import org.springframework.amqp.core.BindingBuilder;
7+
import org.springframework.amqp.core.Queue;
8+
import org.springframework.amqp.core.TopicExchange;
9+
import org.springframework.beans.factory.annotation.Qualifier;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
15+
@Configuration
16+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "topic")
17+
public class TopicRmqConfig {
18+
19+
private static final Logger log = LoggerFactory.getLogger(TopicRmqConfig.class);
20+
21+
@Value("${app.rabbitmq.topic.exchange}")
22+
private String exchangeName;
23+
24+
@Value("${app.rabbitmq.topic.creation-queue}")
25+
private String creationQueueName;
26+
27+
@Value("${app.rabbitmq.topic.users-queue}")
28+
private String usersQueueName;
29+
30+
@Value("${app.rabbitmq.topic.events-queue}")
31+
private String eventsQueueName;
32+
33+
@Bean
34+
TopicExchange exchange() {
35+
log.info("Creating exchange: {}", exchangeName);
36+
return new TopicExchange(exchangeName);
37+
}
38+
39+
@Bean
40+
Queue creationQueue() {
41+
log.info("Creating queue: {}", creationQueueName);
42+
return new Queue(creationQueueName, false);
43+
}
44+
45+
@Bean
46+
Queue usersQueue() {
47+
log.info("Creating queue: {}", usersQueueName);
48+
return new Queue(usersQueueName, false);
49+
}
50+
51+
@Bean
52+
Queue eventsQueue() {
53+
log.info("Creating queue: {}", eventsQueueName);
54+
return new Queue(eventsQueueName, false);
55+
}
56+
57+
@Bean
58+
Binding bindingCreation(@Qualifier("creationQueue") Queue queue, TopicExchange exchange) {
59+
log.info("Binding queue '{}' to the exchange '{}'", queue.getName(), exchange.getName());
60+
return BindingBuilder.bind(queue).to(exchange).with("*.created.#");
61+
}
62+
63+
@Bean
64+
Binding bindingUsers(@Qualifier("usersQueue") Queue queue, TopicExchange exchange) {
65+
log.info("Binding queue '{}' to the exchange '{}'", queue.getName(), exchange.getName());
66+
return BindingBuilder.bind(queue).to(exchange).with("user.#");
67+
}
68+
69+
@Bean
70+
Binding bindingEvents(@Qualifier("eventsQueue") Queue queue, TopicExchange exchange) {
71+
log.info("Binding queue '{}' to the exchange '{}'", queue.getName(), exchange.getName());
72+
return BindingBuilder.bind(queue).to(exchange).with("#");
73+
}
74+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.devdevx.examples.springbootrabbitmq.topic.config;
2+
3+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
4+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
5+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
12+
@Profile("producer")
13+
@Configuration
14+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "topic")
15+
public class TopicRmqProduConfig {
16+
17+
@Value("${app.rabbitmq.topic.exchange}")
18+
private String exchangeName;
19+
20+
@Bean
21+
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
22+
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
23+
rabbitTemplate.setMessageConverter(converter);
24+
rabbitTemplate.setExchange(exchangeName);
25+
return rabbitTemplate;
26+
}
27+
28+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.devdevx.examples.springbootrabbitmq.topic.messaging;
2+
3+
public class TopicMessage {
4+
private Integer id;
5+
private String message;
6+
7+
public TopicMessage() {
8+
}
9+
10+
public TopicMessage(Integer id, String message) {
11+
this.id = id;
12+
this.message = message;
13+
}
14+
15+
public Integer getId() {
16+
return id;
17+
}
18+
19+
public void setId(Integer id) {
20+
this.id = id;
21+
}
22+
23+
public String getMessage() {
24+
return message;
25+
}
26+
27+
public void setMessage(String message) {
28+
this.message = message;
29+
}
30+
31+
@Override
32+
public String toString() {
33+
return "TopicMessage{" +
34+
"id=" + id +
35+
", message='" + message + '\'' +
36+
'}';
37+
}
38+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.devdevx.examples.springbootrabbitmq.topic.messaging;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
8+
import org.springframework.context.annotation.Profile;
9+
import org.springframework.stereotype.Component;
10+
11+
@Profile("consumer")
12+
@Component
13+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "topic")
14+
public class TopicMessageReceiver {
15+
16+
private static final Logger log = LoggerFactory.getLogger(TopicMessageReceiver.class);
17+
18+
@Value("${app.rabbitmq.topic.creation-queue}")
19+
private String creationQueueName;
20+
21+
@Value("${app.rabbitmq.topic.users-queue}")
22+
private String usersQueueName;
23+
24+
@Value("${app.rabbitmq.topic.events-queue}")
25+
private String eventsQueueName;
26+
27+
@RabbitListener(queues = "${app.rabbitmq.topic.creation-queue}")
28+
public void receiveCreation(TopicMessage message) {
29+
log.info("Message received on queue '{}' : {}", creationQueueName, message);
30+
}
31+
32+
@RabbitListener(queues = "${app.rabbitmq.topic.users-queue}")
33+
public void receiveUser(TopicMessage message) {
34+
log.info("Message received on queue '{}' : {}", usersQueueName, message);
35+
}
36+
37+
@RabbitListener(queues = "${app.rabbitmq.topic.events-queue}")
38+
public void receiveEvent(TopicMessage message) {
39+
log.info("Message received on queue '{}' : {}", eventsQueueName, message);
40+
}
41+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.devdevx.examples.springbootrabbitmq.topic.messaging;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
7+
import org.springframework.context.annotation.Profile;
8+
import org.springframework.scheduling.annotation.Scheduled;
9+
import org.springframework.stereotype.Component;
10+
11+
import java.util.Random;
12+
13+
@Profile("producer")
14+
@Component
15+
@ConditionalOnProperty(name = "app.rabbitmq.example", havingValue = "topic")
16+
public class TopicMessageSender {
17+
18+
private static final Logger log = LoggerFactory.getLogger(TopicMessageSender.class);
19+
20+
private RabbitTemplate rabbitTemplate;
21+
22+
private int id = 0;
23+
private Random random = new Random();
24+
private String[] routingKeys = {"user.created", "user.updated", "profile.created", "external.api.error"};
25+
26+
public TopicMessageSender(RabbitTemplate rabbitTemplate) {
27+
this.rabbitTemplate = rabbitTemplate;
28+
}
29+
30+
@Scheduled(fixedDelayString = "${app.rabbitmq.delay-ms}")
31+
public void send() {
32+
String routingKey = routingKeys[random.nextInt(routingKeys.length)];
33+
TopicMessage message = new TopicMessage(id++, routingKey);
34+
log.info("Sending message: {} with routing key {}", message, routingKey);
35+
rabbitTemplate.convertAndSend(routingKey, message);
36+
}
37+
}

src/main/resources/application.properties

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,9 @@ app.rabbitmq.headers.exchange=headers-exchange
1313
app.rabbitmq.headers.queue-fast=queue-fast
1414
app.rabbitmq.headers.queue-slow=queue-slow
1515

16+
app.rabbitmq.topic.exchange=topic-exchange
17+
app.rabbitmq.topic.creation-queue=creation-queue
18+
app.rabbitmq.topic.users-queue=users-queue
19+
app.rabbitmq.topic.events-queue=events-queue
20+
1621
app.rabbitmq.delay-ms=10000

0 commit comments

Comments
 (0)