Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update aqmp config #310

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 26 additions & 52 deletions partials/AmqpConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class Config {
Expand All @@ -33,16 +27,22 @@ public class Config {
@Value("${amqp.broker.password}")
private String password;

{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
@Value("${amqp.exchange.{{- channelName -}}}")

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;

{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${amqp.queue.{{- channelName -}}}")
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endif %}

{% if channel.hasPublish() %}
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;
{% endif %}

{% endif %}{% endfor %}
{% endfor %}

@Bean
public ConnectionFactory connectionFactory() {
Expand All @@ -53,65 +53,39 @@ public ConnectionFactory connectionFactory() {
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public Declarables exchanges() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

@Bean
public Declarables queues() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

// consumer

@Autowired
MessageHandlerService messageHandlerService;
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}

@Bean
public IntegrationFlow {{channelName | camelCase}}Flow() {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory(), {{channelName}}Queue))
.handle(messageHandlerService::handle{{channelName | upperFirst}})
.get();
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
{% endif %}{% endfor %}

// publisher

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}

@Bean
public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() {
return new DirectChannel();
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
@ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel")
public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExchangeName({{channelName}}Exchange);
outbound.setRoutingKey("#");
return outbound;
public AmqpTemplate template() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
{% endif %}{% endfor %}
}
{% endmacro %}
43 changes: 43 additions & 0 deletions partials/AmqpPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{% macro amqpPublisher(asyncapi, params) %}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}


@Service
public class PublisherService {
@Autowired
private RabbitTemplate template;

{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endif %}
{% endfor %}

{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{% if channel.hasSubscribe() %}
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
public void {{channel.subscribe().id() | camelCase}}(){
{{schemaName}} {{channelName}}Payload = new {{schemaName}}();
template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload);
}

{% endif %}
{% endfor %}

}

{% endmacro %}
1 change: 1 addition & 0 deletions template/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ repositories {
dependencies {
{%- if asyncapi | isProtocol('amqp') %}
implementation('org.springframework.integration:spring-integration-amqp')
implementation('org.springframework.integration:spring-integration-amqp')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's doubled?

{% endif -%}
{%- if asyncapi | isProtocol('mqtt') %}
implementation('org.springframework.integration:spring-integration-mqtt')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public void run(String... args) {
{%- for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %});
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}()
{% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %});
{% endfor -%}
{% endif -%}
{%- endfor %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
{%- endif %}
{%- endfor %}
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
@Service
public class MessageHandlerService {

Expand All @@ -52,9 +62,21 @@ public class MessageHandlerService {
}
{%- endif %}
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{% if channel.hasPublish() %}
{%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
@RabbitListener(queues = "${amqp.{{- channelName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){
LOGGER.info("Message received from {{- channelName -}} : " + {{channelName}}Payload);
}
{% endif %}
{% endfor %}

{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{% if channel.hasPublish() %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package {{ params['userJavaPackage'] }}.service;
{%- from "partials/CommonPublisher.java" import commonPublisher -%}
{%- from "partials/KafkaPublisher.java" import kafkaPublisher -%}
{%- from "partials/AmqpPublisher.java" import amqpPublisher -%}
{%- if asyncapi | isProtocol('kafka') -%}
{{- kafkaPublisher(asyncapi, params) -}}
{%- elif asyncapi | isProtocol('amqp') -%}
{{- amqpPublisher(asyncapi, params) -}}
{%- else -%}
{{- commonPublisher(asyncapi) -}}
{%- endif -%}
22 changes: 8 additions & 14 deletions template/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,21 @@
{%- endif -%}
{%- endfor -%}

{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
amqp:
broker: {% for line in server.description() | splitByLines %}
# {{line | safe}}{% endfor %}
host: {{server.url() | replace(':{port}', '') }}
host: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %}
port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %}
username:
username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %}
password:
exchange:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() and channel.subscribe().binding('amqp') %}
{{channelName}}: {{channel.subscribe().binding('amqp').exchange.name}}
{% endif %}
{% endfor %}
queue:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() and channel.publish().binding('amqp') %}
{{channelName}}: {{channel.publish().binding('amqp').queue.name}}
{% endif %}
{{channelName}}:
{% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %}
{% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %}
{% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %}
{% endfor %}
{% endif %}
{% endif %}

{% if server.protocol() == 'mqtt' %}
mqtt:
Expand Down
2 changes: 2 additions & 0 deletions tests/__snapshots__/kafka.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ import org.springframework.messaging.handler.annotation.Payload;

import com.asyncapi.model.LightMeasuredPayload;


@Service
public class MessageHandlerService {

Expand All @@ -154,6 +155,7 @@ public class MessageHandlerService {
}



}
"
`;
Expand Down
14 changes: 8 additions & 6 deletions tests/__snapshots__/mqtt.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;


@Service
public class MessageHandlerService {

private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class);



/**
* The topic on which measured values may be produced and consumed.
*/
Expand All @@ -223,11 +224,11 @@ public class MessageHandlerService {
}












}
Expand Down Expand Up @@ -1058,13 +1059,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;


@Service
public class MessageHandlerService {

private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class);



/**
* The topic on which measured values may be produced and consumed.
*/
Expand All @@ -1074,7 +1076,7 @@ public class MessageHandlerService {
}





}
Expand Down
2 changes: 2 additions & 0 deletions tests/__snapshots__/oneOf.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.springframework.messaging.handler.annotation.Payload;
import com.asyncapi.model.AnonymousSchema1;
import com.asyncapi.model.AnonymousSchema7;


@Service
public class MessageHandlerService {

Expand All @@ -32,6 +33,7 @@ public class MessageHandlerService {
}



}
"
`;
Expand Down
2 changes: 2 additions & 0 deletions tests/__snapshots__/parameters.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.springframework.messaging.handler.annotation.Payload;

import com.asyncapi.model.LightMeasuredPayload;


@Service
public class MessageHandlerService {

Expand All @@ -31,6 +32,7 @@ public class MessageHandlerService {
}



}
"
`;
Expand Down