Skip to content

Commit

Permalink
Merge pull request #353 from IBMStreams/develop
Browse files Browse the repository at this point in the history
Merge latest develop changes into master
  • Loading branch information
schubon committed Mar 12, 2019
2 parents de8a3b5 + f465922 commit dbca707
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,14 @@ private void convertProviderURLPath(File applicationDir) throws ParseConnectionD

// subroutine to parse and validate the connection document
// called by both the JMSSink and JMSSource
public void parseAndValidateConnectionDocument(String connectionDocument, String connection, String access,
StreamSchema streamSchema, boolean isProducer, File applicationDir) throws ParseConnectionDocumentException, SAXException,
IOException, ParserConfigurationException {
public void parseAndValidateConnectionDocument( String connectionDocument,
String connection,
String access,
StreamSchema streamSchema,
boolean isProducer,
File applicationDir)
throws ParseConnectionDocumentException, SAXException, IOException, ParserConfigurationException
{
// validate the connections document against the xsd
validateConnectionsXML(connectionDocument);
// create document builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class JMSConnectionHelper implements ExceptionListener {
// jndi context
private Context jndiContext = null;
// connection
private Connection connect = null;
private Connection connection = null;
// JMS message producer
private MessageProducer producer = null;
// JMS message consumer
Expand Down Expand Up @@ -133,8 +133,8 @@ private void setSessionCreationTime(long sessionCreationTime) {
}

// procedure to detrmine if there exists a valid connection or not
private boolean isConnectValid() {
if (connect != null)
private boolean isConnectionValid() {
if (connection != null)
return true;
return false;
}
Expand Down Expand Up @@ -173,13 +173,13 @@ private synchronized void setSession(Session session) {

// setter for connect
// connect is thread safe.Hence not synchronizing.
private void setConnect(Connection connect) {
this.connect = connect;
private void setConnection(Connection connection) {
this.connection = connection;
}

// getter for connect
private Connection getConnect() {
return connect;
private Connection getConnection() {
return connection;
}

// logger to get error messages
Expand Down Expand Up @@ -301,7 +301,7 @@ private synchronized void createConnection() throws ConnectionException, Interru
int nConnectionAttempts = 0;

// Check if connection exists or not.
if (!isConnectValid()) {
if (!isConnectionValid()) {

// Delay in miliseconds as specified in period parameter
final long delay = TimeUnit.MILLISECONDS.convert((long) period,
Expand Down Expand Up @@ -367,7 +367,7 @@ private synchronized void createConnectionNoRetry() throws ConnectionException {

tracer.log(TraceLevel.TRACE, "Begin createConnectionNoRetry()"); //$NON-NLS-1$

if (!isConnectValid()) {
if (!isConnectionValid()) {
try {
connect(isProducer);
} catch (JMSException e) {
Expand All @@ -389,22 +389,22 @@ private boolean connect(boolean isProducer) throws JMSException {
if (userPrincipal != null && !userPrincipal.isEmpty() &&
userCredential != null && !userCredential.isEmpty() ) {
tracer.log(TraceLevel.TRACE, "Create connection for user: " + userPrincipal); //$NON-NLS-1$
setConnect(connFactory.createConnection(userPrincipal, userCredential));
setConnection(connFactory.createConnection(userPrincipal, userCredential));
}
else {
tracer.log(TraceLevel.TRACE, "Create connection with empty credentials"); //$NON-NLS-1$
setConnect(connFactory.createConnection());
setConnection(connFactory.createConnection());
}
getConnect().setExceptionListener (this);
getConnection().setExceptionListener (this);

// Create session from connection; false means
// session is not transacted.

if(isProducer) {
setSession(getConnect().createSession(this.useClientAckMode, Session.AUTO_ACKNOWLEDGE));
setSession(getConnection().createSession(this.useClientAckMode, Session.AUTO_ACKNOWLEDGE));
}
else {
setSession(getConnect().createSession(false, (this.useClientAckMode) ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE));
setSession(getConnection().createSession(false, (this.useClientAckMode) ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE));
}


Expand All @@ -422,14 +422,15 @@ private boolean connect(boolean isProducer) throws JMSException {
getProducerCR().setDeliveryMode(DeliveryMode.PERSISTENT);
// start the connection
tracer.log (LogLevel.INFO, "going to start the connection for producer in client acknowledge mode ..."); //$NON-NLS-1$
getConnect().start();
getConnection().start();
}

// set the delivery mode if it is specified
// default is non-persistent
if (deliveryMode == null) {
getProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} else {
}
else {
if (deliveryMode.trim().toLowerCase().equals("non_persistent")) { //$NON-NLS-1$
getProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
Expand All @@ -439,12 +440,13 @@ private boolean connect(boolean isProducer) throws JMSException {
}
}

} else {
}
else {
// Its JMSSource, So we will create a consumer
setConsumer(getSession().createConsumer(dest, messageSelector));
// start the connection
tracer.log (LogLevel.INFO, "going to start consumer connection ..."); //$NON-NLS-1$
getConnect().start();
getConnection().start();
}
tracer.log (LogLevel.INFO, "connection successfully created"); //$NON-NLS-1$
tracer.log(TraceLevel.TRACE, "End connect()"); //$NON-NLS-1$
Expand Down Expand Up @@ -523,7 +525,7 @@ boolean sendMessage(Message message) throws ConnectionException, InterruptedExce

// Recreate the connection objects if we don't have any (this
// could happen after a connection failure)
setConnect(null);
setConnection(null);
createConnection();
}

Expand Down Expand Up @@ -559,7 +561,7 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce
}
// Recreate the connection objects if we don't have any (this
// could happen after a connection failure)
setConnect(null);
setConnection(null);
logger.log(LogLevel.WARN, "ERROR_DURING_RECEIVE", //$NON-NLS-1$
new Object[] { e.toString() });
logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); //$NON-NLS-1$
Expand Down Expand Up @@ -627,7 +629,7 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt

tracer.log(LogLevel.INFO, "attempting to reconnect"); //$NON-NLS-1$
logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); //$NON-NLS-1$
setConnect(null);
setConnection(null);
createConnection();

synchronized (getSession()) {
Expand Down Expand Up @@ -670,14 +672,14 @@ public void closeConnection() {
// ignore
}
}
if (getConnect() != null) {
if (getConnection() != null) {
try {
getConnect().close();
getConnection().close();
} catch (JMSException e) {
// ignore
}
finally {
setConnect(null);
setConnection(null);
}
}
tracer.log(TraceLevel.TRACE, "End closeConnection()"); //$NON-NLS-1$
Expand Down
Loading

0 comments on commit dbca707

Please sign in to comment.