Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ class BeamModulePlugin implements Plugin<Project> {
//
// There are a few versions are determined by the BOMs by running scripts/tools/bomupgrader.py
// marked as [bomupgrader]. See the documentation of that script for detail.
def activemq_version = "5.19.2"
def activemq_version = "6.2.5"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

ActiveMQ 6.x requires Java 17 as a minimum runtime version. Since Apache Beam still supports and runs tests on Java 8 and Java 11, upgrading activemq_version to 6.2.5 will cause test execution failures (such as UnsupportedClassVersionError) on Java 8 and Java 11 environments.

Please consider restricting the JmsIO tests to only run when the build JDK is 17 or higher, or verify how Java 8/11 compatibility will be maintained for the test suite.

def autovalue_version = "1.9"
def autoservice_version = "1.0.1"
def aws_java_sdk2_version = "2.20.162"
Expand Down Expand Up @@ -641,7 +641,7 @@ class BeamModulePlugin implements Plugin<Project> {
def protobuf_version = "4.33.2"
// TODO(https://github.com/apache/beam/issues/37637): Remove this once the Bom has been updated to at least reach this version
def bigtable_version = "2.73.1"
def qpid_jms_client_version = "0.61.0"
def qpid_jms_client_version = "2.10.0"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Qpid JMS Client 2.x requires Java 11 as a minimum runtime version. Since Apache Beam still supports Java 8, upgrading qpid_jms_client_version to 2.10.0 will break Java 8 compatibility for any users or tests utilizing this client. Please verify if this is intentional or if we need to restrict this dependency/module to Java 11+.

def quickcheck_version = "1.0"
def sbe_tool_version = "1.25.1"
def singlestore_jdbc_version = "1.1.4"
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/jms/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.geronimo.specs:geronimo-jms_2.0_spec:1.0-alpha-2"
implementation "jakarta.jms:jakarta.jms-api:3.1.0"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

jakarta.jms:jakarta.jms-api:3.1.0 (Jakarta Messaging 3.1) is compiled for Java 11 (class file version 55.0). Since Apache Beam still supports Java 8, using version 3.1.0 will break Java 8 compatibility for the beam-sdks-java-io-jms module.

To maintain Java 8 compatibility while migrating to the jakarta.jms namespace, please use jakarta.jms:jakarta.jms-api:3.0.0 (Jakarta Messaging 3.0), which is compiled for Java 8 (class file version 52.0).

  implementation "jakarta.jms:jakarta.jms-api:3.0.0"

testImplementation library.java.activemq_amqp
testImplementation library.java.activemq_broker
testImplementation library.java.activemq_jaas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
*/
package org.apache.beam.sdk.io.jms;

import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
Expand All @@ -36,15 +45,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read.Unbounded;
Expand Down Expand Up @@ -86,9 +86,9 @@
*
* <p>JmsIO source returns unbounded collection of JMS records as {@code PCollection<JmsRecord>}. A
* {@link JmsRecord} includes JMS headers and properties, along with the JMS {@link
* javax.jms.TextMessage} payload.
* jakarta.jms.TextMessage} payload.
*
* <p>To configure a JMS source, you have to provide a {@link javax.jms.ConnectionFactory} and the
* <p>To configure a JMS source, you have to provide a {@link jakarta.jms.ConnectionFactory} and the
* destination (queue or topic) where to consume. The following example illustrates various options
* for configuring the source:
*
Expand All @@ -102,8 +102,8 @@
*
* }</pre>
*
* <p>It is possible to read any type of JMS {@link javax.jms.Message} into a custom POJO using the
* following configuration:
* <p>It is possible to read any type of JMS {@link jakarta.jms.Message} into a custom POJO using
* the following configuration:
*
* <pre>{@code
* pipeline.apply(JmsIO.<T>readMessage()
Expand All @@ -121,8 +121,8 @@
* <h3>Writing to a JMS destination</h3>
*
* <p>JmsIO sink supports writing text messages to a JMS destination on a broker. To configure a JMS
* sink, you must specify a {@link javax.jms.ConnectionFactory} and a {@link javax.jms.Destination}
* name. For instance:
* sink, you must specify a {@link jakarta.jms.ConnectionFactory} and a {@link
* jakarta.jms.Destination} name. For instance:
*
* <pre>{@code
* pipeline
Expand Down Expand Up @@ -1053,7 +1053,7 @@ public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> to
}

/**
* Map the {@code EventT} object to a {@link javax.jms.Message}.
* Map the {@code EventT} object to a {@link jakarta.jms.Message}.
*
* <p>For instance:
*
Expand All @@ -1075,7 +1075,7 @@ public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> to
* .apply(JmsIO.write().withValueMapper(valueNapper)
* }</pre>
*
* @param valueMapper The function returning the {@link javax.jms.Message}
* @param valueMapper The function returning the {@link jakarta.jms.Message}
* @return The corresponding {@link JmsIO.Write}.
*/
public Write<EventT> withValueMapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.beam.sdk.io.jms;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import jakarta.jms.Destination;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import javax.jms.Destination;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.beam.sdk.transforms.SerializableBiFunction;

/**
* The TextMessageMapper takes a {@link String} value, a {@link javax.jms.Session} and returns a
* {@link javax.jms.TextMessage}.
* The TextMessageMapper takes a {@link String} value, a {@link jakarta.jms.Session} and returns a
* {@link jakarta.jms.TextMessage}.
*/
public class TextMessageMapper implements SerializableBiFunction<String, Session, Message> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.jms;

import jakarta.jms.BytesMessage;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
Expand Down Expand Up @@ -143,7 +143,7 @@ Class<? extends ConnectionFactory> getConnectionFactoryClass() {
return this.connectionFactoryClass;
}

/** A test class that maps a {@link javax.jms.BytesMessage} into a {@link String}. */
/** A test class that maps a {@link jakarta.jms.BytesMessage} into a {@link String}. */
public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper<String> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
Expand All @@ -32,13 +39,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.beam.sdk.PipelineResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
Expand All @@ -71,16 +81,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.Callback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
*/
package org.apache.beam.sdk.io.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;

public class MockNonSerializableConnectionFactory implements ConnectionFactory {
@Override
Expand Down
Loading