Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] 6037 AQ connector @ConnectorAttribute #6038

Merged
merged 2 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,12 +26,113 @@
import io.helidon.common.Builder;
import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.messaging.connectors.jms.JmsConnector;

import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;

/**
* Reactive Messaging Oracle AQ connector.
*/
@ConnectorAttribute(name = AqConnector.DATASOURCE_ATTRIBUTE,
description = "name of the datasource bean used to connect Oracle DB with AQ",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = AqConnector.URL_ATTRIBUTE,
description = "jdbc connection string used to connect Oracle DB with AQ (forbidden when datasource is specified)",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.USERNAME_ATTRIBUTE,
description = "User name used to connect JMS session",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.PASSWORD_ATTRIBUTE,
description = "Password to connect JMS session",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.TYPE_ATTRIBUTE,
description = "Possible values are: queue, topic",
defaultValue = "queue",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.DESTINATION_ATTRIBUTE,
description = "Queue or topic name",
mandatory = true,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.ACK_MODE_ATTRIBUTE,
description = "Possible values are: "
+ "AUTO_ACKNOWLEDGE- session automatically acknowledges a client’s receipt of a message, "
+ "CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually, "
+ "DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.",
defaultValue = "AUTO_ACKNOWLEDGE",
direction = ConnectorAttribute.Direction.INCOMING,
type = "io.helidon.messaging.connectors.jms.AcknowledgeMode")
@ConnectorAttribute(name = JmsConnector.TRANSACTED_ATTRIBUTE,
description = "Indicates whether the session will use a local transaction.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.AWAIT_ACK_ATTRIBUTE,
description = "Wait for the acknowledgement of previous message before pulling next one.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.MESSAGE_SELECTOR_ATTRIBUTE,
description = "JMS API message selector expression based on a subset of the SQL92. "
+ "Expression can only access headers and properties, not the payload.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING,
type = "string")
@ConnectorAttribute(name = JmsConnector.CLIENT_ID_ATTRIBUTE,
description = "Client identifier for JMS connection.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.DURABLE_ATTRIBUTE,
description = "True for creating durable consumer (only for topic).",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.SUBSCRIBER_NAME_ATTRIBUTE,
description = "Subscriber name for durable consumer used to identify subscription.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING,
type = "string")
@ConnectorAttribute(name = JmsConnector.NON_LOCAL_ATTRIBUTE,
description = "If true then any messages published to the topic using this session’s connection, "
+ "or any other connection with the same client identifier, "
+ "will not be added to the durable subscription.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.NAMED_FACTORY_ATTRIBUTE,
description = "Select in case factory is injected as a named bean or configured with name.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
@ConnectorAttribute(name = JmsConnector.POLL_TIMEOUT_ATTRIBUTE,
description = "Timeout for polling for next message in every poll cycle in millis. Default value: 50",
mandatory = false,
defaultValue = "50",
direction = ConnectorAttribute.Direction.INCOMING,
type = "long")
@ConnectorAttribute(name = JmsConnector.PERIOD_EXECUTIONS_ATTRIBUTE,
description = "Period for executing poll cycles in millis.",
mandatory = false,
defaultValue = "100",
direction = ConnectorAttribute.Direction.INCOMING,
type = "long")
@ConnectorAttribute(name = JmsConnector.SESSION_GROUP_ID_ATTRIBUTE,
description = "When multiple channels share same session-group-id, "
+ "they share same JMS session and same JDBC connection as well.",
mandatory = false,
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "string")
public interface AqConnector extends ConnectorFactory {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.AWAIT_ACK_ATTRIBUTE,
description = "Wait for the acknowledgement of previous message before pulling next one.",
mandatory = false,
defaultValue = "false",
direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING,
type = "boolean")
@ConnectorAttribute(name = JmsConnector.MESSAGE_SELECTOR_ATTRIBUTE,
description = "JMS API message selector expression based on a subset of the SQL92. "
+ "Expression can only access headers and properties, not the payload.",
Expand Down Expand Up @@ -186,43 +192,93 @@ public class JmsConnector implements IncomingConnectorFactory, OutgoingConnector
/**
* Select in case factory is injected as a named bean or configured with name.
*/
protected static final String NAMED_FACTORY_ATTRIBUTE = "named-factory";
public static final String NAMED_FACTORY_ATTRIBUTE = "named-factory";

/**
* User name used with ConnectionFactory.
* Username used with ConnectionFactory.
*/
protected static final String USERNAME_ATTRIBUTE = "username";
public static final String USERNAME_ATTRIBUTE = "username";

/**
* Password used with ConnectionFactory.
*/
protected static final String PASSWORD_ATTRIBUTE = "password";
public static final String PASSWORD_ATTRIBUTE = "password";

/**
* Client identifier for JMS connection.
*/
protected static final String CLIENT_ID_ATTRIBUTE = "client-id";
public static final String CLIENT_ID_ATTRIBUTE = "client-id";

/**
* True for creating durable consumer (only for topic).
*/
protected static final String DURABLE_ATTRIBUTE = "durable";
public static final String DURABLE_ATTRIBUTE = "durable";

/**
* Subscriber name for durable consumer used to identify subscription.
*/
protected static final String SUBSCRIBER_NAME_ATTRIBUTE = "subscriber-name";
public static final String SUBSCRIBER_NAME_ATTRIBUTE = "subscriber-name";

/**
* If true then any messages published to the topic using this session's connection,
* or any other connection with the same client identifier,
* will not be added to the durable subscription.
*/
protected static final String NON_LOCAL_ATTRIBUTE = "non-local";

static final String ACK_MODE_ATTRIBUTE = "acknowledge-mode";
static final String TRANSACTED_ATTRIBUTE = "transacted";
static final String AWAIT_ACK_ATTRIBUTE = "await-ack";
static final String MESSAGE_SELECTOR_ATTRIBUTE = "message-selector";
static final String POLL_TIMEOUT_ATTRIBUTE = "poll-timeout";
static final String PERIOD_EXECUTIONS_ATTRIBUTE = "period-executions";
static final String TYPE_ATTRIBUTE = "type";
static final String DESTINATION_ATTRIBUTE = "destination";
static final String SESSION_GROUP_ID_ATTRIBUTE = "session-group-id";
public static final String NON_LOCAL_ATTRIBUTE = "non-local";

/**
* JMS acknowledge mode.
* <p>
* Possible values are:
* </p>
* <ul>
* <li>AUTO_ACKNOWLEDGE - session automatically acknowledges a client’s receipt of a message,
* <li>CLIENT_ACKNOWLEDGE - receipt of a message is acknowledged only when Message.ack() is called manually,
* <li>DUPS_OK_ACKNOWLEDGE - session lazily acknowledges the delivery of messages.
* </ul>
*/
public static final String ACK_MODE_ATTRIBUTE = "acknowledge-mode";

/**
* Indicates whether the session will use a local transaction.
*/
public static final String TRANSACTED_ATTRIBUTE = "transacted";

/**
* Wait for the acknowledgement of previous message before pulling next one.
*/
public static final String AWAIT_ACK_ATTRIBUTE = "await-ack";

/**
* JMS API message selector expression based on a subset of the SQL92.
* Expression can only access headers and properties, not the payload.
*/
public static final String MESSAGE_SELECTOR_ATTRIBUTE = "message-selector";

/**
* Timeout for polling for next message in every poll cycle in millis.
*/
public static final String POLL_TIMEOUT_ATTRIBUTE = "poll-timeout";

/**
* Period for executing poll cycles in millis.
*/
public static final String PERIOD_EXECUTIONS_ATTRIBUTE = "period-executions";

/**
* Possible values are: queue, topic.
*/
public static final String TYPE_ATTRIBUTE = "type";

/**
* Queue or topic name.
*/
public static final String DESTINATION_ATTRIBUTE = "destination";

/**
* When multiple channels share same session-group-id, they share same JMS session and same JDBC connection as well.
*/
public static final String SESSION_GROUP_ID_ATTRIBUTE = "session-group-id";
static final String JNDI_ATTRIBUTE = "jndi";
static final String JNDI_PROPS_ATTRIBUTE = "env-properties";
static final String JNDI_JMS_FACTORY_ATTRIBUTE = "jms-factory";
Expand Down