Skip to content

Commit

Permalink
FLUME-3237: Handling RuntimeExceptions coming from the JMS provider i…
Browse files Browse the repository at this point in the history
…n JMSSource

Handling RuntimeExceptions in the same way as JMSExceptions in order to trigger
the reconnecting mechanism in JMSSource.

This closes #210

Reviewers: Endre Major, Ferenc Szabo

(Peter Turcsanyi via Ferenc Szabo)
  • Loading branch information
szaboferee committed Jul 11, 2018
1 parent 719afe9 commit a76e2e9
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ class JMSMessageConsumer {
List<Event> take() throws JMSException {
List<Event> result = new ArrayList<Event>(batchSize);
Message message;
message = messageConsumer.receive(pollTimeout);
message = receive();
if (message != null) {
result.addAll(messageConverter.convert(message));
int max = batchSize - 1;
for (int i = 0; i < max; i++) {
message = messageConsumer.receiveNoWait();
message = receiveNoWait();
if (message == null) {
break;
}
Expand All @@ -147,11 +147,35 @@ List<Event> take() throws JMSException {
return result;
}

private Message receive() throws JMSException {
try {
return messageConsumer.receive(pollTimeout);
} catch (RuntimeException runtimeException) {
JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: "
+ runtimeException.getMessage());
jmsException.setLinkedException(runtimeException);
throw jmsException;
}
}

private Message receiveNoWait() throws JMSException {
try {
return messageConsumer.receiveNoWait();
} catch (RuntimeException runtimeException) {
JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: "
+ runtimeException.getMessage());
jmsException.setLinkedException(runtimeException);
throw jmsException;
}
}

void commit() {
try {
session.commit();
} catch (JMSException jmsException) {
logger.warn("JMS Exception processing commit", jmsException);
} catch (RuntimeException runtimeException) {
logger.warn("Runtime Exception processing commit", runtimeException);
}
}

Expand All @@ -160,6 +184,8 @@ void rollback() {
session.rollback();
} catch (JMSException jmsException) {
logger.warn("JMS Exception processing rollback", jmsException);
} catch (RuntimeException runtimeException) {
logger.warn("Runtime Exception processing rollback", runtimeException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,68 @@ public void testCreateDurableSubscription() throws Exception {
verify(session, times(1)).createDurableSubscriber(topic, name, messageSelector, true);
}

@Test(expected = JMSException.class)
public void testTakeFailsDueToJMSExceptionFromReceive() throws JMSException {
when(messageConsumer.receive(anyLong())).thenThrow(new JMSException(""));
consumer = create();

consumer.take();
}

@Test(expected = JMSException.class)
public void testTakeFailsDueToRuntimeExceptionFromReceive() throws JMSException {
when(messageConsumer.receive(anyLong())).thenThrow(new RuntimeException());
consumer = create();

consumer.take();
}

@Test(expected = JMSException.class)
public void testTakeFailsDueToJMSExceptionFromReceiveNoWait() throws JMSException {
when(messageConsumer.receiveNoWait()).thenThrow(new JMSException(""));
consumer = create();

consumer.take();
}

@Test(expected = JMSException.class)
public void testTakeFailsDueToRuntimeExceptionFromReceiveNoWait() throws JMSException {
when(messageConsumer.receiveNoWait()).thenThrow(new RuntimeException());
consumer = create();

consumer.take();
}

@Test
public void testCommitFailsDueToJMSException() throws JMSException {
doThrow(new JMSException("")).when(session).commit();
consumer = create();

consumer.commit();
}

@Test
public void testCommitFailsDueToRuntimeException() throws JMSException {
doThrow(new RuntimeException()).when(session).commit();
consumer = create();

consumer.commit();
}

@Test
public void testRollbackFailsDueToJMSException() throws JMSException {
doThrow(new JMSException("")).when(session).rollback();
consumer = create();

consumer.rollback();
}

@Test
public void testRollbackFailsDueToRuntimeException() throws JMSException {
doThrow(new RuntimeException()).when(session).rollback();
consumer = create();

consumer.rollback();
}

}

0 comments on commit a76e2e9

Please sign in to comment.