diff --git a/bom/pom.xml b/bom/pom.xml index 4c60da75d48..ff2d5a235eb 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -1137,6 +1137,11 @@ helidon-messaging-aq ${helidon.version} + + io.helidon.messaging.wls-jms + helidon-messaging-wls-jms + ${helidon.version} + io.helidon.microprofile.tests diff --git a/docs/mp/reactivemessaging/weblogic.adoc b/docs/mp/reactivemessaging/weblogic.adoc new file mode 100644 index 00000000000..710b2ceb43d --- /dev/null +++ b/docs/mp/reactivemessaging/weblogic.adoc @@ -0,0 +1,215 @@ +/////////////////////////////////////////////////////////////////////////////// + + Copyright (c) 2022 Oracle and/or its affiliates. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +/////////////////////////////////////////////////////////////////////////////// + += Weblogic JMS Connector +:toc: +:toc-placement: preamble +:description: Reactive Messaging support for Weblogic JMS in Helidon MP +:keywords: helidon, mp, messaging, jms, weblogic, wls, thin +:feature-name: Weblogic JMS connector +:microprofile-bundle: false +:rootdir: {docdir}/../.. + +include::{rootdir}/includes/mp.adoc[] +include::{rootdir}/includes/dependencies.adoc[] + +[source,xml] +---- + + io.helidon.messaging.wls-jms + helidon-messaging-wls-jms + +---- + +== Reactive Weblogic JMS Connector + +Connecting streams to Weblogic JMS with Reactive Messaging couldn't be easier. +This connector extends Helidon JMS connector with special handling for Weblogic T3 thin client. + +Connecting to Weblogic JMS connection factories requires proprietary T3 thin client library which can be obtained from +Weblogic installation. + +WARNING: Avoid placing `wlthint3client.jar` on Helidon classpath, client library location needs to be +configured and loaded by Helidon messaging connector. + +=== Configuration + +Connector name: `helidon-weblogic-jms` + +.Attributes +|=== +|`jms-factory` | JNDI name of the JMS factory configured in Weblogic +|`url` | t3, t3s or http address of Weblogic server +|`thin-jar` | Filepath to the Weblogic thin T3 client jar(wlthint3client.jar), can be usually found within Weblogic installation +`WL_HOME/server/lib/wlthint3client.jar` +|`principal` | Weblogic initial context principal(user) +|`credential` | Weblogic initial context credential(password) +|`type` | Possible values are: `queue`, `topic`. Default value is: `topic` +|`destination` | Queue or topic name in WebLogic CDI Syntax(CDI stands for Create Destination Identifier) +|`acknowledge-mode` |Possible values are: `AUTO_ACKNOWLEDGE`- session automatically acknowledges a client’s receipt of a message, +`CLIENT_ACKNOWLEDGE` - receipt of a message is acknowledged only when `Message.ack()` is called manually, +`DUPS_OK_ACKNOWLEDGE` - session lazily acknowledges the delivery of messages. Default value: `AUTO_ACKNOWLEDGE` +|`transacted` | Indicates whether the session will use a local transaction. Default value: `false` +|`message-selector` | JMS API message selector expression based on a subset of the SQL92. +Expression can only access headers and properties, not the payload. +|`client-id` | Client identifier for JMS connection. +|`client-id` | Client identifier for JMS connection. +|`durable` | True for creating durable consumer (only for topic). Default value: `false` +|`subscriber-name` | Subscriber name for durable consumer used to identify subscription. +|`non-local` | If true then any messages published to the topic using this session's connection, +or any other connection with the same client identifier, +will not be added to the durable subscription. Default value: `false` +|`named-factory` | Select in case factory is injected as a named bean or configured with name. +|`poll-timeout` | Timeout for polling for next message in every poll cycle in millis. Default value: `50` +|`period-executions` | Period for executing poll cycles in millis. Default value: `100` +|`session-group-id` | When multiple channels share same `session-group-id`, they share same JMS session and same JDBC connection as well. +|`producer.unit-of-order` | All messages from same unit of order will be processed sequentially in the order they were created. +|=== + +Configuration is straight forward. Use JNDI for localizing and configuring of JMS ConnectionFactory +from Weblogic. Notice the destination property which is used to define queue with +https://docs.oracle.com/cd/E24329_01/web.1211/e24387/lookup.htm#JMSPG915[WebLogic CDI Syntax](CDI stands for Create Destination Identifier). + +[source,yaml] +.Example config: +---- +mp: + messaging: + connector: + helidon-weblogic-jms: + # JMS factory configured in Weblogic + jms-factory: jms/TestConnectionFactory + # Path to the WLS Thin T3 client jar + thin-jar: wlthint3client.jar + url: t3://localhost:7001 + incoming: + from-wls: + connector: helidon-weblogic-jms + # WebLogic CDI Syntax(CDI stands for Create Destination Identifier) + destination: ./TestJMSModule!TestQueue + outgoing: + to-wls: + connector: helidon-weblogic-jms + destination: ./TestJMSModule!TestQueue +---- + +When configuring destination with WebLogic CDI, the following syntax needs to be applied: + +.Non-Distributed Destinations +`jms-server-name/jms-module-name!destination-name` + +In our example we are replacing jms-server-name with `.` as we don’t have to look up the server we are connected to. + +.Uniform Distributed Destinations (UDDs) +`jms-server-name/jms-module-name!jms-server-name@udd-name` + +Destination for UDD doesn't have `./` prefix, because distributed destinations can be served by multiple servers within a cluster. + +=== Consuming + +[source,java] +.Consuming one by one unwrapped value: +---- +@Incoming("from-wls") +public void consumeWls(String msg) { + System.out.println("Weblogic says: " + msg); +} +---- + +[source,java] +.Consuming one by one, manual ack: +---- +@Incoming("from-wls") +@Acknowledgment(Acknowledgment.Strategy.MANUAL) +public CompletionStage consumewls(JmsMessage msg) { + System.out.println("Weblogic says: " + msg.getPayload()); + return msg.ack(); +} +---- + +=== Producing + +[source,java] +.Producing to Weblogic JMS: +---- +@Outgoing("to-wls") +public PublisherBuilder produceToWls() { + return ReactiveStreams.of("test1", "test2"); +} +---- + +[source,java] +.Example of more advanced producing to Weblogic JMS: +---- +@Outgoing("to-wls") +public PublisherBuilder produceToJms() { + return ReactiveStreams.of("test1", "test2") + .map(s -> JmsMessage.builder(s) + .correlationId(UUID.randomUUID().toString()) + .property("stringProp", "cool property") + .property("byteProp", 4) + .property("intProp", 5) + .onAck(() -> System.out.println("Acked!")) + .build()); +} +---- +[source,java] +.Example of even more advanced producing to Weblogic JMS with custom mapper: +---- +@Outgoing("to-wls") +public PublisherBuilder produceToJms() { + return ReactiveStreams.of("test1", "test2") + .map(s -> JmsMessage.builder(s) + .customMapper((p, session) -> { + TextMessage textMessage = session.createTextMessage(p); + textMessage.setStringProperty("custom-mapped-property", "XXX" + p); + return textMessage; + }) + .build() + ); +} +---- + +=== Secured t3 over SSL(t3s) +For initiating SSL secured t3 connection, trust keystore with WLS public certificate is needed. +Standard WLS installation has pre-configured Demo trust store: `WL_HOME/server/lib/DemoTrust.jks`, +we can store it locally for connecting WLS over t3s. + +[source,yaml] +.Example config: +---- +mp: + messaging: + connector: + helidon-weblogic-jms: + jms-factory: jms/TestConnectionFactory + thin-jar: wlthint3client.jar + # Notice t3s protocol is used + url: t3s://localhost:7002 +---- + +Helidon application needs to be aware about our WLS SSL public certificate. + +[source,bash] +.Running example with WLS truststore +---- +java --add-opens=java.base/java.io=ALL-UNNAMED \ +-Djavax.net.ssl.trustStore=DemoTrust.jks \ +-Djavax.net.ssl.trustStorePassword=DemoTrustKeyStorePassPhrase \ +-jar ./target/helidon-wls-jms-demo.jar +---- \ No newline at end of file diff --git a/docs/sitegen.yaml b/docs/sitegen.yaml index f60c6c5367b..447b7f53726 100644 --- a/docs/sitegen.yaml +++ b/docs/sitegen.yaml @@ -409,6 +409,7 @@ backend: - "kafka.adoc" - "jms.adoc" - "aq.adoc" + - "weblogic.adoc" - "mock.adoc" - type: "PAGE" title: "REST Client" diff --git a/examples/messaging/pom.xml b/examples/messaging/pom.xml index 297f5f0521b..bc079601fd7 100644 --- a/examples/messaging/pom.xml +++ b/examples/messaging/pom.xml @@ -39,5 +39,6 @@ jms-websocket-mp jms-websocket-se oracle-aq-websocket-mp + weblogic-jms-mp diff --git a/examples/messaging/weblogic-jms-mp/README.md b/examples/messaging/weblogic-jms-mp/README.md new file mode 100644 index 00000000000..56414c125ef --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/README.md @@ -0,0 +1,24 @@ +# Helidon Messaging with Oracle Weblogic Example + +## Prerequisites +* JDK 17+ +* Maven +* Docker +* Account at https://container-registry.oracle.com/ with accepted Oracle Standard Terms and Restrictions for Weblogic. + +## Run Weblogic in docker +1. You will need to do a docker login to Oracle container registry with account which previously + accepted Oracle Standard Terms and Restrictions for Weblogic: + `docker login container-registry.oracle.com` +2. Run `bash buildAndRunWeblogic.sh` to build and run example Weblogic container. + * After example JMS resources are deployed, Weblogic console should be available at http://localhost:7001/console with `admin`/`Welcome1` +3. To obtain wlthint3client.jar necessary for connecting to Weblogic execute + `bash extractThinClientLib.sh`, file will be copied to `./weblogic` folder. + +## Build & Run +To run Helidon with thin client, flag `--add-opens=java.base/java.io=ALL-UNNAMED` is needed to +open java.base module to thin client internals. +1. `mvn clean package` +2. `java --add-opens=java.base/java.io=ALL-UNNAMED --enable-preview -jar ./target/weblogic-jms-mp.jar` +3. Visit http://localhost:8080 and try to send and receive messages over Weblogic JMS queue. + diff --git a/examples/messaging/weblogic-jms-mp/buildAndRunWeblogic.sh b/examples/messaging/weblogic-jms-mp/buildAndRunWeblogic.sh new file mode 100644 index 00000000000..11317c060af --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/buildAndRunWeblogic.sh @@ -0,0 +1,53 @@ +#!/bin/bash -e +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cd ./weblogic + +# Attempt Oracle container registry login. +# You need to accept the licence agreement for Weblogic Server at https://container-registry.oracle.com/ +# Search for weblogic and accept the Oracle Standard Terms and Restrictions +docker login container-registry.oracle.com + +docker build -t wls-admin . + +docker run --rm -d \ + -p 7001:7001 \ + -p 7002:7002 \ + --name wls-admin \ + --hostname wls-admin \ + wls-admin + +printf "Waiting for WLS to start ." +while true; +do + if docker logs wls-admin | grep -q "Server state changed to RUNNING"; then + break; + fi + printf "." + sleep 5 +done +printf " [READY]\n" + +echo Deploying example JMS queues +docker exec wls-admin \ +/bin/bash \ +/u01/oracle/wlserver/common/bin/wlst.sh \ +/u01/oracle/setupTestJMSQueue.py; + +echo Example JMS queues deployed! +echo Console avaiable at http://localhost:7001/console with admin/Welcome1 +echo 'Stop Weblogic server with "docker stop wls-admin"' \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/extractThinClientLib.sh b/examples/messaging/weblogic-jms-mp/extractThinClientLib.sh new file mode 100644 index 00000000000..b5d6ff1b9ec --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/extractThinClientLib.sh @@ -0,0 +1,22 @@ +#!/bin/bash -e +# +# Copyright (c) 2018, 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# Copy wlthint3client.jar from docker container +docker cp wls-admin:/u01/oracle/wlserver/server/lib/wlthint3client.jar ./weblogic/wlthint3client.jar +# Copy DemoTrust.jks from docker container(needed if you want to try t3s protocol) +docker cp wls-admin:/u01/oracle/wlserver/server/lib/DemoTrust.jks ./weblogic/DemoTrust.jks \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/pom.xml b/examples/messaging/weblogic-jms-mp/pom.xml new file mode 100644 index 00000000000..7b50359c23c --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/pom.xml @@ -0,0 +1,88 @@ + + + + 4.0.0 + + io.helidon.applications + helidon-mp + 4.0.0-SNAPSHOT + ../../../applications/mp/pom.xml + + io.helidon.examples.messaging.wls + weblogic-jms-mp + 1.0-SNAPSHOT + weblogic-jms-mp + + + + io.helidon.microprofile.bundles + helidon-microprofile + + + + io.helidon.microprofile.messaging + helidon-microprofile-messaging + + + io.helidon.messaging.wls-jms + helidon-messaging-wls-jms + + + + org.glassfish.jersey.media + jersey-media-sse + + + + org.jboss + jandex + runtime + true + + + jakarta.activation + jakarta.activation-api + runtime + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-libs + + + + + org.jboss.jandex + jandex-maven-plugin + + + make-index + + + + + + diff --git a/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/FrankResource.java b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/FrankResource.java new file mode 100644 index 00000000000..1e0062b14d7 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/FrankResource.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.examples.messaging.mp; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import io.helidon.messaging.connectors.jms.JmsMessage; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.sse.Sse; +import jakarta.ws.rs.sse.SseBroadcaster; +import jakarta.ws.rs.sse.SseEventSink; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.glassfish.jersey.media.sse.OutboundEvent; + +/** + * SSE Jax-Rs resource for message publishing and consuming. + */ +@Path("/frank") +@ApplicationScoped +public class FrankResource { + + @Inject + @Channel("to-wls") + private Emitter emitter; + private SseBroadcaster sseBroadcaster; + + /** + * Consuming JMS messages from Weblogic and sending them to the client over SSE. + * + * @param msg dequeued message + * @return completion stage marking end of the processing + */ + @Incoming("from-wls") + public CompletionStage receive(JmsMessage msg) { + if (sseBroadcaster == null) { + System.out.println("No SSE client subscribed yet: " + msg.getPayload()); + return CompletableFuture.completedStage(null); + } + sseBroadcaster.broadcast(new OutboundEvent.Builder().data(msg.getPayload()).build()); + return CompletableFuture.completedStage(null); + } + + /** + * Send message to Weblogic JMS queue. + * + * @param msg message to be sent + */ + @POST + @Path("/send/{msg}") + public void send(@PathParam("msg") String msg) { + emitter.send(msg); + } + + /** + * Register SSE client to listen for messages coming from Weblogic JMS. + * + * @param eventSink client sink + * @param sse SSE context + */ + @GET + @Path("sse") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void listenToEvents(@Context SseEventSink eventSink, @Context Sse sse) { + if (sseBroadcaster == null) { + sseBroadcaster = sse.newBroadcaster(); + } + sseBroadcaster.register(eventSink); + } +} diff --git a/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/package-info.java b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/package-info.java new file mode 100644 index 00000000000..372cec04320 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/java/io/helidon/examples/messaging/mp/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Helidon MP Reactive Messaging with Weblogic JMS. + */ +package io.helidon.examples.messaging.mp; diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/META-INF/beans.xml b/examples/messaging/weblogic-jms-mp/src/main/resources/META-INF/beans.xml new file mode 100644 index 00000000000..80727f9c7fd --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/favicon.ico b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/favicon.ico new file mode 100644 index 00000000000..d91659fdb53 Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/favicon.ico differ diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-1.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-1.png new file mode 100644 index 00000000000..bbba0aef8a6 Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-1.png differ diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-2.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-2.png new file mode 100644 index 00000000000..0b1096b07c1 Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/arrow-2.png differ diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/cloud.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/cloud.png new file mode 100644 index 00000000000..3e04833c08b Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/cloud.png differ diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/frank.png b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/frank.png new file mode 100644 index 00000000000..51a13d8db8b Binary files /dev/null and b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/img/frank.png differ diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/index.html b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/index.html new file mode 100644 index 00000000000..0000f2e203e --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/index.html @@ -0,0 +1,101 @@ + + + + + + + Helidon Reactive Messaging + + + + + +
+
+
+ +
+
Send
+
+
+
+
+
REST call /frank/send/{msg}
+
+
+
SSE messages received
+
+
+
+
+ + + + + \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/main.css b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/main.css new file mode 100644 index 00000000000..496dde4fe6c --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/resources/WEB/main.css @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#root { + background-color: #36ABF2; + font-family: Roboto,sans-serif; + color: #fff; + position: absolute; + overflow-x: hidden; + -ms-overflow-style: none; /* Internet Explorer 10+ */ + scrollbar-width: none; /* Firefox */ + top: 0; + left: 0; + width: 100%; + height: 100%; +} +#root::-webkit-scrollbar { + display: none; /* Safari and Chrome */ +} + +#helidon { + width: 509px; + height: 273px; + position: relative; + left: -509px; + z-index: 4; + background: url('img/frank.png'); +} + +#rest-tip { + position: relative; + top: -80px; + left: 160px; +} + +#rest-tip-arrow { + width: 205px; + height: 304px; + z-index: 4; + top: -20px; + background: url('img/arrow-1.png'); +} +#rest-tip-label { + position: absolute; + white-space: nowrap; + font-size: 18px; + font-weight: bold; + z-index: 4; + left: -60px; +} + +#sse-tip { + position: absolute; + overflow: hidden; + display: flex; + width: auto; + height: auto; + top: 5%; + right: 10%; + z-index: 0; +} + +#sse-tip-arrow { + position: relative; + top: -30px; + width: 296px; + height: 262px; + z-index: 4; + background: url('img/arrow-2.png'); +} +#sse-tip-label { + position: relative; + white-space: nowrap; + font-size: 18px; + font-weight: bold; + z-index: 4; +} + +#producer { + float: left; + position: relative; + width: 300px; + height: 100%; + margin: 50px; + padding: 10px; + z-index: 99; +} + +#msgBox { + position: absolute; + width: 300px; + top: 25%; + right: 3%; + height: 100%; + margin: 50px; + padding: 10px; + z-index: 20; +} + +#input { + width: 210px; + height: 22px; + top: 58px; + left: 30px; + background-color: white; + border-radius: 10px; + border-style: solid; + border-color: white; + position: absolute; + z-index: 10; +} + +#inputCloud { + position: relative; + width: 310px; + height: 150px; + background: url('img/cloud.png'); +} + +#msg { + background-color: #D2EBFC; + color: #1A9BF4; + border-radius: 10px; + width: 300px; + height: 50px; + margin: 5px; + display: flex; + padding-left: 10px; + justify-content: center; + align-items: center; + z-index: 99; +} + +#submit { + font-weight: bold; + background-color: aqua; + color: #1A9BF4; + border-radius: 12px; + width: 100px; + height: 30px; + display: flex; + justify-content: center; + align-items: center; + margin: 5px; + cursor: pointer; +} + +#snippet { + position: absolute; + top: 15%; + left: 30%; + width: 40%; + z-index: 5; +} + +.hljs { + border-radius: 10px; + font-size: 12px; +} \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/application.yaml b/examples/messaging/weblogic-jms-mp/src/main/resources/application.yaml new file mode 100644 index 00000000000..28a147e57a5 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/resources/application.yaml @@ -0,0 +1,44 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server: + port: 8080 + host: 0.0.0.0 + static: + classpath: + location: /WEB + welcome: index.html + +mp: + messaging: + connector: + helidon-weblogic-jms: + # JMS factory configured in Weblogic + jms-factory: jms/TestConnectionFactory + # Path to the WLS Thin T3 client jar(extract it from docker container with extractThinClientLib.sh) + thin-jar: weblogic/wlthint3client.jar + url: "t3://localhost:7001" + producer.unit-of-order: kec1 + incoming: + from-wls: + connector: helidon-weblogic-jms + # WebLogic CDI Syntax(CDI stands for Create Destination Identifier) + destination: ./TestJMSModule!TestQueue + outgoing: + to-wls: + connector: helidon-weblogic-jms + # Same queue is used for simplifying test case + destination: ./TestJMSModule!TestQueue diff --git a/examples/messaging/weblogic-jms-mp/src/main/resources/logging.properties b/examples/messaging/weblogic-jms-mp/src/main/resources/logging.properties new file mode 100644 index 00000000000..1ac57eb5b92 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/src/main/resources/logging.properties @@ -0,0 +1,30 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Example Logging Configuration File +# For more information see $JAVA_HOME/jre/lib/logging.properties + +# Send messages to the console +handlers=io.helidon.logging.jul.HelidonConsoleHandler + +# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n + +# Global logging level. Can be overridden by specific loggers +.level=INFO + +# Component specific log levels +#io.helidon.level=INFO diff --git a/examples/messaging/weblogic-jms-mp/weblogic/Dockerfile b/examples/messaging/weblogic-jms-mp/weblogic/Dockerfile new file mode 100644 index 00000000000..6e79458ebd5 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/Dockerfile @@ -0,0 +1,54 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# ORACLE DOCKERFILES PROJECT +# -------------------------- +# This docker file is customized, originaly taken from https://github.com/oracle/docker-images +# and extends the Oracle WebLogic image by creating a sample domain. +# +# Base image is available at https://container-registry.oracle.com/ +# +FROM container-registry.oracle.com/middleware/weblogic:14.1.1.0-dev-11 + +ENV ORACLE_HOME=/u01/oracle \ + USER_MEM_ARGS="-Djava.security.egd=file:/dev/./urandom" \ + SCRIPT_FILE=/u01/oracle/createAndStartEmptyDomain.sh \ + HEALTH_SCRIPT_FILE=/u01/oracle/get_healthcheck_url.sh \ + PATH=$PATH:${JAVA_HOME}/bin:/u01/oracle/oracle_common/common/bin:/u01/oracle/wlserver/common/bin + +ENV DOMAIN_NAME="${DOMAIN_NAME:-base_domain}" \ + ADMIN_LISTEN_PORT="${ADMIN_LISTEN_PORT:-7001}" \ + ADMIN_NAME="${ADMIN_NAME:-AdminServer}" \ + DEBUG_FLAG=true \ + PRODUCTION_MODE=dev \ + ADMINISTRATION_PORT_ENABLED="${ADMINISTRATION_PORT_ENABLED:-true}" \ + ADMINISTRATION_PORT="${ADMINISTRATION_PORT:-9002}" + +COPY container-scripts/createAndStartEmptyDomain.sh container-scripts/get_healthcheck_url.sh /u01/oracle/ +COPY container-scripts/create-wls-domain.py container-scripts/setupTestJMSQueue.py /u01/oracle/ +COPY properties/domain.properties /u01/oracle/properties/ + +USER root + +RUN chmod +xr $SCRIPT_FILE $HEALTH_SCRIPT_FILE && \ + chown oracle:root $SCRIPT_FILE /u01/oracle/create-wls-domain.py $HEALTH_SCRIPT_FILE + +USER oracle + +HEALTHCHECK --start-period=10s --timeout=30s --retries=3 CMD curl -k -s --fail `$HEALTH_SCRIPT_FILE` || exit 1 +WORKDIR ${ORACLE_HOME} + +CMD ["/u01/oracle/createAndStartEmptyDomain.sh"] \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/create-wls-domain.py b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/create-wls-domain.py new file mode 100644 index 00000000000..e24167e1fb1 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/create-wls-domain.py @@ -0,0 +1,104 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# WebLogic on Docker Default Domain +# +# Domain, as defined in DOMAIN_NAME, will be created in this script. Name defaults to 'base_domain'. +# +# Since : October, 2014 +# Author: monica.riccelli@oracle.com +# ============================================== +domain_name = os.environ.get("DOMAIN_NAME", "base_domain") +admin_name = os.environ.get("ADMIN_NAME", "AdminServer") +admin_listen_port = int(os.environ.get("ADMIN_LISTEN_PORT", "7001")) +domain_path = '/u01/oracle/user_projects/domains/%s' % domain_name +production_mode = os.environ.get("PRODUCTION_MODE", "prod") +administration_port_enabled = os.environ.get("ADMINISTRATION_PORT_ENABLED", "true") +administration_port = int(os.environ.get("ADMINISTRATION_PORT", "9002")) + +print('domain_name : [%s]' % domain_name); +print('admin_listen_port : [%s]' % admin_listen_port); +print('domain_path : [%s]' % domain_path); +print('production_mode : [%s]' % production_mode); +print('admin name : [%s]' % admin_name); +print('administration_port_enabled : [%s]' % administration_port_enabled); +print('administration_port : [%s]' % administration_port); + +# Open default domain template +# ============================ +readTemplate("/u01/oracle/wlserver/common/templates/wls/wls.jar") + +set('Name', domain_name) +setOption('DomainName', domain_name) + +# Set Administration Port +# ======================= +if administration_port_enabled != "false": + set('AdministrationPort', administration_port) + set('AdministrationPortEnabled', 'false') + +# Disable Admin Console +# -------------------- +# cmo.setConsoleEnabled(false) + +# Configure the Administration Server and SSL port. +# ================================================= +cd('/Servers/AdminServer') +set('Name', admin_name) +set('ListenAddress', '') +set('ListenPort', admin_listen_port) +if administration_port_enabled != "false": + create(admin_name, 'SSL') + cd('SSL/' + admin_name) + set('Enabled', 'True') + +# Define the user password for weblogic +# ===================================== +cd(('/Security/%s/User/weblogic') % domain_name) +cmo.setName(username) +cmo.setPassword(password) + +# Write the domain and close the domain template +# ============================================== +setOption('OverwriteDomain', 'true') +setOption('ServerStartMode',production_mode) + +# Create Node Manager +# =================== +#cd('/NMProperties') +#set('ListenAddress','') +#set('ListenPort',5556) +#set('CrashRecoveryEnabled', 'true') +#set('NativeVersionEnabled', 'true') +#set('StartScriptEnabled', 'false') +#set('SecureListener', 'false') +#set('LogLevel', 'FINEST') + +# Set the Node Manager user name and password +# =========================================== +#cd('/SecurityConfiguration/%s' % domain_name) +#set('NodeManagerUsername', username) +#set('NodeManagerPasswordEncrypted', password) + +# Write Domain +# ============ +writeDomain(domain_path) +closeTemplate() + +# Exit WLST +# ========= +exit() diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/createAndStartEmptyDomain.sh b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/createAndStartEmptyDomain.sh new file mode 100644 index 00000000000..1d1a3e4eaff --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/createAndStartEmptyDomain.sh @@ -0,0 +1,87 @@ +#!/bin/bash +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# If AdminServer.log does not exists, container is starting for 1st time +# So it should start NM and also associate with AdminServer +# Otherwise, only start NM (container restarted) +########### SIGTERM handler ############ +function _term() { + echo "Stopping container." + echo "SIGTERM received, shutting down the server!" + ${DOMAIN_HOME}/bin/stopWebLogic.sh +} + +########### SIGKILL handler ############ +function _kill() { + echo "SIGKILL received, shutting down the server!" + kill -9 $childPID +} + +# Set SIGTERM handler +trap _term SIGTERM + +# Set SIGKILL handler +trap _kill SIGKILL + +#Define DOMAIN_HOME +export DOMAIN_HOME=/u01/oracle/user_projects/domains/$DOMAIN_NAME +echo "Domain Home is: " $DOMAIN_HOME + +mkdir -p $ORACLE_HOME/properties +# Create Domain only if 1st execution +if [ ! -e ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log ]; then + echo "Create Domain" + PROPERTIES_FILE=/u01/oracle/properties/domain.properties + if [ ! -e "$PROPERTIES_FILE" ]; then + echo "A properties file with the username and password needs to be supplied." + exit + fi + + # Get Username + USER=`awk '{print $1}' $PROPERTIES_FILE | grep username | cut -d "=" -f2` + if [ -z "$USER" ]; then + echo "The domain username is blank. The Admin username must be set in the properties file." + exit + fi + # Get Password + PASS=`awk '{print $1}' $PROPERTIES_FILE | grep password | cut -d "=" -f2` + if [ -z "$PASS" ]; then + echo "The domain password is blank. The Admin password must be set in the properties file." + exit + fi + + # Create an empty domain + wlst.sh -skipWLSModuleScanning -loadProperties $PROPERTIES_FILE /u01/oracle/create-wls-domain.py + mkdir -p ${DOMAIN_HOME}/servers/${ADMIN_NAME}/security/ + chmod -R g+w ${DOMAIN_HOME} + echo "username=${USER}" >> $DOMAIN_HOME/servers/${ADMIN_NAME}/security/boot.properties + echo "password=${PASS}" >> $DOMAIN_HOME/servers/${ADMIN_NAME}/security/boot.properties + ${DOMAIN_HOME}/bin/setDomainEnv.sh + # Setup JMS examples +# wlst.sh -skipWLSModuleScanning -loadProperties $PROPERTIES_FILE /u01/oracle/setupTestJMSQueue.py +fi + +# Start Admin Server and tail the logs +${DOMAIN_HOME}/startWebLogic.sh +if [ -e ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log ]; then + echo "${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log" +fi +touch ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log +tail -f ${DOMAIN_HOME}/servers/${ADMIN_NAME}/logs/${ADMIN_NAME}.log + +childPID=$! +wait $childPID diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/get_healthcheck_url.sh b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/get_healthcheck_url.sh new file mode 100644 index 00000000000..5eb3ded88e4 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/get_healthcheck_url.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if [ "$ADMINISTRATION_PORT_ENABLED" = "true" ] ; then + echo "https://{localhost:$ADMINISTRATION_PORT}/weblogic/ready" ; +else + echo "http://{localhost:$ADMIN_LISTEN_PORT}/weblogic/ready" ; +fi diff --git a/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/setupTestJMSQueue.py b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/setupTestJMSQueue.py new file mode 100644 index 00000000000..3269b782ae0 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/container-scripts/setupTestJMSQueue.py @@ -0,0 +1,115 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os.path +import sys + +System.setProperty("weblogic.security.SSL.ignoreHostnameVerification", "true") + +connect("admin","Welcome1","t3://localhost:7001") +adm_name=get('AdminServerName') +sub_deployment_name="TestJMSSubdeployment" +jms_module_name="TestJMSModule" +queue_name="TestQueue" +factory_name="TestConnectionFactory" +jms_server_name="TestJMSServer" + + +def createJMSServer(adm_name, jms_server_name): + cd('/JMSServers') + if (len(ls(returnMap='true')) == 0): + print 'No JMS Server found, creating ' + jms_server_name + cd('/') + cmo.createJMSServer(jms_server_name) + cd('/JMSServers/'+jms_server_name) + cmo.addTarget(getMBean("/Servers/" + adm_name)) + + +def createJMSModule(jms_module_name, adm_name, sub_deployment_name): + print "Creating JMS module " + jms_module_name + cd('/JMSServers') + jms_servers=ls(returnMap='true') + cd('/') + module = create(jms_module_name, "JMSSystemResource") + module.addTarget(getMBean("Servers/"+adm_name)) + cd('/SystemResources/'+jms_module_name) + module.createSubDeployment(sub_deployment_name) + cd('/SystemResources/'+jms_module_name+'/SubDeployments/'+sub_deployment_name) + + list=[] + for i in jms_servers: + list.append(ObjectName(str('com.bea:Name='+i+',Type=JMSServer'))) + set('Targets',jarray.array(list, ObjectName)) + +def getJMSModulePath(jms_module_name): + jms_module_path = "/JMSSystemResources/"+jms_module_name+"/JMSResource/"+jms_module_name + return jms_module_path + +def createJMSQueue(jms_module_name,jms_queue_name): + print "Creating JMS queue " + jms_queue_name + jms_module_path = getJMSModulePath(jms_module_name) + cd(jms_module_path) + cmo.createQueue(jms_queue_name) + cd(jms_module_path+'/Queues/'+jms_queue_name) + cmo.setJNDIName("jms/" + jms_queue_name) + cmo.setSubDeploymentName(sub_deployment_name) + +def createDistributedJMSQueue(jms_module_name,jms_queue_name): + print "Creating distributed JMS queue " + jms_queue_name + jms_module_path = getJMSModulePath(jms_module_name) + cd(jms_module_path) + cmo.createDistributedQueue(jms_queue_name) + cd(jms_module_path+'/DistributedQueues/'+jms_queue_name) + cmo.setJNDIName("jms/" + jms_queue_name) + +def addMemberQueue(udd_name,queue_name): + jms_module_path = getJMSModulePath(jms_module_name) + cd(jms_module_path+'/DistributedQueues/'+udd_name) + cmo.setLoadBalancingPolicy('Round-Robin') + cmo.createDistributedQueueMember(queue_name) + +def createJMSFactory(jms_module_name,jms_fact_name): + print "Creating JMS connection factory " + jms_fact_name + jms_module_path = getJMSModulePath(jms_module_name) + cd(jms_module_path) + cmo.createConnectionFactory(jms_fact_name) + cd(jms_module_path+'/ConnectionFactories/'+jms_fact_name) + cmo.setJNDIName("jms/" + jms_fact_name) + cmo.setSubDeploymentName(sub_deployment_name) + + + +edit() +startEdit() + +print "Server name: "+adm_name + +createJMSServer(adm_name,jms_server_name) +createJMSModule(jms_module_name,adm_name,sub_deployment_name) +createJMSFactory(jms_module_name,factory_name) +createJMSQueue(jms_module_name,queue_name) + +### Unified Distributed Destinations(UDD) example +createDistributedJMSQueue(jms_module_name,"udd_queue") +# Normally member queues would be in different sub-deployments +createJMSQueue(jms_module_name,"ms1@udd_queue") +createJMSQueue(jms_module_name,"ms2@udd_queue") +addMemberQueue("udd_queue", "ms1@udd_queue") +addMemberQueue("udd_queue", "ms2@udd_queue") + +save() +activate(block="true") +disconnect() \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/weblogic/properties/domain.properties b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain.properties new file mode 100644 index 00000000000..6e9a5fc4b19 --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain.properties @@ -0,0 +1,35 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Env properties inherited from base image +DOMAIN_NAME=myDomain +ADMIN_LISTEN_PORT=7001 +ADMIN_NAME=myadmin +PRODUCTION_MODE=dev +DEBUG_FLAG=true +ADMINISTRATION_PORT_ENABLED=false +ADMINISTRATION_PORT=9002 +# Env properties for this image +ADMIN_HOST=AdminContainer +MANAGED_SERVER_PORT=8001 +MANAGED_SERVER_NAME_BASE=MS +CONFIGURED_MANAGED_SERVER_COUNT=2 +PRODUCTION_MODE_ENABLED=true +CLUSTER_NAME=cluster1 +CLUSTER_TYPE=DYNAMIC +DOMAIN_HOST_VOLUME=/Users/host/temp +username=admin +password=Welcome1 \ No newline at end of file diff --git a/examples/messaging/weblogic-jms-mp/weblogic/properties/domain_security.properties b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain_security.properties new file mode 100644 index 00000000000..fcfb1d90fff --- /dev/null +++ b/examples/messaging/weblogic-jms-mp/weblogic/properties/domain_security.properties @@ -0,0 +1,18 @@ +# +# Copyright (c) 2022 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +username=admin +password=Welcome1 \ No newline at end of file diff --git a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java index 5eb75fa34e4..131149ec0ad 100644 --- a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java +++ b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqConnectorImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import io.helidon.config.Config; import io.helidon.config.ConfigValue; import io.helidon.messaging.MessagingException; +import io.helidon.messaging.NackHandler; import io.helidon.messaging.connectors.jms.ConnectionContext; import io.helidon.messaging.connectors.jms.JmsConnector; import io.helidon.messaging.connectors.jms.JmsMessage; @@ -167,15 +168,19 @@ private AQjmsConnectionFactory createAqFactory(Config c) throws javax.jms.JMSExc @Override - protected JmsMessage createMessage(jakarta.jms.Message message, + protected JmsMessage createMessage(NackHandler nackHandler, + jakarta.jms.Message message, Executor executor, SessionMetadata sessionMetadata) { - return new AqMessageImpl<>(super.createMessage(message, executor, sessionMetadata), sessionMetadata); + return new AqMessageImpl<>( + super.createMessage(nackHandler, message, executor, sessionMetadata), + sessionMetadata); } @Override protected BiConsumer, JMSException> sendingErrorHandler(Config config) { return (m, e) -> { + m.nack(e); throw new MessagingException("Error during sending Oracle AQ JMS message.", e); }; } diff --git a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java index 1240e700762..aafe1b3971c 100644 --- a/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java +++ b/messaging/connectors/aq/src/main/java/io/helidon/messaging/connectors/aq/AqMessageImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ class AqMessageImpl implements AqMessage { if (jakartaSession == null) { this.session = null; } else { - this.session = ((JakartaSession) jakartaSession).unwrap(AQjmsSession.class); + this.session = ((JakartaSession) jakartaSession).unwrap(); } } diff --git a/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java b/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java index 7533c8fddc4..00d9aa2f667 100644 --- a/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java +++ b/messaging/connectors/aq/src/test/java/io/helidon/messaging/connectors/aq/AckTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ void ackPropagationTest() throws InterruptedException, JMSException { }).when(mockedMessage).acknowledge(); AqConnectorImpl aqConnector = new AqConnectorImpl(Map.of(), null, null); - JmsMessage jmsMessage = aqConnector.createMessage(mockedMessage, null, sessionMetadata); + JmsMessage jmsMessage = aqConnector.createMessage(null, mockedMessage, null, sessionMetadata); AqMessage aqMessage = new AqMessageImpl<>(jmsMessage, sessionMetadata); aqMessage.ack(); assertThat("Ack not propagated to JmsMessage", diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java index c41dd204b7b..6dd6d1550e6 100644 --- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaDestination.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,14 +20,20 @@ /** * Exposes Jakarta API, delegates to javax API. */ -class JakartaDestination implements Destination { +class JakartaDestination implements Destination, JakartaWrapper { private final T delegate; JakartaDestination(T delegate) { this.delegate = delegate; } - T unwrap() { + @Override + public T unwrap() { return delegate; } + + @Override + public String toString() { + return delegate.toString(); + } } diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java index 082008514bc..96e4653fb6a 100644 --- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaJms.java @@ -15,6 +15,10 @@ */ package io.helidon.messaging.connectors.jms.shim; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + import jakarta.jms.BytesMessage; import jakarta.jms.CompletionListener; import jakarta.jms.Connection; @@ -57,6 +61,7 @@ private JakartaJms() { * @return shimmed jakarta namespace instance */ public static BytesMessage create(javax.jms.BytesMessage delegate) { + if (delegate == null) return null; return new JakartaByteMessage(delegate); } /** @@ -65,6 +70,7 @@ public static BytesMessage create(javax.jms.BytesMessage delegate) { * @return shimmed jakarta namespace instance */ public static CompletionListener create(javax.jms.CompletionListener delegate) { + if (delegate == null) return null; return new JakartaCompletionListener(delegate); } /** @@ -73,6 +79,7 @@ public static CompletionListener create(javax.jms.CompletionListener delegate) { * @return shimmed jakarta namespace instance */ public static Connection create(javax.jms.Connection delegate) { + if (delegate == null) return null; return new JakartaConnection(delegate); } /** @@ -81,6 +88,7 @@ public static Connection create(javax.jms.Connection delegate) { * @return shimmed jakarta namespace instance */ public static ConnectionConsumer create(javax.jms.ConnectionConsumer delegate) { + if (delegate == null) return null; return new JakartaConnectionConsumer(delegate); } /** @@ -89,6 +97,7 @@ public static ConnectionConsumer create(javax.jms.ConnectionConsumer delegate) { * @return shimmed jakarta namespace instance */ public static ConnectionFactory create(javax.jms.ConnectionFactory delegate) { + if (delegate == null) return null; return new JakartaConnectionFactory(delegate); } /** @@ -97,6 +106,7 @@ public static ConnectionFactory create(javax.jms.ConnectionFactory delegate) { * @return shimmed jakarta namespace instance */ public static ConnectionMetaData create(javax.jms.ConnectionMetaData delegate) { + if (delegate == null) return null; return new JakartaConnectionMetaData(delegate); } /** @@ -105,6 +115,7 @@ public static ConnectionMetaData create(javax.jms.ConnectionMetaData delegate) { * @return shimmed jakarta namespace instance */ public static JMSConsumer create(javax.jms.JMSConsumer delegate) { + if (delegate == null) return null; return new JakartaConsumer(delegate); } /** @@ -113,6 +124,7 @@ public static JMSConsumer create(javax.jms.JMSConsumer delegate) { * @return shimmed jakarta namespace instance */ public static JMSContext create(javax.jms.JMSContext delegate) { + if (delegate == null) return null; return new JakartaContext(delegate); } /** @@ -121,6 +133,7 @@ public static JMSContext create(javax.jms.JMSContext delegate) { * @return shimmed jakarta namespace instance */ public static Destination create(javax.jms.Destination delegate) { + if (delegate == null) return null; return new JakartaDestination<>(delegate); } /** @@ -129,6 +142,7 @@ public static Destination create(javax.jms.Destination delegate) { * @return shimmed jakarta namespace instance */ public static ExceptionListener create(javax.jms.ExceptionListener delegate) { + if (delegate == null) return null; return new JakartaExceptionListener(delegate); } /** @@ -137,8 +151,32 @@ public static ExceptionListener create(javax.jms.ExceptionListener delegate) { * @return shimmed jakarta namespace instance */ public static MapMessage create(javax.jms.MapMessage delegate) { + if (delegate == null) return null; return new JakartaMapMessage(delegate); } + + /** + * Convenience method for shimming various javax JMS classes. + * + * @param obj to be shimmed or just typed + * @param expectedType expected type to shim to + * @return typed or shimmed object + * @param expected type to shim to + */ + public static T resolve(Object obj, Class expectedType) { + if (expectedType.isAssignableFrom(obj.getClass())) { + return (T) obj; + } + Map, Function> conversionMap = Map.of( + ConnectionFactory.class, o -> (T) JakartaJms.create((javax.jms.ConnectionFactory) o), + Destination.class, o -> (T) JakartaJms.create((javax.jms.Destination) o) + ); + return Optional.ofNullable(conversionMap.get(expectedType)) + .map(r -> r.apply(obj)) + .orElseThrow(() -> new IllegalStateException("Unexpected type of connection factory: " + obj.getClass())); + } + + /** * Create a jakarta wrapper for the provided javax instance. * @param delegate javax namespace instance @@ -171,6 +209,7 @@ public static Message create(javax.jms.Message delegate) { * @return shimmed jakarta namespace instance */ public static MessageConsumer create(javax.jms.MessageConsumer delegate) { + if (delegate == null) return null; return new JakartaMessageConsumer(delegate); } /** @@ -179,6 +218,7 @@ public static MessageConsumer create(javax.jms.MessageConsumer delegate) { * @return shimmed jakarta namespace instance */ public static MessageListener create(javax.jms.MessageListener delegate) { + if (delegate == null) return null; return new JakartaMessageListener(delegate); } /** @@ -187,6 +227,7 @@ public static MessageListener create(javax.jms.MessageListener delegate) { * @return shimmed jakarta namespace instance */ public static MessageProducer create(javax.jms.MessageProducer delegate) { + if (delegate == null) return null; return new JakartaMessageProducer(delegate); } /** @@ -195,6 +236,7 @@ public static MessageProducer create(javax.jms.MessageProducer delegate) { * @return shimmed jakarta namespace instance */ public static ObjectMessage create(javax.jms.ObjectMessage delegate) { + if (delegate == null) return null; return new JakartaObjectMessage(delegate); } /** @@ -203,6 +245,7 @@ public static ObjectMessage create(javax.jms.ObjectMessage delegate) { * @return shimmed jakarta namespace instance */ public static JMSProducer create(javax.jms.JMSProducer delegate) { + if (delegate == null) return null; return new JakartaProducer(delegate); } /** @@ -211,6 +254,7 @@ public static JMSProducer create(javax.jms.JMSProducer delegate) { * @return shimmed jakarta namespace instance */ public static Queue create(javax.jms.Queue delegate) { + if (delegate == null) return null; return new JakartaQueue(delegate); } /** @@ -219,6 +263,7 @@ public static Queue create(javax.jms.Queue delegate) { * @return shimmed jakarta namespace instance */ public static QueueBrowser create(javax.jms.QueueBrowser delegate) { + if (delegate == null) return null; return new JakartaQueueBrowser(delegate); } /** @@ -227,6 +272,7 @@ public static QueueBrowser create(javax.jms.QueueBrowser delegate) { * @return shimmed jakarta namespace instance */ public static Session create(javax.jms.Session delegate) { + if (delegate == null) return null; return new JakartaSession(delegate); } /** @@ -235,6 +281,7 @@ public static Session create(javax.jms.Session delegate) { * @return shimmed jakarta namespace instance */ public static ServerSessionPool create(javax.jms.ServerSessionPool delegate) { + if (delegate == null) return null; return new JakartaSessionPool(delegate); } /** @@ -243,6 +290,7 @@ public static ServerSessionPool create(javax.jms.ServerSessionPool delegate) { * @return shimmed jakarta namespace instance */ public static ServerSession create(javax.jms.ServerSession delegate) { + if (delegate == null) return null; return new JakartaServerSession(delegate); } /** @@ -251,6 +299,7 @@ public static ServerSession create(javax.jms.ServerSession delegate) { * @return shimmed jakarta namespace instance */ public static StreamMessage create(javax.jms.StreamMessage delegate) { + if (delegate == null) return null; return new JakartaStreamMessage(delegate); } /** @@ -259,6 +308,7 @@ public static StreamMessage create(javax.jms.StreamMessage delegate) { * @return shimmed jakarta namespace instance */ public static TemporaryQueue create(javax.jms.TemporaryQueue delegate) { + if (delegate == null) return null; return new JakartaTemporaryQueue(delegate); } /** @@ -267,6 +317,7 @@ public static TemporaryQueue create(javax.jms.TemporaryQueue delegate) { * @return shimmed jakarta namespace instance */ public static TemporaryTopic create(javax.jms.TemporaryTopic delegate) { + if (delegate == null) return null; return new JakartaTemporaryTopic(delegate); } /** @@ -275,6 +326,7 @@ public static TemporaryTopic create(javax.jms.TemporaryTopic delegate) { * @return shimmed jakarta namespace instance */ public static TextMessage create(javax.jms.TextMessage delegate) { + if (delegate == null) return null; return new JakartaTextMessage(delegate); } /** @@ -283,6 +335,7 @@ public static TextMessage create(javax.jms.TextMessage delegate) { * @return shimmed jakarta namespace instance */ public static Topic create(javax.jms.Topic delegate) { + if (delegate == null) return null; return new JakartaTopic(delegate); } /** @@ -291,6 +344,7 @@ public static Topic create(javax.jms.Topic delegate) { * @return shimmed jakarta namespace instance */ public static TopicSubscriber create(javax.jms.TopicSubscriber delegate) { + if (delegate == null) return null; return new JakartaTopicSubscriber(delegate); } } diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessage.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessage.java index adc91bf8d7b..4f1d7c14d54 100644 --- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessage.java +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,10 +24,10 @@ import static io.helidon.messaging.connectors.jms.shim.ShimUtil.call; import static io.helidon.messaging.connectors.jms.shim.ShimUtil.run; -class JakartaMessage implements Message { - private final javax.jms.Message delegate; +class JakartaMessage implements Message, JakartaWrapper { + private final T delegate; - JakartaMessage(javax.jms.Message delegate) { + JakartaMessage(T delegate) { this.delegate = delegate; } @@ -276,7 +276,7 @@ public boolean isBodyAssignableTo(Class c) throws JMSException { return call(() -> delegate.isBodyAssignableTo(c)); } - public javax.jms.Message unwrap() { + public T unwrap() { return delegate; } } diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessageProducer.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessageProducer.java index 1a5fd5f44ac..c1e0fbcadf4 100644 --- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessageProducer.java +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaMessageProducer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,10 +24,10 @@ import static io.helidon.messaging.connectors.jms.shim.ShimUtil.call; import static io.helidon.messaging.connectors.jms.shim.ShimUtil.run; -class JakartaMessageProducer implements MessageProducer { - private final javax.jms.MessageProducer delegate; +class JakartaMessageProducer implements MessageProducer, JakartaWrapper { + private final T delegate; - JakartaMessageProducer(javax.jms.MessageProducer delegate) { + JakartaMessageProducer(T delegate) { this.delegate = delegate; } @@ -162,4 +162,9 @@ public void send(Destination destination, timeToLive, JavaxJms.create(completionListener))); } + + @Override + public T unwrap() { + return delegate; + } } diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaProducer.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaProducer.java index f27f4051150..4f0da581595 100644 --- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaProducer.java +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaProducer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,15 +27,20 @@ /** * Exposes Jakarta API, delegates to javax API. */ -class JakartaProducer implements JMSProducer { - private final javax.jms.JMSProducer delegate; +class JakartaProducer implements JMSProducer, JakartaWrapper { + private final T delegate; private CompletionListener completionListener; private javax.jms.CompletionListener javaxCompletionListener; - JakartaProducer(javax.jms.JMSProducer delegate) { + JakartaProducer(T delegate) { this.delegate = delegate; } + @Override + public T unwrap() { + return delegate; + } + @Override public JMSProducer send(Destination destination, Message message) { delegate.send(ShimUtil.destination(destination), ShimUtil.message(message)); diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaSession.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaSession.java index 4b0d09df3dc..55697e3aeab 100644 --- a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaSession.java +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaSession.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2022 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,11 +40,13 @@ /** * Exposes Jakarta API, delegates to javax API. + * + * @param Type of the javax delegate */ -public class JakartaSession implements Session { - private final javax.jms.Session delegate; +public class JakartaSession implements Session, JakartaWrapper { + private final T delegate; - JakartaSession(javax.jms.Session delegate) { + JakartaSession(T delegate) { this.delegate = delegate; } @@ -251,11 +253,18 @@ public void unsubscribe(String name) throws JMSException { * Unwrap the underlying instance of javax session. * * @param type class to unwrap to - * @param type to unwrap to + * @param type to unwrap to * @return unwrapped session + * @deprecated since 3.0.3, use {@link io.helidon.messaging.connectors.jms.shim.JakartaSession#unwrap()} instead. * @throws java.lang.ClassCastException in case the underlying instance is not compatible with the requested type */ - public T unwrap(Class type) { + @Deprecated(forRemoval = true, since = "3.0.3") + public S unwrap(Class type) { return type.cast(delegate); } + + @Override + public T unwrap() { + return delegate; + } } diff --git a/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaWrapper.java b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaWrapper.java new file mode 100644 index 00000000000..cdad45281b4 --- /dev/null +++ b/messaging/connectors/jms-shim/src/main/java/io/helidon/messaging/connectors/jms/shim/JakartaWrapper.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging.connectors.jms.shim; + +/** + * Jakarta JMS shim objects with accessible delegate. + * + * @param Javax JMS delegate type + */ +public interface JakartaWrapper { + + /** + * Unwrap the underlying javax instance. + * + * @return unwrapped javax delegate + */ + T unwrap(); +} diff --git a/messaging/connectors/jms/pom.xml b/messaging/connectors/jms/pom.xml index 320bc07b6d9..73662c2dd4b 100644 --- a/messaging/connectors/jms/pom.xml +++ b/messaging/connectors/jms/pom.xml @@ -63,6 +63,10 @@ provided true
+ + io.helidon.messaging + helidon-messaging-jms-shim + jakarta.jms jakarta.jms-api diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java index 1d25e1bddb7..30081660b78 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/AbstractJmsMessage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,10 +23,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; import io.helidon.messaging.MessagingException; +import io.helidon.messaging.NackHandler; import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; @@ -40,11 +42,12 @@ abstract class AbstractJmsMessage implements JmsMessage { private Executor executor; private SessionMetadata sharedSessionEntry; private volatile boolean acked = false; + private final NackHandler nackHandler; - protected AbstractJmsMessage() { - } - - protected AbstractJmsMessage(Executor executor, SessionMetadata sharedSessionEntry) { + protected AbstractJmsMessage(NackHandler nackHandler, + Executor executor, + SessionMetadata sharedSessionEntry) { + this.nackHandler = nackHandler; this.sharedSessionEntry = sharedSessionEntry; this.executor = executor; } @@ -116,4 +119,8 @@ public CompletionStage ack() { }); } + @Override + public Function> getNack() { + return this.nackHandler != null ? this.nackHandler.getNack(this) : reason -> CompletableFuture.completedFuture(null); + } } diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java index 328322b65dc..cb62c9e1349 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/ConnectionContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import io.helidon.config.Config; import io.helidon.messaging.MessagingException; +import io.helidon.messaging.connectors.jms.shim.JakartaJms; import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; @@ -87,11 +88,13 @@ Optional lookupDestination() { } Optional lookupFactory(String jndi) { - return Optional.ofNullable((ConnectionFactory) lookup(jndi)); + return Optional.ofNullable(lookup(jndi)) + .map(o -> JakartaJms.resolve(o, ConnectionFactory.class)); } Optional lookupDestination(String jndi) { - return Optional.ofNullable((Destination) lookup(jndi)); + return Optional.ofNullable((Destination) lookup(jndi)) + .map(o -> JakartaJms.resolve(o, Destination.class)); } diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java index df2c4a54a24..2b73c492ab2 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsBytesMessage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.concurrent.Executor; import io.helidon.messaging.MessagingException; +import io.helidon.messaging.NackHandler; import jakarta.jms.BytesMessage; import jakarta.jms.JMSException; @@ -33,8 +34,11 @@ public class JmsBytesMessage extends AbstractJmsMessage { private final jakarta.jms.BytesMessage msg; - JmsBytesMessage(jakarta.jms.BytesMessage msg, Executor executor, SessionMetadata sharedSessionEntry) { - super(executor, sharedSessionEntry); + JmsBytesMessage(NackHandler nackHandler, + jakarta.jms.BytesMessage msg, + Executor executor, + SessionMetadata sharedSessionEntry) { + super(nackHandler, executor, sharedSessionEntry); this.msg = msg; } diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java index d8e40459cd2..04f2069af60 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsConnector.java @@ -43,7 +43,9 @@ import io.helidon.config.ConfigValue; import io.helidon.config.mp.MpConfig; import io.helidon.messaging.MessagingException; +import io.helidon.messaging.NackHandler; import io.helidon.messaging.Stoppable; +import io.helidon.messaging.connectors.jms.shim.JakartaWrapper; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.BeforeDestroyed; @@ -64,6 +66,7 @@ import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute; import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory; import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; @@ -76,6 +79,102 @@ */ @ApplicationScoped @Connector(JmsConnector.CONNECTOR_NAME) +@ConnectorAttribute(name = JmsConnector.USERNAME_ATTRIBUTE, + description = "User name used to connect JMS session", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.PASSWORD_ATTRIBUTE, + description = "Password to connect JMS session", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.TYPE_ATTRIBUTE, + description = "Possible values are: queue, topic", + defaultValue = "queue", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.DESTINATION_ATTRIBUTE, + description = "Queue or topic name", + mandatory = true, + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.ACK_MODE_ATTRIBUTE, + description = "Possible values are: " + + "AUTO_ACKNOWLEDGE- session automatically acknowledges a client’s receipt of a message, " + + "CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, " + + "DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.", + defaultValue = "AUTO_ACKNOWLEDGE", + direction = ConnectorAttribute.Direction.INCOMING, + type = "io.helidon.messaging.connectors.jms.AcknowledgeMode") +@ConnectorAttribute(name = JmsConnector.TRANSACTED_ATTRIBUTE, + description = "Indicates whether the session will use a local transaction.", + mandatory = false, + defaultValue = "false", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "boolean") +@ConnectorAttribute(name = JmsConnector.MESSAGE_SELECTOR_ATTRIBUTE, + description = "JMS API message selector expression based on a subset of the SQL92. " + + "Expression can only access headers and properties, not the payload.", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING, + type = "string") +@ConnectorAttribute(name = JmsConnector.CLIENT_ID_ATTRIBUTE, + description = "Client identifier for JMS connection.", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.DURABLE_ATTRIBUTE, + description = "True for creating durable consumer (only for topic).", + mandatory = false, + defaultValue = "false", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "boolean") +@ConnectorAttribute(name = JmsConnector.SUBSCRIBER_NAME_ATTRIBUTE, + description = "Subscriber name for durable consumer used to identify subscription.", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING, + type = "string") +@ConnectorAttribute(name = JmsConnector.NON_LOCAL_ATTRIBUTE, + description = "If true then any messages published to the topic using this session’s connection, " + + "or any other connection with the same client identifier, " + + "will not be added to the durable subscription.", + mandatory = false, + defaultValue = "false", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "boolean") +@ConnectorAttribute(name = JmsConnector.NAMED_FACTORY_ATTRIBUTE, + description = "Select in case factory is injected as a named bean or configured with name.", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.POLL_TIMEOUT_ATTRIBUTE, + description = "Timeout for polling for next message in every poll cycle in millis. Default value: 50", + mandatory = false, + defaultValue = "50", + direction = ConnectorAttribute.Direction.INCOMING, + type = "long") +@ConnectorAttribute(name = JmsConnector.PERIOD_EXECUTIONS_ATTRIBUTE, + description = "Period for executing poll cycles in millis.", + mandatory = false, + defaultValue = "100", + direction = ConnectorAttribute.Direction.INCOMING, + type = "long") +@ConnectorAttribute(name = JmsConnector.SESSION_GROUP_ID_ATTRIBUTE, + description = "When multiple channels share same session-group-id, " + + "they share same JMS session and same JDBC connection as well.", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.JNDI_ATTRIBUTE + "." + JmsConnector.JNDI_JMS_FACTORY_ATTRIBUTE, + description = "JNDI name of JMS factory.", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = JmsConnector.JNDI_ATTRIBUTE + "." + JmsConnector.JNDI_PROPS_ATTRIBUTE, + description = "Environment properties used for creating initial context java.naming.factory.initial, " + + "java.naming.provider.url …", + mandatory = false, + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "properties") public class JmsConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, Stoppable { private static final Logger LOGGER = Logger.getLogger(JmsConnector.class.getName()); @@ -254,18 +353,22 @@ void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object event) /** * Create reactive messaging message from JMS message. * + * @param nackHandler Not acknowledged handler * @param message JMS message * @param executor executor used for async execution of ack * @param sessionMetadata JMS session metadata * @return reactive messaging message extended with custom JMS features */ - protected JmsMessage createMessage(jakarta.jms.Message message, Executor executor, SessionMetadata sessionMetadata) { + protected JmsMessage createMessage(NackHandler nackHandler, + jakarta.jms.Message message, + Executor executor, + SessionMetadata sessionMetadata) { if (message instanceof TextMessage) { - return new JmsTextMessage((TextMessage) message, executor, sessionMetadata); + return new JmsTextMessage(nackHandler, (TextMessage) message, executor, sessionMetadata); } else if (message instanceof BytesMessage) { - return new JmsBytesMessage((BytesMessage) message, executor, sessionMetadata); + return new JmsBytesMessage(nackHandler, (BytesMessage) message, executor, sessionMetadata); } else { - return new AbstractJmsMessage(executor, sessionMetadata) { + return new AbstractJmsMessage(nackHandler, executor, sessionMetadata) { @Override public jakarta.jms.Message getJmsMessage() { @@ -331,39 +434,38 @@ public PublisherBuilder> getPublisherBuilder(Config mpConfi SessionMetadata sessionEntry = prepareSession(config, factory); Destination destination = createDestination(sessionEntry.session(), ctx); - String messageSelector = config.get(MESSAGE_SELECTOR_ATTRIBUTE).asString().orElse(null); - String subscriberName = config.get(SUBSCRIBER_NAME_ATTRIBUTE).asString().orElse(null); - MessageConsumer consumer; - if (config.get(DURABLE_ATTRIBUTE).asBoolean().orElse(false)) { - if (!(destination instanceof Topic)) { - throw new MessagingException("Can't create durable consumer. Only topic can be durable!"); - } - consumer = sessionEntry.session().createDurableSubscriber( - (Topic) destination, - subscriberName, - messageSelector, - config.get(NON_LOCAL_ATTRIBUTE).asBoolean().orElse(false)); - } else { - consumer = sessionEntry.session().createConsumer(destination, messageSelector); - } + MessageConsumer consumer = createConsumer(config, destination, sessionEntry); BufferedEmittingPublisher> emitter = BufferedEmittingPublisher.create(); + JmsNackHandler nackHandler = JmsNackHandler.create(emitter, config, this); Long pollTimeout = config.get(POLL_TIMEOUT_ATTRIBUTE) .asLong() .orElse(POLL_TIMEOUT_DEFAULT); + Long periodExecutions = config.get(PERIOD_EXECUTIONS_ATTRIBUTE) + .asLong() + .orElse(PERIOD_EXECUTIONS_DEFAULT); + AtomicReference> lastMessage = new AtomicReference<>(); scheduler.scheduleAtFixedRate( - () -> produce(emitter, sessionEntry, consumer, ackMode, awaitAck, pollTimeout, lastMessage), - 0, - config.get(PERIOD_EXECUTIONS_ATTRIBUTE) - .asLong() - .orElse(PERIOD_EXECUTIONS_DEFAULT), - TimeUnit.MILLISECONDS); + () -> { + if (!emitter.hasRequests()) { + return; + } + // When await-ack is true, no message is received until previous one is acked + if (ackMode != AcknowledgeMode.AUTO_ACKNOWLEDGE + && awaitAck + && lastMessage.get() != null + && !lastMessage.get().isAck()) { + return; + } + produce(emitter, sessionEntry, consumer, nackHandler, pollTimeout) + .ifPresent(lastMessage::set); + }, 0, periodExecutions, TimeUnit.MILLISECONDS); sessionEntry.connection().start(); return ReactiveStreams.fromPublisher(FlowAdapters.toPublisher(Multi.create(emitter))); } catch (JMSException e) { @@ -384,9 +486,8 @@ public SubscriberBuilder, Void> getSubscriberBuilder(Config SessionMetadata sessionEntry = prepareSession(config, factory); Session session = sessionEntry.session(); Destination destination = createDestination(session, ctx); - MessageProducer producer = session.createProducer(destination); - configureProducer(producer, ctx); - AtomicReference mapper = new AtomicReference<>(); + MessageProducer producer = createProducer(destination, ctx, sessionEntry); + AtomicReference mapper = new AtomicReference<>(); return ReactiveStreams.>builder() .flatMapCompletionStage(m -> consume(m, session, mapper, producer, config)) .onError(t -> LOGGER.log(Level.SEVERE, t, () -> "Error intercepted from channel " @@ -401,7 +502,15 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx) io.helidon.config.Config config = ctx.config().get("producer"); if (!config.exists()) return; - Class clazz = producer.getClass(); + final Object instance; + // Shim producer? + if (producer instanceof JakartaWrapper) { + instance = ((JakartaWrapper) producer).unwrap(); + } else { + instance = producer; + } + + Class clazz = instance.getClass(); Map setterMethods = Arrays.stream(clazz.getDeclaredMethods()) .filter(m -> m.getParameterCount() == 1) .collect(Collectors.toMap(m -> ConfigHelper.stripSet(m.getName()), Function.identity())); @@ -417,7 +526,7 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx) return; } try { - m.invoke(producer, c.as(m.getParameterTypes()[0]).get()); + m.invoke(instance, c.as(m.getParameterTypes()[0]).get()); } catch (Throwable e) { LOGGER.log(Level.WARNING, "Error when setting JMS producer property " + key @@ -428,43 +537,31 @@ private void configureProducer(MessageProducer producer, ConnectionContext ctx) }); } - private void produce( + private Optional> produce( BufferedEmittingPublisher> emitter, SessionMetadata sessionEntry, MessageConsumer consumer, - AcknowledgeMode ackMode, - Boolean awaitAck, - Long pollTimeout, - AtomicReference> lastMessage) { - - if (!emitter.hasRequests()) { - return; - } - // When await-ack is true, no message is received until previous one is acked - if (ackMode != AcknowledgeMode.AUTO_ACKNOWLEDGE - && awaitAck - && lastMessage.get() != null - && !lastMessage.get().isAck()) { - return; - } + JmsNackHandler nackHandler, + Long pollTimeout) { try { jakarta.jms.Message message = consumer.receive(pollTimeout); if (message == null) { - return; + return Optional.empty(); } LOGGER.fine(() -> "Received message: " + message); - JmsMessage preparedMessage = createMessage(message, executor, sessionEntry); - lastMessage.set(preparedMessage); + JmsMessage preparedMessage = createMessage(nackHandler, message, executor, sessionEntry); emitter.emit(preparedMessage); + return Optional.of(preparedMessage); } catch (Throwable e) { emitter.fail(e); + return Optional.empty(); } } - private CompletionStage consume( + CompletionStage consume( Message m, Session session, - AtomicReference mapper, + AtomicReference mapper, MessageProducer producer, io.helidon.config.Config config) { @@ -474,28 +571,34 @@ private CompletionStage consume( } return CompletableFuture - .supplyAsync(() -> { - try { - jakarta.jms.Message jmsMessage; - - if (m instanceof OutgoingJmsMessage) { - // custom mapping, properties etc. - jmsMessage = ((OutgoingJmsMessage) m).toJmsMessage(session, mapper.get()); - } else { - // default mappers - jmsMessage = mapper.get().apply(session, m); - } - // actual send - producer.send(jmsMessage); - return m.ack(); - } catch (JMSException e) { - sendingErrorHandler(config).accept(m, e); - } - return CompletableFuture.completedFuture(null); - }, executor) + .supplyAsync(() -> consumeAsync(m, session, mapper, producer, config), executor) .thenApply(aVoid -> m); } + protected CompletionStage consumeAsync(Message m, + Session session, + AtomicReference mapper, + MessageProducer producer, + io.helidon.config.Config config) { + try { + jakarta.jms.Message jmsMessage; + + if (m instanceof OutgoingJmsMessage) { + // custom mapping, properties etc. + jmsMessage = ((OutgoingJmsMessage) m).toJmsMessage(session, mapper.get()); + } else { + // default mappers + jmsMessage = mapper.get().apply(session, m); + } + // actual send + producer.send(jmsMessage); + return m.ack(); + } catch (JMSException e) { + sendingErrorHandler(config).accept(m, e); + } + return CompletableFuture.completedFuture(null); + } + /** * Customizable handler for errors during sending. * @@ -504,12 +607,13 @@ private CompletionStage consume( */ protected BiConsumer, JMSException> sendingErrorHandler(io.helidon.config.Config config) { return (m, e) -> { + m.nack(e); throw new MessagingException("Error during sending JMS message.", e); }; } - private SessionMetadata prepareSession(io.helidon.config.Config config, - ConnectionFactory factory) throws JMSException { + protected SessionMetadata prepareSession(io.helidon.config.Config config, + ConnectionFactory factory) throws JMSException { Optional sessionGroupId = config.get(SESSION_GROUP_ID_ATTRIBUTE).asString().asOptional(); if (sessionGroupId.isPresent() && sessionRegister.containsKey(sessionGroupId.get())) { return sessionRegister.get(sessionGroupId.get()); @@ -543,9 +647,10 @@ private SessionMetadata prepareSession(io.helidon.config.Config config, sessionRegister.put(sessionGroupId.orElseGet(() -> UUID.randomUUID().toString()), sharedSessionEntry); return sharedSessionEntry; } + } - Destination createDestination(Session session, ConnectionContext ctx) { + protected Destination createDestination(Session session, ConnectionContext ctx) { io.helidon.config.Config config = ctx.config(); if (ctx.isJndi()) { @@ -582,6 +687,34 @@ Destination createDestination(Session session, ConnectionContext ctx) { } + protected MessageConsumer createConsumer(io.helidon.config.Config config, + Destination destination, + SessionMetadata sessionEntry) throws JMSException { + String messageSelector = config.get(MESSAGE_SELECTOR_ATTRIBUTE).asString().orElse(null); + String subscriberName = config.get(SUBSCRIBER_NAME_ATTRIBUTE).asString().orElse(null); + + if (config.get(DURABLE_ATTRIBUTE).asBoolean().orElse(false)) { + if (!(destination instanceof Topic)) { + throw new MessagingException("Can't create durable consumer. Only topic can be durable!"); + } + return sessionEntry.session().createDurableSubscriber( + (Topic) destination, + subscriberName, + messageSelector, + config.get(NON_LOCAL_ATTRIBUTE).asBoolean().orElse(false)); + } else { + return sessionEntry.session().createConsumer(destination, messageSelector); + } + } + + protected MessageProducer createProducer(Destination destination, + ConnectionContext ctx, + SessionMetadata sessionEntry) throws JMSException { + MessageProducer producer = sessionEntry.session().createProducer(destination); + configureProducer(producer, ctx); + return producer; + } + /** * Builder for {@link io.helidon.messaging.connectors.jms.JmsConnector}. */ diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsNackHandler.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsNackHandler.java new file mode 100644 index 00000000000..2c6214649c7 --- /dev/null +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsNackHandler.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging.connectors.jms; + +import java.util.HashMap; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import io.helidon.common.reactive.BufferedEmittingPublisher; +import io.helidon.config.Config; +import io.helidon.config.ConfigSources; +import io.helidon.messaging.MessagingException; +import io.helidon.messaging.NackHandler; + +import jakarta.jms.ConnectionFactory; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.MessageProducer; +import org.eclipse.microprofile.reactive.messaging.Message; + +import static java.lang.System.Logger.Level.ERROR; +import static java.lang.System.Logger.Level.WARNING; + +abstract class JmsNackHandler implements NackHandler> { + + static JmsNackHandler create(BufferedEmittingPublisher> emitter, + Config config, + JmsConnector jmsConnector) { + Config dlq = config.get("nack-dlq"); + Config logOnly = config.get("nack-log-only"); + if (dlq.exists()) { + dlq = dlq.detach(); + return new JmsDLQ(config, dlq, jmsConnector); + } else if (logOnly.exists() && logOnly.asBoolean().orElse(true)) { + logOnly = logOnly.detach(); + return new JmsNackHandler.Log(config, logOnly); + } + // Default nack handling strategy + return new JmsNackHandler.KillChannel(emitter, config); + } + + static class Log extends JmsNackHandler { + + private final System.Logger logger; + private final String channelName; + private final System.Logger.Level level; + + Log(Config config, Config logOnlyConfig) { + this.channelName = config.get(JmsConnector.CHANNEL_NAME_ATTRIBUTE) + .asString() + .orElseThrow(() -> new MessagingException("Missing channel name!")); + + this.level = logOnlyConfig.get("level") + .as(System.Logger.Level.class) + .orElse(WARNING); + + this.logger = System.getLogger(logOnlyConfig.get("logger") + .asString() + .orElse(JmsNackHandler.class.getName())); + } + + + @Override + public Function> getNack(JmsMessage message) { + return t -> nack(t, message); + } + + private CompletionStage nack(Throwable t, JmsMessage message) { + logger.log(level, messageToString("NACKED Message ignored", channelName, message)); + message.ack(); + return CompletableFuture.completedFuture(null); + } + } + + static class KillChannel extends JmsNackHandler { + + private static final System.Logger LOGGER = System.getLogger(JmsNackHandler.KillChannel.class.getName()); + private final BufferedEmittingPublisher> emitter; + private final String channelName; + + KillChannel(BufferedEmittingPublisher> emitter, Config config) { + this.emitter = emitter; + this.channelName = config.get(JmsConnector.CHANNEL_NAME_ATTRIBUTE) + .asString() + .orElseThrow(() -> new MessagingException("Missing channel name!")); + } + + @Override + public Function> getNack(JmsMessage message) { + return throwable -> nack(throwable, message); + } + + private CompletionStage nack(Throwable t, JmsMessage message) { + LOGGER.log(ERROR, messageToString("NACKED message, killing the channel", channelName, message), t); + emitter.fail(t); + return CompletableFuture.failedStage(t); + } + } + + static String messageToString(String prefix, String channel, JmsMessage message) { + StringBuilder msg = new StringBuilder(prefix); + msg.append("\n"); + appendNonNull(msg, "channel", channel); + appendNonNull(msg, "correlationId", message.getCorrelationId()); + appendNonNull(msg, "replyTo", message.getReplyTo()); + for (String prop : message.getPropertyNames()) { + appendNonNull(msg, prop, message.getProperty(prop)); + } + return msg.toString(); + } + + static StringBuilder appendNonNull(StringBuilder sb, String name, Object value) { + if (Objects.isNull(value)) return sb; + return sb.append(name + ": ").append(value).append("\n"); + } + + static class JmsDLQ extends JmsNackHandler { + private static final System.Logger LOGGER = System.getLogger(JmsNackHandler.JmsDLQ.class.getName()); + private final MessageProducer producer; + private final SessionMetadata sessionMetadata; + private final AtomicReference mapper = new AtomicReference<>(); + private final String channelName; + private Config config; + private JmsConnector jmsConnector; + private Config dlq; + + JmsDLQ(Config config, Config dlq, JmsConnector jmsConnector) { + this.config = config; + this.jmsConnector = jmsConnector; + this.channelName = config.get(JmsConnector.CHANNEL_NAME_ATTRIBUTE) + .asString() + .orElseThrow(() -> new MessagingException("Missing channel name!")); + + Config.Builder dlqCfgBuilder = Config.builder(); + HashMap dlqCfgMap = new HashMap<>(); + if (dlq.isLeaf()) { + // nack-dlq=destination_name - Uses actual connection config, just set dlq destination + String destination = dlq.asString().orElseThrow(() -> new MessagingException("nack-dlq with no value!")); + dlqCfgMap.put(JmsConnector.DESTINATION_ATTRIBUTE, destination); + dlqCfgMap.put("type", "queue"); // default is queue + this.dlq = dlqCfgBuilder + .sources( + ConfigSources.create(dlqCfgMap), + ConfigSources.create(config.detach()) + ) + .disableEnvironmentVariablesSource() + .disableSystemPropertiesSource() + .build(); + } else { + // Custom dlq connection config + this.dlq = dlq.detach(); + } + + try { + ConnectionContext ctx = new ConnectionContext(this.dlq); + ConnectionFactory factory = jmsConnector.getFactory(ctx) + .orElseThrow(() -> new MessagingException("No ConnectionFactory found.")); + sessionMetadata = jmsConnector.prepareSession(dlq, factory); + Destination destination = jmsConnector.createDestination(sessionMetadata.session(), ctx); + producer = jmsConnector.createProducer(destination, ctx, sessionMetadata); + } catch (JMSException e) { + throw new MessagingException("Error when setting up DLQ nack handler for channel " + channelName, e); + } + } + + @Override + public Function> getNack(JmsMessage message) { + return throwable -> nack(throwable, message); + } + + private CompletionStage nack(Throwable t, JmsMessage message) { + try { + + Throwable cause = t; + while (cause.getCause() != null && cause != cause.getCause()) { + cause = cause.getCause(); + } + + // It has to be incoming JMS message as this nack handler cannot be used outside of connector + JmsMessage.OutgoingJmsMessageBuilder builder = JmsMessage.builder(message.getJmsMessage()); + builder.property(DLQ_ERROR_PROP, cause.getClass().getName()) + .property(DLQ_ERROR_MSG_PROP, cause.getMessage()) + .correlationId(message.getCorrelationId()) + .payload(message.getPayload()); + + config.get(JmsConnector.DESTINATION_ATTRIBUTE) + .asString() + .ifPresent(s -> builder.property(DLQ_ORIG_TOPIC_PROP, s)); + + Message dlqMessage = builder.build(); + jmsConnector.consume(dlqMessage, sessionMetadata.session(), mapper, producer, config); + } catch (Throwable e) { + e.addSuppressed(t); + LOGGER.log(ERROR, "Error when sending nacked message to DLQ", e); + return CompletableFuture.completedStage(null); + } + return CompletableFuture.completedStage(null); + } + } +} + diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsTextMessage.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsTextMessage.java index 154aeb37b36..e875c7f7864 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsTextMessage.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/JmsTextMessage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import io.helidon.messaging.MessagingException; +import io.helidon.messaging.NackHandler; import jakarta.jms.JMSException; @@ -29,8 +30,11 @@ public class JmsTextMessage extends AbstractJmsMessage { private final jakarta.jms.TextMessage msg; - JmsTextMessage(jakarta.jms.TextMessage msg, Executor executor, SessionMetadata sharedSessionEntry) { - super(executor, sharedSessionEntry); + JmsTextMessage(NackHandler nackHandler, + jakarta.jms.TextMessage msg, + Executor executor, + SessionMetadata sharedSessionEntry) { + super(nackHandler, executor, sharedSessionEntry); this.msg = msg; } diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMapper.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMapper.java new file mode 100644 index 00000000000..8bbfe770e25 --- /dev/null +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMapper.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging.connectors.jms; + +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.Session; + +/** + * Mapper used for translating reactive messaging message to JMS message. + */ +@FunctionalInterface +public interface MessageMapper { + /** + * Convert messaging message to JMS message. + * + * @param s JMS session + * @param m Reactive messaging message to be converted + * @return JMS message + * @throws JMSException + */ + Message apply(Session s, org.eclipse.microprofile.reactive.messaging.Message m) throws JMSException; +} diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMappers.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMappers.java index ba955ff0261..ffbe8841243 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMappers.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/MessageMappers.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,6 @@ import jakarta.jms.BytesMessage; import jakarta.jms.JMSException; -import jakarta.jms.Message; -import jakarta.jms.Session; class MessageMappers { @@ -57,8 +55,4 @@ static MessageMapper getJmsMessageMapper(org.eclipse.microprofile.reactive.messa }); } - @FunctionalInterface - interface MessageMapper { - Message apply(Session s, org.eclipse.microprofile.reactive.messaging.Message m) throws JMSException; - } } diff --git a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/OutgoingJmsMessage.java b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/OutgoingJmsMessage.java index 1cf2ddd1409..c9fe2987aa1 100644 --- a/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/OutgoingJmsMessage.java +++ b/messaging/connectors/jms/src/main/java/io/helidon/messaging/connectors/jms/OutgoingJmsMessage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,7 +78,7 @@ void postProcess(PostProcessor processor) { this.postProcessors.add(processor); } - jakarta.jms.Message toJmsMessage(Session session, MessageMappers.MessageMapper defaultMapper) throws JMSException { + jakarta.jms.Message toJmsMessage(Session session, MessageMapper defaultMapper) throws JMSException { jakarta.jms.Message jmsMessage; if (mapper != null) { jmsMessage = mapper.apply(getPayload(), session); diff --git a/messaging/connectors/jms/src/main/java/module-info.java b/messaging/connectors/jms/src/main/java/module-info.java index 31fa2af968f..1f683b63b7d 100644 --- a/messaging/connectors/jms/src/main/java/module-info.java +++ b/messaging/connectors/jms/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,9 +31,11 @@ requires io.helidon.common.context; requires io.helidon.common.reactive; requires io.helidon.common.configurable; + requires io.helidon.messaging.jms.shim; requires io.helidon.messaging; requires microprofile.config.api; requires java.naming; + requires javax.jms.api; exports io.helidon.messaging.connectors.jms; } diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaConsumerMessage.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaConsumerMessage.java index 8086afbdf04..dcaae635327 100644 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaConsumerMessage.java +++ b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaConsumerMessage.java @@ -37,7 +37,7 @@ class KafkaConsumerMessage implements KafkaMessage { private final CompletableFuture ack; - private final NackHandler nack; + private final KafkaNackHandler nack; private final long millisWaitingTimeout; private final AtomicBoolean acked = new AtomicBoolean(); private final ConsumerRecord consumerRecord; @@ -52,7 +52,7 @@ class KafkaConsumerMessage implements KafkaMessage { */ KafkaConsumerMessage(ConsumerRecord consumerRecord, CompletableFuture ack, - NackHandler nack, + KafkaNackHandler nack, long millisWaitingTimeout) { Objects.requireNonNull(consumerRecord); this.consumerRecord = consumerRecord; diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/NackHandler.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaNackHandler.java similarity index 90% rename from messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/NackHandler.java rename to messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaNackHandler.java index 0dc3b76038e..08f216a8dc3 100644 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/NackHandler.java +++ b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaNackHandler.java @@ -27,34 +27,35 @@ import io.helidon.common.reactive.EmittingPublisher; import io.helidon.common.reactive.Multi; import io.helidon.config.Config; +import io.helidon.messaging.NackHandler; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.reactivestreams.FlowAdapters; -interface NackHandler { +interface KafkaNackHandler extends NackHandler> { Function> getNack(KafkaMessage message); - static NackHandler create(EmittingPublisher> emitter, Config config) { + static KafkaNackHandler create(EmittingPublisher> emitter, Config config) { Config dlq = config.get("nack-dlq"); Config logOnly = config.get("nack-log-only"); if (dlq.exists()) { dlq = dlq.detach(); - return new NackHandler.KafkaDLQ<>(emitter, config, dlq); + return new KafkaNackHandler.KafkaDLQ<>(emitter, config, dlq); } else if (logOnly.exists() && logOnly.asBoolean().orElse(true)) { logOnly = logOnly.detach(); - return new NackHandler.Log<>(config, logOnly); + return new KafkaNackHandler.Log<>(config, logOnly); } - return new NackHandler.KillChannel<>(emitter); + return new KafkaNackHandler.KillChannel<>(emitter); } - class Log implements NackHandler { + class Log implements KafkaNackHandler { Log(Config config, Config logOnlyConfig) { } - private static final Logger LOGGER = Logger.getLogger(NackHandler.Log.class.getName()); + private static final Logger LOGGER = Logger.getLogger(KafkaNackHandler.Log.class.getName()); @Override public Function> getNack(KafkaMessage message) { @@ -67,9 +68,9 @@ private CompletionStage nack(Throwable t, KafkaMessage message) { } } - class KillChannel implements NackHandler { + class KillChannel implements KafkaNackHandler { - private static final Logger LOGGER = Logger.getLogger(NackHandler.KillChannel.class.getName()); + private static final Logger LOGGER = Logger.getLogger(KafkaNackHandler.KillChannel.class.getName()); private final EmittingPublisher> emitter; KillChannel(EmittingPublisher> emitter) { @@ -88,7 +89,7 @@ private CompletionStage nack(Throwable t, KafkaMessage message) { } } - class KafkaDLQ implements NackHandler { + class KafkaDLQ implements KafkaNackHandler { private static final String DESERIALIZER_MASK = "([^.]*)Deserializer([^.]*$)"; private static final String DESERIALIZER_REPLACEMENT = "$1Serializer$2"; @@ -149,7 +150,7 @@ class KafkaDLQ implements NackHandler { @Override public Function> getNack(KafkaMessage message) { - return throwable -> this.nack(throwable, message); + return t -> this.nack(t, message); } private CompletionStage nack(Throwable t, KafkaMessage origMsg) { diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaPublisher.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaPublisher.java index 1dbc973d16e..3a0c233b8d8 100644 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaPublisher.java +++ b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaPublisher.java @@ -124,7 +124,7 @@ private void start() { kafkaConsumer.subscribe(topics, partitionsAssignedLatch); } - NackHandler nack = NackHandler.create(emitter, config); + KafkaNackHandler nack = KafkaNackHandler.create(emitter, config); // This thread reads from Kafka topics and push in kafkaBufferedEvents scheduler.scheduleAtFixedRate(() -> { diff --git a/messaging/connectors/pom.xml b/messaging/connectors/pom.xml index d2c4cc413f4..0507245d0d4 100644 --- a/messaging/connectors/pom.xml +++ b/messaging/connectors/pom.xml @@ -32,9 +32,10 @@ kafka + jms-shim jms aq - jms-shim + wls-jms mock diff --git a/messaging/connectors/wls-jms/pom.xml b/messaging/connectors/wls-jms/pom.xml new file mode 100644 index 00000000000..a881be8c434 --- /dev/null +++ b/messaging/connectors/wls-jms/pom.xml @@ -0,0 +1,64 @@ + + + + + + 4.0.0 + + + io.helidon.messaging + helidon-messaging-connectors-project + 4.0.0-SNAPSHOT + + + io.helidon.messaging.wls-jms + helidon-messaging-wls-jms + jar + Helidon Messaging Weblogic JMS Connector + + + + io.helidon.messaging.jms + helidon-messaging-jms + + + + jakarta.enterprise + jakarta.enterprise.cdi-api + provided + true + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + org.mockito + mockito-core + test + + + diff --git a/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/IsolatedContextFactory.java b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/IsolatedContextFactory.java new file mode 100644 index 00000000000..fee2e576d05 --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/IsolatedContextFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.messaging.connectors.wls; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Hashtable; + +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; + +/** + * Initial JNDI context for Weblogic thin client initial context loaded by different classloader. + */ +public class IsolatedContextFactory implements InitialContextFactory { + + private static final String WLS_INIT_CTX_FACTORY = "weblogic.jndi.WLInitialContextFactory"; + + @Override + public Context getInitialContext(Hashtable env) throws NamingException { + return ThinClientClassLoader.executeInIsolation(() -> { + try { + Class wlInitialContextFactory = ThinClientClassLoader.getInstance().loadClass(WLS_INIT_CTX_FACTORY); + Constructor contextFactoryConstructor = wlInitialContextFactory.getConstructor(); + InitialContextFactory contextFactoryInstance = (InitialContextFactory) contextFactoryConstructor.newInstance(); + return contextFactoryInstance.getInitialContext(env); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find " + WLS_INIT_CTX_FACTORY, e); + } catch (NoSuchMethodException + | InvocationTargetException + | InstantiationException + | IllegalAccessException e) { + throw new RuntimeException("Cannot instantiate " + WLS_INIT_CTX_FACTORY, e); + } + }); + } +} diff --git a/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/ThinClientClassLoader.java b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/ThinClientClassLoader.java new file mode 100644 index 00000000000..cc0c6cb6a01 --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/ThinClientClassLoader.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.messaging.connectors.wls; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Path; + +import io.helidon.common.LazyValue; + +import static java.lang.System.Logger.Level.TRACE; + +class ThinClientClassLoader extends URLClassLoader { + + private static final System.Logger LOGGER = System.getLogger(ThinClientClassLoader.class.getName()); + private static final LazyValue ISOLATION_CL = LazyValue.create(ThinClientClassLoader::new); + private static volatile String thinJarLocation = "wlthint3client.jar"; + private final ClassLoader contextClassLoader; + + ThinClientClassLoader() { + super("thinClientClassLoader", new URL[0], null); + contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + + File currDirFile = Path.of("", thinJarLocation).toFile(); + LOGGER.log(TRACE, "Looking for Weblogic thin client jar file " + currDirFile.getPath() + " on filesystem"); + if (currDirFile.exists()) { + this.addURL(currDirFile.toURI().toURL()); + return; + } + + throw new RuntimeException("Can't locate thin jar file " + thinJarLocation); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (inWlsJar(name)) { + try { + return super.loadClass(name, resolve); + } catch (ClassNotFoundException e) { + LOGGER.log(TRACE, "Cannot load class " + name + " from WLS thin client classloader.", e); + contextClassLoader.loadClass(name); + } + } + return contextClassLoader.loadClass(name); + } + + @Override + public URL getResource(String name) { + + if (inWlsJar(name)) { + return super.getResource(name); + } + return contextClassLoader.getResource(name); + } + + static ThinClientClassLoader getInstance() { + return ISOLATION_CL.get(); + } + + static void setThinJarLocation(String thinJarLocation) { + ThinClientClassLoader.thinJarLocation = thinJarLocation; + } + + static T executeInIsolation(IsolationSupplier supplier) { + ClassLoader originalCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(ISOLATION_CL.get()); + return supplier.get(); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + Thread.currentThread().setContextClassLoader(originalCl); + } + } + + boolean inWlsJar(String name) { + // Load only javax JMS API from outside, so cast works + return !name.startsWith("javax.jms") + && !name.equals(IsolatedContextFactory.class.getName()); + } + + @FunctionalInterface + interface IsolationSupplier { + T get() throws Throwable; + } +} diff --git a/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/WeblogicConnector.java b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/WeblogicConnector.java new file mode 100644 index 00000000000..952670a6ac5 --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/WeblogicConnector.java @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.messaging.connectors.wls; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import io.helidon.config.Config; +import io.helidon.messaging.connectors.jms.ConnectionContext; +import io.helidon.messaging.connectors.jms.JmsConnector; +import io.helidon.messaging.connectors.jms.MessageMapper; +import io.helidon.messaging.connectors.jms.SessionMetadata; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; + +import static io.helidon.messaging.connectors.wls.ThinClientClassLoader.executeInIsolation; + +/** + * MicroProfile Reactive Messaging Weblogic JMS connector. + */ +@ApplicationScoped +@Connector(WeblogicConnector.CONNECTOR_NAME) +@ConnectorAttribute(name = WeblogicConnector.WLS_URL, + description = "Weblogic server URL", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + mandatory = true, + type = "string") +@ConnectorAttribute(name = WeblogicConnector.THIN_CLIENT_PATH, + description = "Filepath to the Weblogic thin T3 client jar(wlthint3client.jar), " + + "can be usually found within Weblogic installation 'server/lib/wlthint3client.jar'", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + mandatory = true, + type = "string") +@ConnectorAttribute(name = WeblogicConnector.JMS_FACTORY_ATTRIBUTE, + description = "Weblogic JMS factory name", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = WeblogicConnector.WLS_INIT_CONTEXT_PRINCIPAL, + description = "Weblogic initial context principal(user)", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = WeblogicConnector.WLS_INIT_CONTEXT_CREDENTIAL, + description = "Weblogic initial context credential(password)", + direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, + type = "string") +@ConnectorAttribute(name = "producer.unit-of-order", + description = "All messages from same unit of order will be processed sequentially in the order they were " + + "created.", + direction = ConnectorAttribute.Direction.OUTGOING, + type = "string") +@ConnectorAttribute(name = "producer.compression-threshold", + description = "Max bytes number of serialized message body so any message that exceeds this limit " + + "will trigger message compression.", + direction = ConnectorAttribute.Direction.OUTGOING, + type = "int") +@ConnectorAttribute(name = "producer.redelivery-limit", + description = "Number of times message is redelivered after recover or rollback.", + direction = ConnectorAttribute.Direction.OUTGOING, + type = "int") +@ConnectorAttribute(name = "producer.send-timeout", + description = "Maximum time the producer will wait for space when sending a message.", + direction = ConnectorAttribute.Direction.OUTGOING, + type = "long") +@ConnectorAttribute(name = "producer.time-to-deliver", + description = "Delay before sent message is made visible on its target destination.", + direction = ConnectorAttribute.Direction.OUTGOING, + type = "long") +public class WeblogicConnector extends JmsConnector { + private static final System.Logger LOGGER = System.getLogger(WeblogicConnector.class.getName()); + static final String JMS_FACTORY_ATTRIBUTE = "jms-factory"; + static final String THIN_CLIENT_PATH = "thin-jar"; + static final String WLS_URL = "url"; + static final String WLS_INIT_CONTEXT_PRINCIPAL = "principal"; + static final String WLS_INIT_CONTEXT_CREDENTIAL = "credentials"; + /** + * Microprofile messaging Weblogic JMS connector name. + */ + public static final String CONNECTOR_NAME = "helidon-weblogic-jms"; + + @Inject + protected WeblogicConnector(Config config, + Instance connectionFactories) { + super(config, connectionFactories); + config.get("mp.messaging.connector.helidon-weblogic-jms.thin-jar") + .asString() + .ifPresent(ThinClientClassLoader::setThinJarLocation); + } + + protected WeblogicConnector(Map connectionFactoryMap, + ScheduledExecutorService scheduler, + String thinJarLocation, + ExecutorService executor) { + super(connectionFactoryMap, scheduler, executor); + ThinClientClassLoader.setThinJarLocation(thinJarLocation); + } + + @Override + public PublisherBuilder> getPublisherBuilder(org.eclipse.microprofile.config.Config mpConfig) { + return super.getPublisherBuilder(WlsConnectorConfigAliases.map(mpConfig)); + } + + @Override + public SubscriberBuilder, Void> getSubscriberBuilder(org.eclipse.microprofile.config.Config mpConfig) { + return super.getSubscriberBuilder(WlsConnectorConfigAliases.map(mpConfig)); + } + + @Override + protected MessageConsumer createConsumer(Config config, + Destination destination, + SessionMetadata sessionEntry) throws JMSException { + return executeInIsolation(() -> super.createConsumer(config, destination, sessionEntry)); + } + + @Override + protected Optional getFactory(ConnectionContext ctx) { + return executeInIsolation(() -> super.getFactory(ctx)); + } + + @Override + protected Destination createDestination(Session session, ConnectionContext ctx) { + return executeInIsolation(() -> super.createDestination(session, ctx)); + } + + @Override + protected SessionMetadata prepareSession(Config config, ConnectionFactory factory) throws JMSException { + return executeInIsolation(() -> super.prepareSession(config, factory)); + } + + @Override + protected CompletionStage consumeAsync(Message m, + Session session, + AtomicReference mapper, + MessageProducer producer, + Config config) { + return executeInIsolation(() -> super.consumeAsync(m, session, mapper, producer, config)); + } +} diff --git a/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/WlsConnectorConfigAliases.java b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/WlsConnectorConfigAliases.java new file mode 100644 index 00000000000..0e4a2487c53 --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/WlsConnectorConfigAliases.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging.connectors.wls; + +import java.util.HashMap; +import java.util.Map; + +import io.helidon.config.ConfigSources; +import io.helidon.config.mp.MpConfig; +import io.helidon.config.mp.MpConfigSources; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.spi.ConfigProviderResolver; + +class WlsConnectorConfigAliases { + + private WlsConnectorConfigAliases() { + } + + private static final Map ALIASES = Map.of( + WeblogicConnector.JMS_FACTORY_ATTRIBUTE, "jndi.jms-factory", + "url", "jndi.env-properties.java.naming.provider.url", + "principal", "jndi.env-properties.java.naming.security.principal", + "credentials", "jndi.env-properties.java.naming.security.credentials" + ); + + static Config map(Config connConfig) { + Map mapped = new HashMap<>(); + + mapped.put("jndi.env-properties.java.naming.factory.initial", IsolatedContextFactory.class.getName()); + + ALIASES.forEach((key, value) -> connConfig.getOptionalValue(key, String.class) + .ifPresent(s -> mapped.put(value, s))); + + io.helidon.config.Config cfg = io.helidon.config.Config.builder() + .addSource(ConfigSources.create(MpConfig.toHelidonConfig(connConfig))) + .disableEnvironmentVariablesSource() + .disableSystemPropertiesSource() + .disableParserServices() + .disableCaching() + .disableValueResolving() + .build(); + + return ConfigProviderResolver.instance() + .getBuilder() + .withSources(MpConfigSources.create(mapped), MpConfigSources.create(cfg)) + .build(); + } +} diff --git a/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/package-info.java b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/package-info.java new file mode 100644 index 00000000000..4cde68441b1 --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/java/io/helidon/messaging/connectors/wls/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Microprofile messaging Weblogic JMS connector. + */ +package io.helidon.messaging.connectors.wls; diff --git a/messaging/connectors/wls-jms/src/main/java/module-info.java b/messaging/connectors/wls-jms/src/main/java/module-info.java new file mode 100644 index 00000000000..4e78fefff7f --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/java/module-info.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Microprofile messaging Weblogic JMS connector. + */ +module io.helidon.messaging.connectors.wls { + requires java.logging; + + requires static jakarta.cdi; + requires static jakarta.inject; + requires io.helidon.messaging.connectors.jms; + requires jakarta.jms.api; + requires java.naming; + requires microprofile.config.api; + requires io.helidon.config.mp; + + exports io.helidon.messaging.connectors.wls; +} diff --git a/messaging/connectors/wls-jms/src/main/resources/META-INF/beans.xml b/messaging/connectors/wls-jms/src/main/resources/META-INF/beans.xml new file mode 100644 index 00000000000..6d563a7f80c --- /dev/null +++ b/messaging/connectors/wls-jms/src/main/resources/META-INF/beans.xml @@ -0,0 +1,24 @@ + + + + diff --git a/messaging/messaging/src/main/java/io/helidon/messaging/NackHandler.java b/messaging/messaging/src/main/java/io/helidon/messaging/NackHandler.java new file mode 100644 index 00000000000..9e606daa19b --- /dev/null +++ b/messaging/messaging/src/main/java/io/helidon/messaging/NackHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging; + +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +import org.eclipse.microprofile.reactive.messaging.Message; + +/** + * Nack handler for interface for messages connectors. + * + * @param Connector specific message type + */ +public interface NackHandler> { + + /** + * Error type causing the message to be sent to DLQ. + */ + String DLQ_ERROR_PROP = "dlq-error"; + /** + * Message from error causing DLQ redirection. + */ + String DLQ_ERROR_MSG_PROP = "dlq-error-msg"; + /** + * Original destination of this message. + */ + String DLQ_ORIG_TOPIC_PROP = "dlq-orig-destination"; + + /** + * Return nack function to be used by message when nacked. + * + * @param message owner message of the nack function + * @return nack function to be used by message when nacked + */ + Function> getNack(M message); +} diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java index dcb290b80c6..bca413f1718 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractJmsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,13 +26,17 @@ import org.junit.jupiter.api.BeforeAll; public class AbstractJmsTest { + + static final String BROKER_URL = "vm://localhost?broker.persistent=false"; +// static final String BROKER_URL = "tcp://localhost:61616"; static Session session; static ConnectionFactory connectionFactory; @BeforeAll static void beforeAll() throws Exception { - connectionFactory = JakartaJms.create(new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false")); + connectionFactory = JakartaJms.create(new ActiveMQConnectionFactory(BROKER_URL)); Connection connection = connectionFactory.createConnection(); + connection.start(); session = connection.createSession(false, AcknowledgeMode.AUTO_ACKNOWLEDGE.getAckMode()); } diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java index 15fded35d28..439c4dbf231 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AbstractSampleBean.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -117,7 +117,7 @@ public static class Channel1 extends AbstractSampleBean { @Incoming("test-channel-1") @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public CompletionStage channel1(Message msg) { + public CompletionStage channel1(Message msg) { LOGGER.fine(() -> String.format("Received %s", msg.getPayload())); consumed().add(msg.getPayload()); msg.ack(); @@ -137,12 +137,12 @@ public Message channel2ToChannel3(Message msg) { return Message.of("Processed" + msg.getPayload()); } - @Incoming("test-channel-7") + @Incoming("test-channel-31") @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public CompletionStage channel7(Message msg) { + public CompletionStage channel31(Message msg) { LOGGER.fine(() -> String.format("Received %s", msg.getPayload())); consumed().add(msg.getPayload()); - msg.ack().whenComplete((a, b) -> countDown("channel7()")); + msg.ack().whenComplete((a, b) -> countDown("channel31()")); return CompletableFuture.completedFuture(null); } } @@ -151,7 +151,7 @@ public CompletionStage channel7(Message msg) { public static class ChannelError extends AbstractSampleBean { @Incoming("test-channel-error") @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public CompletionStage error(Message msg) { + public CompletionStage error(Message msg) { try { LOGGER.fine(() -> String.format("Received possible error %s", msg.getPayload())); consumed().add(Integer.toString(Integer.parseInt(msg.getPayload()))); @@ -168,7 +168,7 @@ public static class ChannelSelector extends AbstractSampleBean { @Incoming("test-channel-selector") @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public CompletionStage selector(Message msg) { + public CompletionStage selector(Message msg) { LOGGER.fine(() -> String.format("Received %s", msg.getPayload())); consumed().add(msg.getPayload()); msg.ack(); @@ -218,38 +218,41 @@ public void onComplete() { public static class Channel5 extends AbstractSampleBean { @Incoming("test-channel-5") - public SubscriberBuilder, Void> channel5() { - LOGGER.fine(() -> "In channel5"); - return ReactiveStreams.>builder() - .to(new Subscriber>() { - @Override - public void onSubscribe(Subscription subscription) { - LOGGER.fine(() -> "channel5 onSubscribe()"); - subscription.request(3); - } + public void channel5(String msg) { + this.consumed().add(String.valueOf(Integer.parseInt(msg))); + countDown("channel5(String msg)"); + } + } - @Override - public void onNext(Message msg) { - consumed().add(Integer.toString(Integer.parseInt(msg.getPayload()))); - LOGGER.fine(() -> "Added " + msg.getPayload()); - msg.ack(); - countDown("onNext()"); - } + @ApplicationScoped + public static class Channel6 extends AbstractSampleBean { - @Override - public void onError(Throwable t) { - LOGGER.fine(() -> "Error " + t.getMessage() + ". Adding error in consumed() list"); - consumed().add("error"); - countDown("onError()"); - } + @Incoming("test-channel-6") + public void channel6(String msg) { + this.consumed().add(String.valueOf(Integer.parseInt(msg))); + countDown("channel6(String msg)"); + } + } - @Override - public void onComplete() { - consumed().add("complete"); - countDown("onComplete()"); - } - }); + @ApplicationScoped + public static class Channel7 extends AbstractSampleBean { + + @Incoming("test-channel-7") + @Outgoing("test-channel-71") + public Integer channel7(String msg) { + return Integer.parseInt(msg); } + + @Incoming("test-channel-71") + public SubscriberBuilder sink(){ + return ReactiveStreams.builder() + .map(String::valueOf) + .onErrorResume(t -> "error") + .peek(s -> this.consumed().add(s)) + .forEach(s -> countDown("channel7(String msg)")); + } + + } @ApplicationScoped diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AssertingHandler.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AssertingHandler.java new file mode 100644 index 00000000000..8692f29c5ce --- /dev/null +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/AssertingHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.messaging.connectors.jms; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Handler; +import java.util.logging.LogRecord; + +import org.hamcrest.Matchers; + +import static org.hamcrest.MatcherAssert.assertThat; + +public class AssertingHandler extends Handler { + + private List recordList = new ArrayList<>(); + + @Override + public void publish(LogRecord record) { + recordList.add(record.getMessage()); + } + + @Override + public void flush() { + + } + + @Override + public void close() throws SecurityException { + + } + + void assertLogMessageLogged(String message) { + assertThat(recordList, Matchers.hasItem(message)); + } +} diff --git a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/JmsMpTest.java b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/JmsMpTest.java index 9b340aa53f9..5e4fb017ffc 100644 --- a/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/JmsMpTest.java +++ b/tests/integration/jms/src/test/java/io/helidon/messaging/connectors/jms/JmsMpTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,11 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.logging.LogManager; +import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import io.helidon.microprofile.config.ConfigCdiExtension; import io.helidon.microprofile.messaging.MessagingCdiExtension; @@ -38,9 +41,13 @@ import jakarta.enterprise.inject.spi.CDI; import jakarta.jms.JMSException; import jakarta.jms.TextMessage; -import org.junit.jupiter.api.Disabled; + +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + @HelidonTest @DisableDiscovery @AddBeans({ @@ -48,6 +55,8 @@ @AddBean(AbstractSampleBean.Channel1.class), @AddBean(AbstractSampleBean.Channel4.class), @AddBean(AbstractSampleBean.Channel5.class), + @AddBean(AbstractSampleBean.Channel6.class), + @AddBean(AbstractSampleBean.Channel7.class), @AddBean(AbstractSampleBean.ChannelSelector.class), @AddBean(AbstractSampleBean.ChannelError.class), @AddBean(AbstractSampleBean.ChannelProcessor.class), @@ -62,7 +71,7 @@ }) @AddConfigs({ @AddConfig(key = "mp.messaging.connector.helidon-jms.jndi.env-properties.java.naming.provider.url", - value = "vm://localhost?broker.persistent=false"), + value = AbstractJmsTest.BROKER_URL), @AddConfig(key = "mp.messaging.connector.helidon-jms.jndi.env-properties.java.naming.factory.initial", value = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"), @@ -78,9 +87,9 @@ @AddConfig(key = "mp.messaging.outgoing.test-channel-3.type", value = "topic"), @AddConfig(key = "mp.messaging.outgoing.test-channel-3.destination", value = JmsMpTest.TEST_TOPIC_3), - @AddConfig(key = "mp.messaging.incoming.test-channel-7.connector", value = JmsConnector.CONNECTOR_NAME), - @AddConfig(key = "mp.messaging.incoming.test-channel-7.type", value = "topic"), - @AddConfig(key = "mp.messaging.incoming.test-channel-7.destination", value = JmsMpTest.TEST_TOPIC_3), + @AddConfig(key = "mp.messaging.incoming.test-channel-31.connector", value = JmsConnector.CONNECTOR_NAME), + @AddConfig(key = "mp.messaging.incoming.test-channel-31.type", value = "topic"), + @AddConfig(key = "mp.messaging.incoming.test-channel-31.destination", value = JmsMpTest.TEST_TOPIC_3), @AddConfig(key = "mp.messaging.incoming.test-channel-error.connector", value = JmsConnector.CONNECTOR_NAME), @AddConfig(key = "mp.messaging.incoming.test-channel-error.type", value = "topic"), @@ -91,14 +100,24 @@ @AddConfig(key = "mp.messaging.incoming.test-channel-4.destination", value = JmsMpTest.TEST_TOPIC_4), @AddConfig(key = "mp.messaging.incoming.test-channel-5.connector", value = JmsConnector.CONNECTOR_NAME), - @AddConfig(key = "mp.messaging.incoming.test-channel-5.type", value = "topic"), - @AddConfig(key = "mp.messaging.incoming.test-channel-5.destination", value = JmsMpTest.TEST_TOPIC_5), + @AddConfig(key = "mp.messaging.incoming.test-channel-5.type", value = "queue"), + @AddConfig(key = "mp.messaging.incoming.test-channel-5.destination", value = JmsMpTest.TEST_QUEUE_5), + @AddConfig(key = "mp.messaging.incoming.test-channel-5.nack-dlq", value = JmsMpTest.DLQ_QUEUE), + + @AddConfig(key = "mp.messaging.incoming.test-channel-6.connector", value = JmsConnector.CONNECTOR_NAME), + @AddConfig(key = "mp.messaging.incoming.test-channel-6.type", value = "queue"), + @AddConfig(key = "mp.messaging.incoming.test-channel-6.destination", value = JmsMpTest.TEST_QUEUE_6), + @AddConfig(key = "mp.messaging.incoming.test-channel-6.nack-log-only", value = "true"), + + @AddConfig(key = "mp.messaging.incoming.test-channel-7.connector", value = JmsConnector.CONNECTOR_NAME), + @AddConfig(key = "mp.messaging.incoming.test-channel-7.type", value = "queue"), + @AddConfig(key = "mp.messaging.incoming.test-channel-7.destination", value = JmsMpTest.TEST_QUEUE_7), @AddConfig(key = "mp.messaging.incoming.test-channel-selector.connector", value = JmsConnector.CONNECTOR_NAME), @AddConfig(key = "mp.messaging.incoming.test-channel-selector.message-selector", value = "source IN ('helidon','voyager')"), @AddConfig(key = "mp.messaging.incoming.test-channel-selector.type", value = "topic"), - @AddConfig(key = "mp.messaging.incoming.test-channel-selector.destination", value = JmsMpTest.TEST_TOPIC_6), + @AddConfig(key = "mp.messaging.incoming.test-channel-selector.destination", value = JmsMpTest.TEST_TOPIC_SELECTOR), @AddConfig(key = "mp.messaging.incoming.test-channel-bytes-fromJms.connector", value = JmsConnector.CONNECTOR_NAME), @AddConfig(key = "mp.messaging.incoming.test-channel-bytes-fromJms.type", value = "queue"), @@ -140,21 +159,23 @@ @AddConfig(key = "mp.messaging.outgoing.test-channel-derived-msg-toJms.type", value = "queue"), @AddConfig(key = "mp.messaging.outgoing.test-channel-derived-msg-toJms.destination", value = JmsMpTest.TEST_TOPIC_DERIVED_2), }) -@Disabled("3.0.0-JAKARTA") class JmsMpTest extends AbstractMPTest { static final String TEST_TOPIC_1 = "topic-1"; static final String TEST_TOPIC_2 = "topic-2"; static final String TEST_TOPIC_3 = "topic-3"; static final String TEST_TOPIC_4 = "topic-4"; - static final String TEST_TOPIC_5 = "topic-5"; - static final String TEST_TOPIC_6 = "topic-6"; + static final String TEST_QUEUE_5 = "queue-5"; + static final String TEST_QUEUE_6 = "queue-6"; + static final String TEST_TOPIC_SELECTOR = "topic-selector"; + static final String TEST_QUEUE_7 = "queue-7"; static final String TEST_TOPIC_BYTES = "topic-bytes"; static final String TEST_TOPIC_PROPS = "topic-properties"; static final String TEST_TOPIC_CUST_MAPPER = "topic-cust-mapper"; static final String TEST_TOPIC_DERIVED_1 = "topic-derived-1"; static final String TEST_TOPIC_DERIVED_2 = "topic-derived-2"; static final String TEST_TOPIC_ERROR = "topic-error"; + static final String DLQ_QUEUE = "DLQ_TOPIC"; @Test void messageSelector() { @@ -166,7 +187,7 @@ void messageSelector() { "reliant"); //configured selector: source IN ('helidon','voyager') AbstractSampleBean bean = CDI.current().select(AbstractSampleBean.ChannelSelector.class).get(); - produceAndCheck(bean, testData, TEST_TOPIC_6, List.of("helidon", "voyager"), this::setSourceProperty); + produceAndCheck(bean, testData, TEST_TOPIC_SELECTOR, List.of("helidon", "voyager"), this::setSourceProperty); } private void setSourceProperty(TextMessage m) { @@ -217,13 +238,60 @@ void withBackPressure() { } @Test - void withBackPressureAndError() { + void withBackPressureAndNackKillChannel() { List testData = Arrays.asList("2222", "2222"); - AbstractSampleBean bean = CDI.current().select(AbstractSampleBean.Channel5.class).get(); - produceAndCheck(bean, testData, TEST_TOPIC_5, testData); + AbstractSampleBean bean = CDI.current().select(AbstractSampleBean.Channel7.class).get(); + produceAndCheck(bean, testData, TEST_QUEUE_7, testData); bean.restart(); - testData = Collections.singletonList("not a number"); - produceAndCheck(bean, testData, TEST_TOPIC_5, Collections.singletonList("error")); + produceAndCheck(bean, List.of("not a number"), TEST_QUEUE_7, Collections.singletonList("error")); + } + + @Test + void noAckDQL() throws JMSException { + List expected = List.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); + List testData = List.of("0", "1", "2", "3", "4", "5", "not a number!", "6", "7", "8", "9", "10"); + + + AbstractSampleBean.Channel5 channel5 = CDI.current().select(AbstractSampleBean.Channel5.class).get(); + produceAndCheck(channel5, testData, TEST_QUEUE_5, expected); + + List dlq = consumeAllCurrent(DLQ_QUEUE) + .map(TextMessage.class::cast) + .toList(); + + assertThat(dlq.stream() + .map(tm -> { + try { + return tm.getText(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + }) + .toList(), Matchers.contains("not a number!")); + TextMessage textMessage = dlq.get(0); + assertThat(textMessage.getStringProperty("dlq-error"), is("java.lang.NumberFormatException")); + assertThat(textMessage.getStringProperty("dlq-error-msg"), is("For input string: \"not a number!\"")); + assertThat(textMessage.getStringProperty("dlq-orig-destination"), is(JmsMpTest.TEST_QUEUE_5)); + } + + @Test + void noAckLogOnly() throws JMSException { + Logger nackHandlerLogger = LogManager.getLogManager().getLogger(JmsNackHandler.class.getName()); + AssertingHandler assertingHandler = new AssertingHandler(); + nackHandlerLogger.addHandler(assertingHandler); + try { + List expected = List.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"); + List testData = expected.stream() + .flatMap(s -> "5".equals(s) ? Stream.of(s, "not a number!") : Stream.of(s)) + .toList(); + + + AbstractSampleBean.Channel6 channel6 = CDI.current().select(AbstractSampleBean.Channel6.class).get(); + produceAndCheck(channel6, testData, TEST_QUEUE_6, expected); + assertingHandler.assertLogMessageLogged("NACKED Message ignored\nchannel: test-channel-6\n"); + } finally { + nackHandlerLogger.removeHandler(assertingHandler); + } } @Test diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java index 757347bc5aa..4d07d6fd69a 100644 --- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java +++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java @@ -85,7 +85,7 @@ public class KafkaSeTest extends AbstractKafkaTest { private static final String TEST_SE_TOPIC_9 = "special-se-topic-9"; private static final String TEST_SE_TOPIC_PATTERN_34 = "special-se-topic-[3-4]"; - static Logger nackHandlerLogLogger = Logger.getLogger(NackHandler.Log.class.getName()); + static Logger nackHandlerLogLogger = Logger.getLogger(KafkaNackHandler.Log.class.getName()); private static final List logNackHandlerWarnings = new ArrayList<>(1);