Skip to content

Commit

Permalink
- Fix parameter ReceiverOptions parameters usaged (#1670)
Browse files Browse the repository at this point in the history
- Change powermock parameters
  • Loading branch information
ehoner committed Jun 14, 2023
1 parent 9111bb7 commit 03d70a5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,10 @@ private synchronized void initializeEventHubsManagers() {
// If no such offset exists Eventhub will return an error.
ReceiverOptions receiverOptions = new ReceiverOptions();
receiverOptions.setPrefetchCount(prefetchCount);
EventPosition position = EventPosition.fromOffset(offset, /* inclusiveFlag */false);
receiver = eventHubClientManager.getEventHubClient()
.createReceiver(consumerGroup, partitionId.toString(),
EventPosition.fromOffset(offset, /* inclusiveFlag */false)).get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
.createReceiver(consumerGroup, partitionId.toString(), position, receiverOptions)
.get(DEFAULT_EVENTHUB_CREATE_RECEIVER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
}

PartitionReceiveHandler handler =
Expand Down Expand Up @@ -378,9 +379,9 @@ private void renewPartitionReceiver(SystemStreamPartition ssp) {
// Recreate receiver
ReceiverOptions receiverOptions = new ReceiverOptions();
receiverOptions.setPrefetchCount(prefetchCount);
EventPosition position = EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM));
PartitionReceiver receiver = eventHubClientManager.getEventHubClient()
.createReceiverSync(consumerGroup, partitionId.toString(),
EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM)), receiverOptions);
.createReceiverSync(consumerGroup, partitionId.toString(), position, receiverOptions);

// Timeout for EventHubClient receive
receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private class MockEventHubClientManager implements EventHubClientManager {
return CompletableFuture.completedFuture(mockPartitionReceiver);
});

PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject(), anyObject()))
.then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
String partitionId = invocationOnMock.getArgumentAt(1, String.class);
EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
Expand Down

0 comments on commit 03d70a5

Please sign in to comment.