Skip to content

Commit

Permalink
Merge pull request #237 from SolaceProducts/stage-3.1.0
Browse files Browse the repository at this point in the history
# Global Changes
* Solace PubSub+ Messaging API for Java (JCSMP) upgraded to `10.21.0`
* Spring Boot upgraded to `3.1.5`
* Spring Cloud upgraded to `2022.0.4`

# Specific Project Changes
## Solace Spring Cloud Stream Binder
* Added health indicators to capture flow health
  * closes #145
* Added support for Solace PubSub+ partitioned queues
* Fixed potential error channel name collisions
  • Loading branch information
Nephery committed Oct 26, 2023
2 parents 6c24459 + 0987e93 commit 8f9fad9
Show file tree
Hide file tree
Showing 47 changed files with 1,690 additions and 374 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Solace Spring Cloud Build</name>
Expand All @@ -21,7 +21,7 @@
<repoName>SolaceProducts</repoName>

<!-- This is the version of Spring Cloud we have targeted for this build -->
<spring.cloud.version>2022.0.2</spring.cloud.version>
<spring.cloud.version>2022.0.4</spring.cloud.version>

<!-- This is the version of Solace Spring Boot we have targeted for this build -->
<!-- This also dictates the expected version of Spring Boot -->
Expand All @@ -30,9 +30,9 @@

<!-- Override spring-boot version from solace-spring-boot to latest patch version -->
<!-- Remove this if the next version of solace-spring-boot works fine -->
<spring.boot.version>3.0.6</spring.boot.version>
<spring.boot.version>3.1.5</spring.boot.version>

<solace.spring.cloud.stream-starter.version>4.0.1-SNAPSHOT</solace.spring.cloud.stream-starter.version>
<solace.spring.cloud.stream-starter.version>4.1.0-SNAPSHOT</solace.spring.cloud.stream-starter.version>

<solace.integration.test.support.version>1.0.2</solace.integration.test.support.version>
<solace.integration.test.support.fetch_checkout.skip>false</solace.integration.test.support.fetch_checkout.skip>
Expand Down Expand Up @@ -231,7 +231,7 @@
</execution>
</executions>
<configuration>
<source>8</source>
<source>17</source>
<detectJavaApiLink>false</detectJavaApiLink>
</configuration>
</plugin>
Expand Down
7 changes: 4 additions & 3 deletions solace-spring-cloud-bom/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Consult the table below to determine which version of the BOM you need to use:
| 2021.0.4 | 2.4.0 | 2.7.x |
| 2021.0.6 | 2.5.0 | 2.7.x |
| 2022.0.2 | 3.0.0 | 3.0.x |
| 2022.0.4 | 3.1.0 | 3.1.x |

## Including the BOM

Expand All @@ -37,7 +38,7 @@ In addition to showing how to include the BOM, the following snippets also shows
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>3.0.0</version>
<version>3.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -65,7 +66,7 @@ apply plugin: 'io.spring.dependency-management'
dependencyManagement {
imports {
mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:3.0.0"
mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:3.1.0"
}
}
Expand All @@ -77,7 +78,7 @@ dependencies {
### Using it with Gradle 5
```groovy
dependencies {
implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:3.0.0"))
implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:3.1.0"))
implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace")
}
```
4 changes: 2 additions & 2 deletions solace-spring-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>solace-spring-cloud-bom</artifactId>
<packaging>pom</packaging>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>

<name>Solace Spring Cloud BOM</name>
<description>BOM for Solace Spring Cloud</description>
Expand Down
8 changes: 4 additions & 4 deletions solace-spring-cloud-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -23,8 +23,8 @@
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>

<solace.jcsmp.version>10.20.0</solace.jcsmp.version>
<solace.jms.version>10.20.0</solace.jms.version>
<solace.jcsmp.version>10.21.0</solace.jcsmp.version>
<solace.jms.version>10.21.0</solace.jms.version>
</properties>

<dependencyManagement>
Expand All @@ -36,7 +36,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-bom</artifactId>
<version>2.20.0</version>
<version>2.21.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
= Spring Cloud Stream Binder for Solace PubSub+
:revnumber: 4.0.0
:revnumber: 4.1.0
:toc: preamble
:toclevels: 3
:icons: font
:scst-version: 4.0.2
:scst-version: 4.0.4

// Github-Specific Settings
ifdef::env-github[]
Expand Down Expand Up @@ -35,8 +35,6 @@ In Solace, the above setup is called topic-to-queue mapping. So a typical messag
+
NOTE: Round-robin distribution only occurs if the consumer group's queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages.

NOTE: Partitioning is not yet supported by this version of the binder.

IMPORTANT: Since consumer bindings always consumes from queues it is required that Assured Delivery is enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud). Additionally, the client username's client profile must be allowed to send and receive guaranteed messages.

For the sake of brevity, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/[Spring's documentation]. This document will solely focus on discussing components unique to Solace.
Expand Down Expand Up @@ -456,6 +454,17 @@ See: <<Overview>> for more info on how this binder uses topic-to-queue mapping t
+
NOTE: Does not apply when `destinationType=queue`.

==== Solace Connection Health-Check Properties

These properties configure the Solace connection's health indicator configurable under `solace.health-check.connection`.

reconnectAttemptsUntilDown::
The number of session reconnect attempts until the health goes `DOWN`. This will happen regardless if the underlying session is actually still reconnecting. Setting this to `0` will disable this feature.
+
This feature operates independently of the PubSub+ session reconnect feature. Meaning that if PubSub+ session reconnect is configured to retry less than the value given to this property, then this feature effectively does nothing.
+
Default: `0`

=== Solace Message Headers

Solace-defined Spring headers to get/set Solace metadata from/to Spring `Message` headers.
Expand Down Expand Up @@ -639,6 +648,12 @@ The consolidated list of message headers for a batch of messages where the heade
|
| Present and true to indicate when the PubSub+ message payload was null.

| solace_scst_partitionKey
| String
| Write
|
| The partition key for PubSub+ partitioned queues.

| solace_scst_serializedPayload
| Boolean
| Internal Binder Use Only
Expand Down Expand Up @@ -844,6 +859,34 @@ See <<Native Payload Types>> for more info regarding this binder's natively supp

To create a batch of messages, the binder will consume messages from the PubSub+ broker until either a maximum batch size or timeout has been achieved. After which, the binder will compose the batch message and send it to the consumer handler for processing. Both these batching parameters can be configured using the `batchMaxSize` and `batchTimeout` consumer config options.

== Partitioning

[NOTE]
====
The Solace PubSub+ broker supports partitioning natively.
The partitioning abstraction as described in the https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#partitioning[Spring Cloud Stream documentation] is not supported.
====

To publish messages that are intended for partitioned queues, you must provide a partition key by setting the `solace_scst_partitionKey` message header (accessible through the `SolaceBinderHeaders.PARTITION_KEY` constant).

For example:

[source,java]
----
public class MyMessageBuilder {
public Message<String> buildMeAMessage() {
return MessageBuilder.withPayload("payload")
.setHeader(SolaceBinderHeaders.PARTITION_KEY, "partition-key")
.build();
}
}
----

As for consuming messages from partitioned queues, this is handled transparently by the PubSub+ broker. That is to say, consuming messages from a partitioned queue is no different from consuming messages from any other queue.

See https://docs.solace.com/Messaging/Guaranteed-Msg/Queues.htm#partitioned-queues[Partitioned Queues] for more.

== Manual Message Acknowledgment

Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>4.0.1-SNAPSHOT</version>
<version>4.1.0-SNAPSHOT</version>
<packaging>jar</packaging>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.0.1-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-stream-binder-solace-core</artifactId>
<version>4.0.1-SNAPSHOT</version>
<version>4.1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Solace Spring Cloud Stream Binder Core</name>
Expand Down Expand Up @@ -106,6 +106,16 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.solace.spring.cloud.stream.binder.health;

import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor;
import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor;
import com.solace.spring.cloud.stream.binder.health.contributors.SolaceBinderHealthContributor;
import com.solace.spring.cloud.stream.binder.health.handlers.SolaceFlowHealthEventHandler;
import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;

import java.util.Optional;

/**
* <p>Proxy class for the Solace binder to access health components.
* Always use this instead of directly using health components in Solace binder code.</p>
* <p>Allows for the Solace binder to still function correctly without actuator on the classpath.</p>
*/
public class SolaceBinderHealthAccessor {
private final SolaceBinderHealthContributor solaceBinderHealthContributor;
private static final String FLOW_ID_CONCURRENCY_IDX_PREFIX = "flow-";

public SolaceBinderHealthAccessor(SolaceBinderHealthContributor solaceBinderHealthContributor) {
this.solaceBinderHealthContributor = solaceBinderHealthContributor;
}

public void addFlow(String bindingName, int concurrencyIdx, FlowReceiverContainer flowReceiverContainer) {
FlowHealthIndicator flowHealthIndicator = new FlowHealthIndicator();
Optional.ofNullable(solaceBinderHealthContributor.getSolaceBindingsHealthContributor())
.map(b -> b.getContributor(bindingName))
.orElseGet(() -> {
BindingHealthContributor newBindingHealth = new BindingHealthContributor(new FlowsHealthContributor());
solaceBinderHealthContributor.getSolaceBindingsHealthContributor()
.addBindingContributor(bindingName, newBindingHealth);
return newBindingHealth;
})
.getFlowsHealthContributor()
.addFlowContributor(createFlowIdFromConcurrencyIdx(concurrencyIdx), flowHealthIndicator);
flowReceiverContainer.setEventHandler(new SolaceFlowHealthEventHandler(
flowReceiverContainer.getXMLMessageMapper(),
flowReceiverContainer.getId().toString(),
flowHealthIndicator));
}

public void removeFlow(String bindingName, int concurrencyIdx) {
Optional.ofNullable(solaceBinderHealthContributor.getSolaceBindingsHealthContributor())
.map(b -> b.getContributor(bindingName))
.map(BindingHealthContributor::getFlowsHealthContributor)
.ifPresent(b -> b.removeFlowContributor(createFlowIdFromConcurrencyIdx(concurrencyIdx)));
}

private String createFlowIdFromConcurrencyIdx(int concurrencyIdx) {
return FLOW_ID_CONCURRENCY_IDX_PREFIX + concurrencyIdx;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.solace.spring.cloud.stream.binder.health.base;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.lang.Nullable;

import java.lang.reflect.InvocationTargetException;
import java.util.Optional;

@NoArgsConstructor
public class SolaceHealthIndicator implements HealthIndicator {
private static final String STATUS_RECONNECTING = "RECONNECTING";
private static final String INFO = "info";
private static final String RESPONSE_CODE = "responseCode";
@Setter(AccessLevel.PACKAGE)
private volatile Health health;
private static final Log logger = LogFactory.getLog(SolaceHealthIndicator.class);

private static void logDebugStatus(String status) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Solace connection/flow status is %s", status));
}
}
protected void healthUp() {
health = Health.up().build();
logDebugStatus(String.valueOf(Status.UP));
}
protected <T> void healthReconnecting(@Nullable T eventArgs) {
health = addEventDetails(Health.status(STATUS_RECONNECTING), eventArgs).build();
logDebugStatus(STATUS_RECONNECTING);
}

protected <T> void healthDown(@Nullable T eventArgs) {
health = addEventDetails(Health.down(), eventArgs).build();
logDebugStatus(String.valueOf(Status.DOWN));
}

public <T> Health.Builder addEventDetails(Health.Builder builder, @Nullable T eventArgs) {
if (eventArgs == null) {
return builder;
}

try {
Optional.ofNullable(eventArgs.getClass().getMethod("getException").invoke(eventArgs))
.ifPresent(ex -> builder.withException((Throwable) ex));
Optional.of(eventArgs.getClass().getMethod("getResponseCode").invoke(eventArgs))
.filter(c -> ((int) c) != 0)
.ifPresent(c -> builder.withDetail(RESPONSE_CODE, c));
Optional.ofNullable(eventArgs.getClass().getMethod("getInfo").invoke(eventArgs))
.filter(t -> StringUtils.isNotBlank(String.valueOf(t)))
.ifPresent(info -> builder.withDetail(INFO, info));
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
}

return builder;
}

@Override
public Health health() {
return health;
}
}
Loading

0 comments on commit 8f9fad9

Please sign in to comment.