Skip to content

Commit

Permalink
Fix ExceptionContract when CreateLink fails in CBSlink.sendToken step (
Browse files Browse the repository at this point in the history
  • Loading branch information
SreeramGarlapati committed Oct 5, 2017
1 parent 95f7da9 commit 4e5b2a3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
import java.util.Locale;
import java.util.Map;


import com.microsoft.azure.eventhubs.amqp.AmqpException;
import com.microsoft.azure.eventhubs.amqp.DispatchHandler;
import com.microsoft.azure.eventhubs.amqp.IAmqpReceiver;
import com.microsoft.azure.eventhubs.amqp.IOperationResult;
import com.microsoft.azure.eventhubs.amqp.ReceiveLinkHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,11 +45,6 @@
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

import com.microsoft.azure.eventhubs.amqp.DispatchHandler;
import com.microsoft.azure.eventhubs.amqp.IAmqpReceiver;
import com.microsoft.azure.eventhubs.amqp.IOperationResult;
import com.microsoft.azure.eventhubs.amqp.ReceiveLinkHandler;

/**
* Common Receiver that abstracts all amqp related details
* translates event-driven reactor model into async receive Api
Expand Down Expand Up @@ -491,7 +493,18 @@ public void onComplete(Void result) {

@Override
public void onError(Exception error) {
MessageReceiver.this.onError(error);
final Exception completionException;
if (error!= null && error instanceof AmqpException) {
completionException = ExceptionUtil.toException(((AmqpException) error).getError());
if (completionException != error && completionException.getCause() == null) {
completionException.initCause(error);
}
}
else {
completionException = error;
}

MessageReceiver.this.onError(completionException);
}
});
} catch (IOException | NoSuchAlgorithmException | InvalidKeyException | RuntimeException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import com.microsoft.azure.eventhubs.amqp.AmqpConstants;
import com.microsoft.azure.eventhubs.amqp.AmqpException;
import com.microsoft.azure.eventhubs.amqp.AmqpUtil;
import com.microsoft.azure.eventhubs.amqp.DispatchHandler;
import com.microsoft.azure.eventhubs.amqp.IAmqpSender;
import com.microsoft.azure.eventhubs.amqp.IOperationResult;
import com.microsoft.azure.eventhubs.amqp.SendLinkHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,13 +56,6 @@
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;

import com.microsoft.azure.eventhubs.amqp.AmqpConstants;
import com.microsoft.azure.eventhubs.amqp.AmqpUtil;
import com.microsoft.azure.eventhubs.amqp.DispatchHandler;
import com.microsoft.azure.eventhubs.amqp.IAmqpSender;
import com.microsoft.azure.eventhubs.amqp.IOperationResult;
import com.microsoft.azure.eventhubs.amqp.SendLinkHandler;

/**
* Abstracts all amqp related details
* translates event-driven reactor model into async send Api
Expand Down Expand Up @@ -599,7 +600,18 @@ public void onComplete(Void result) {

@Override
public void onError(Exception error) {
MessageSender.this.onError(error);
final Exception completionException;
if (error!= null && error instanceof AmqpException) {
completionException = ExceptionUtil.toException(((AmqpException) error).getError());
if (completionException != error && completionException.getCause() == null) {
completionException.initCause(error);
}
}
else {
completionException = error;
}

MessageSender.this.onError(completionException);
}
});
} catch (IOException | NoSuchAlgorithmException | InvalidKeyException | RuntimeException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.microsoft.azure.eventhubs.exceptioncontracts;

import java.time.Duration;
import java.util.UUID;

import com.microsoft.azure.eventhubs.*;
import org.junit.After;
Expand Down Expand Up @@ -136,6 +137,34 @@ public void testUnAuthorizedAccessReceiverCreation() throws Throwable
ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString());
ehClient.createReceiverSync(TestContext.getConsumerGroupName(), PARTITION_ID, PartitionReceiver.START_OF_STREAM);
}

@Test (expected = IllegalEntityException.class)
public void testSendToNonExistantEventHub() throws Throwable
{
final ConnectionStringBuilder correctConnectionString = TestContext.getConnectionString();
final ConnectionStringBuilder connectionString = new ConnectionStringBuilder(
correctConnectionString.getEndpoint(),
"non-existant-entity" + UUID.randomUUID().toString(),
correctConnectionString.getSasKeyName(),
correctConnectionString.getSasKey());

ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString());
ehClient.sendSync(new EventData("test string".getBytes()));
}

@Test (expected = IllegalEntityException.class)
public void testReceiveFromNonExistantEventHub() throws Throwable
{
final ConnectionStringBuilder correctConnectionString = TestContext.getConnectionString();
final ConnectionStringBuilder connectionString = new ConnectionStringBuilder(
correctConnectionString.getEndpoint(),
"non-existant-entity" + UUID.randomUUID().toString(),
correctConnectionString.getSasKeyName(),
correctConnectionString.getSasKey());

ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString());
ehClient.createReceiverSync(TestContext.getConsumerGroupName(), PARTITION_ID, PartitionReceiver.START_OF_STREAM);
}

@After
public void cleanup() throws EventHubException
Expand Down

0 comments on commit 4e5b2a3

Please sign in to comment.