-
Notifications
You must be signed in to change notification settings - Fork 19
Spring cloud stream functional bindings preview #828
Spring cloud stream functional bindings preview #828
Conversation
77d0da9
to
76347aa
Compare
0df5f5d
to
2cf79a5
Compare
14ef2b0
to
008aedd
Compare
012c0df
to
2b11be4
Compare
e6d5e44
to
f91a867
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
25f28de
to
d3ee169
Compare
6baaf04
to
a48f09d
Compare
a48f09d
to
8d71a23
Compare
@Bean(INTERNAL_CHANNEL_PREFIX + ExampleConnectorChannels.EXAMPLE_CONNECTOR_CONSUMER) | ||
@Override | ||
public SubscribableChannel exampleConnectorConsumer() { | ||
return MessageChannels.publishSubscribe(ExampleConnectorChannels.EXAMPLE_CONNECTOR_CONSUMER).get(); | ||
} | ||
|
||
@Bean(INTERNAL_CHANNEL_PREFIX + HeadersConnectorChannels.HEADERS_CONNECTOR_CONSUMER) | ||
@Override | ||
public SubscribableChannel headersConnectorConsumer() { | ||
return MessageChannels.publishSubscribe(HeadersConnectorChannels.HEADERS_CONNECTOR_CONSUMER).get(); | ||
} | ||
|
||
@Bean(INTERNAL_CHANNEL_PREFIX + MoviesDescriptionConnectorChannels.MOVIES_DESCRIPTION_CONSUMER) | ||
@Override | ||
public SubscribableChannel moviesDescriptionConsumer() { | ||
return MessageChannels.publishSubscribe(MoviesDescriptionConnectorChannels.MOVIES_DESCRIPTION_CONSUMER).get(); | ||
} | ||
|
||
@Bean(INTERNAL_CHANNEL_PREFIX + MultiInstanceConnector.Channels.CHANNEL) | ||
@Override | ||
public SubscribableChannel miCloudConnectorInput() { | ||
return MessageChannels.publishSubscribe(MultiInstanceConnector.Channels.CHANNEL).get(); | ||
} | ||
|
||
@Bean(INTERNAL_CHANNEL_PREFIX + TestBpmnErrorConnector.Channels.CHANNEL) | ||
@Override | ||
public SubscribableChannel testBpmnErrorConnectorInput() { | ||
return MessageChannels.publishSubscribe(TestBpmnErrorConnector.Channels.CHANNEL).get(); | ||
} | ||
|
||
@Bean(INTERNAL_CHANNEL_PREFIX + TestErrorConnector.Channels.CHANNEL) | ||
@Override | ||
public SubscribableChannel testErrorConnectorInput() { | ||
return MessageChannels.publishSubscribe(TestErrorConnector.Channels.CHANNEL).get(); | ||
} | ||
|
||
@Bean(INTERNAL_CHANNEL_PREFIX + RestConnector.Channels.POST) | ||
@Override | ||
public SubscribableChannel restConnectorPost() { | ||
return MessageChannels.publishSubscribe(RestConnector.Channels.POST).get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's refactor this code using functional channel bindings.
@@ -56,7 +55,12 @@ public MultiInstanceConnector( | |||
this.connectorProperties = connectorProperties; | |||
} | |||
|
|||
@StreamListener(value = Channels.CHANNEL) | |||
@Override | |||
public Void apply(IntegrationRequest event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The apply
method can be implemented directly.
@@ -48,7 +46,12 @@ public RestConnector(IntegrationResultSender integrationResultSender, ConnectorP | |||
this.connectorProperties = connectorProperties; | |||
} | |||
|
|||
@StreamListener(Channels.POST) | |||
@Override | |||
public Void apply(IntegrationRequest event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The apply
method can be implemented directly.
SubscribableChannel testBpmnErrorConnectorInput(); | ||
} | ||
|
||
@StreamListener(value = Channels.CHANNEL) | ||
@Override | ||
public Void apply(IntegrationRequest event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The apply
method can be implemented directly.
@StreamListener(value = Channels.CHANNEL) | ||
public void handle(IntegrationRequest integrationRequest) throws InterruptedException { | ||
@Override | ||
public Void apply(IntegrationRequest event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The apply
method can be implemented directly.
public Void apply(IntegrationRequest event) { | ||
performTask(event); | ||
return null; | ||
} | ||
|
||
public void performTask(IntegrationRequest event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The apply method can be implemented directly or via separate Connector function bean that uses ExampleConnector consumer as a delegate.
@Component | ||
@EnableBinding(ExampleConnectorChannels.class) | ||
public class ExampleConnector { | ||
@ConnectorBinding(input = ExampleConnectorChannels.EXAMPLE_CONNECTOR_CONSUMER, condition = "", outputHeader = "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need to reset outputHeader
attribute?
The changes were merged in PR #875 |
This PR fixes Activiti/Activiti#4196 replacing the deprecated annotations from
spring-cloud-stream
with the new functional one provided byspring-cloud-function
.The annotation
@StreamListener
and@EnableBindings
has been replaced with custom annotations defined in Activiti-Cloud#910:@FunctionalBinding
@ConditionalFunctionalBinding
@ConnectorBinding