Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
#97 fixed failover configuration, fixed deleting a closed connection
Signed-off-by: Johannes Schneider <johannes.schneider@bosch-si.com>
  • Loading branch information
Johannes Schneider committed Jan 15, 2018
1 parent 1f3c579 commit 1228e61
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 16 deletions.
Expand Up @@ -67,10 +67,21 @@ private Context createContext(final AmqpConnection amqpConnection) throws Naming
final String protocol = amqpConnection.getProtocol();
final String hostname = amqpConnection.getHostname();
final int port = amqpConnection.getPort();
final boolean failoverEnabled = amqpConnection.isFailoverEnabled();

final String uri = formatUri(protocol, hostname, port);
final String uriWithFailoverEnabled = amqpConnection.isFailoverEnabled() ? wrapWithFailOver(uri) : uri;
final String connectionUri = appendParameters("amqps".equals(protocol), uriWithFailoverEnabled, username, password);
final String uriWithAmqpParams = appendAmqpParameters(uri);
final String uriWithTransportParams = appendTransportParameters(uriWithAmqpParams);

final String connectionUri;
if (failoverEnabled) {
final String uriWrappedWithFailover = wrapWithFailOver(uriWithTransportParams);
final String uriWithJmsParams = appendJmsParametersOverall(uriWrappedWithFailover, username, password);
connectionUri = appendFailoverParameters(uriWithJmsParams);
} else {
final String uriWithJmsParams = appendJmsParameters(uriWithTransportParams, username, password);
connectionUri = appendFailoverParameters(uriWithJmsParams);
}

@SuppressWarnings("squid:S1149") final Hashtable<Object, Object> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Expand All @@ -84,18 +95,41 @@ private static String formatUri(final String protocol, final String hostname, fi
return MessageFormat.format(pattern, protocol, hostname, Integer.toString(port));
}

private static String appendParameters(final boolean useSsl, final String uri, final String username, final String password) {
@SuppressWarnings("squid:S2068")
private static String appendJmsParametersOverall(final String uri, final String username, final String password) {
final String pattern = "{0}" +
"?jms.username={1}" +
"&jms.password={2}";
return MessageFormat.format(pattern, uri, username, password);
}

@SuppressWarnings("squid:S2068") final String pattern;
if (useSsl) {
pattern = "{0}?jms.username={1}&jms.password={2}" +
"&transport.trustAll=true&transport.verifyHost=false&amqp.saslMechanisms=PLAIN";
} else {
pattern = "{0}?jms.username={1}&jms.password={2}&amqp.saslMechanisms=PLAIN";
}
@SuppressWarnings("squid:S2068")
private static String appendJmsParameters(final String uri, final String username, final String password) {
final String pattern = "{0}" +
"&jms.username={1}" +
"&jms.password={2}";
return MessageFormat.format(pattern, uri, username, password);
}

private static String appendAmqpParameters(final String uri) {
return uri + "?amqp.saslMechanisms=PLAIN";
}

private static String appendTransportParameters(final String uri) {
return uri +
"&transport.trustAll=true" +
"&transport.verifyHost=false";
}

private static String appendFailoverParameters(final String uri) {
return uri +
"&initialReconnectDelay=10s" +
"&reconnectDelay=1s" +
"&maxReconnectDelay=1h" +
"&useReconnectBackOff=true" +
"&reconnectBackOffMultiplier=1m";
}

private static String wrapWithFailOver(final String uri) {
return MessageFormat.format("failover:({0})", uri);
}
Expand Down
Expand Up @@ -334,7 +334,8 @@ private boolean startCommandConsumersWithErrorHandling(final String action) {
.description(e.getMessage())
.build();
getSender().tell(error, getSelf());
log.error(e, "Failed to <{}> Connection <{}> with Error: <{}>.", action, amqpConnection.getId(), e.getMessage());
log.error(e, "Failed to <{}> Connection <{}> with Error: <{}>.", action, amqpConnection.getId(),
e.getMessage());
return false;
}
}
Expand All @@ -348,7 +349,8 @@ private boolean stopCommandConsumersWithErrorHandling(final String action) {
.description(e.getMessage())
.build();
getSender().tell(error, getSelf());
log.error(e, "Failed to <{}> Connection <{}> with Error: <{}>.", action, amqpConnection.getId(), e.getMessage());
log.error(e, "Failed to <{}> Connection <{}> with Error: <{}>.", action, amqpConnection.getId(),
e.getMessage());
return false;
}
}
Expand Down Expand Up @@ -430,12 +432,22 @@ private void startConnection() throws JMSException {

private void stopConnection() throws JMSException {
if (jmsSession != null) {
jmsSession.close();
try {
jmsSession.close();
jmsSession = null;
} catch (final JMSException e) {
log.debug("Session of connection '{}' already closed: {}", amqpConnection.getId(), e.getMessage());
}
}
if (jmsConnection != null) {
jmsConnection.stop();
jmsConnection.close();
log.info("Connection '{}' closed.", amqpConnection.getId());
try {
jmsConnection.stop();
jmsConnection.close();
jmsConnection = null;
log.info("Connection '{}' closed.", amqpConnection.getId());
} catch (final JMSException e) {
log.debug("Connection '{}' already closed: {}", amqpConnection.getId(), e.getMessage());
}
}
}

Expand Down

0 comments on commit 1228e61

Please sign in to comment.