Skip to content

Commit

Permalink
Event Hubs RBAC support (#4498)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesBirdsall authored and serkantkaraca committed Jul 23, 2019
1 parent b651d78 commit c876a6e
Show file tree
Hide file tree
Showing 69 changed files with 1,814 additions and 584 deletions.
10 changes: 8 additions & 2 deletions sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.1</version>
<version>3.0.0</version>
<relativePath>../pom.data.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>2.5.1</version>
<version>3.0.0</version>

<name>Microsoft Azure SDK for Event Hubs Event Processor Host(EPH)</name>
<description>EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer</description>
Expand Down Expand Up @@ -45,6 +45,12 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>msal4j</artifactId>
<version>0.4.0-preview</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.gson.Gson;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageExtendedErrorInformation;
Expand Down Expand Up @@ -46,6 +47,7 @@ class AzureStorageCheckpointLeaseManager implements ICheckpointManager, ILeaseMa
private static final String METADATA_OWNER_NAME = "OWNINGHOST";

private final String storageConnectionString;
private final StorageCredentials storageCredentials;
private final String storageBlobPrefix;
private final BlobRequestOptions leaseOperationOptions = new BlobRequestOptions();
private final BlobRequestOptions checkpointOperationOptions = new BlobRequestOptions();
Expand All @@ -59,16 +61,30 @@ class AzureStorageCheckpointLeaseManager implements ICheckpointManager, ILeaseMa

private Hashtable<String, Checkpoint> latestCheckpoint = new Hashtable<String, Checkpoint>();

AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName) {
this(storageConnectionString, storageContainerName, "");
}

AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName, String storageBlobPrefix) {
if ((storageConnectionString == null) || storageConnectionString.trim().isEmpty()) {
throw new IllegalArgumentException("Provide valid Azure Storage connection string when using Azure Storage");
}
this.storageConnectionString = storageConnectionString;
this.storageCredentials = null;

if ((storageContainerName != null) && storageContainerName.trim().isEmpty()) {
throw new IllegalArgumentException("Azure Storage container name must be a valid container name or null to use the default");
}
this.storageContainerName = storageContainerName;

// Convert all-whitespace prefix to empty string. Convert null prefix to empty string.
// Then the rest of the code only has one case to worry about.
this.storageBlobPrefix = (storageBlobPrefix != null) ? storageBlobPrefix.trim() : "";
}

AzureStorageCheckpointLeaseManager(StorageCredentials storageCredentials, String storageContainerName, String storageBlobPrefix) {
if (storageCredentials == null) {
throw new IllegalArgumentException("Provide valid Azure Storage credentials when using Azure Storage");
}
this.storageConnectionString = null;
this.storageCredentials = storageCredentials;

if ((storageContainerName != null) && storageContainerName.trim().isEmpty()) {
throw new IllegalArgumentException("Azure Storage container name must be a valid container name or null to use the default");
}
Expand Down Expand Up @@ -102,7 +118,13 @@ void initialize(HostContext hostContext) throws InvalidKeyException, URISyntaxEx
+ "Must be from 3 to 63 characters long.");
}

this.storageClient = CloudStorageAccount.parse(this.storageConnectionString).createCloudBlobClient();
CloudStorageAccount storageAccount = null;
if (this.storageConnectionString != null) {
storageAccount = CloudStorageAccount.parse(this.storageConnectionString);
} else {
storageAccount = new CloudStorageAccount(this.storageCredentials);
}
this.storageClient = storageAccount.createCloudBlobClient();

this.eventHubContainer = this.storageClient.getContainerReference(this.storageContainerName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.azure.eventprocessorhost;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

import com.microsoft.azure.eventhubs.AzureActiveDirectoryTokenProvider;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubClientOptions;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.ITokenProvider;
import com.microsoft.azure.eventhubs.RetryPolicy;

abstract class EventHubClientFactory {
protected ScheduledExecutorService executor;

protected final EventHubClientOptions options;

public EventHubClientFactory(final RetryPolicy retryPolicy) {
this((new EventHubClientOptions()).setRetryPolicy(retryPolicy));
}

public EventHubClientFactory(final EventHubClientOptions options) {
this.options = options;
}

public void setExecutor(ScheduledExecutorService executor) {
this.executor = executor;
}

abstract CompletableFuture<EventHubClient> createEventHubClient() throws EventHubException, IOException;

static class EHCFWithConnectionString extends EventHubClientFactory {
private final String eventHubConnectionString;

public EHCFWithConnectionString(final String eventHubConnectionString,
final RetryPolicy retryPolicy) {
super(retryPolicy);
this.eventHubConnectionString = eventHubConnectionString;
}

public CompletableFuture<EventHubClient> createEventHubClient() throws EventHubException, IOException {
return EventHubClient.createFromConnectionString(this.eventHubConnectionString, this.options.getRetryPolicy(), this.executor);
}
}

static class EHCFWithAuthCallback extends EventHubClientFactory {
private final URI endpoint;
private final String eventHubPath;
private final AzureActiveDirectoryTokenProvider.AuthenticationCallback authCallback;
private final String authority;

public EHCFWithAuthCallback(final URI endpoint,
final String eventHubPath,
final AzureActiveDirectoryTokenProvider.AuthenticationCallback authCallback,
final String authority,
final EventHubClientOptions options) {
super(options);
this.endpoint = endpoint;
this.eventHubPath = eventHubPath;
this.authCallback = authCallback;
this.authority = authority;
}

public CompletableFuture<EventHubClient> createEventHubClient() throws EventHubException, IOException {
return EventHubClient.createWithAzureActiveDirectory(this.endpoint,
this.eventHubPath, this.authCallback, this.authority, this.executor, this.options);
}
}

static class EHCFWithTokenProvider extends EventHubClientFactory {
private final URI endpoint;
private final String eventHubPath;
private final ITokenProvider tokenProvider;

public EHCFWithTokenProvider(final URI endpoint,
final String eventHubPath,
final ITokenProvider tokenProvider,
final EventHubClientOptions options) {
super(options);
this.endpoint = endpoint;
this.eventHubPath = eventHubPath;
this.tokenProvider = tokenProvider;
}

public CompletableFuture<EventHubClient> createEventHubClient() throws EventHubException, IOException {
return EventHubClient.createWithTokenProvider(this.endpoint, this.eventHubPath, this.tokenProvider, this.executor, this.options);
}
}
}
Loading

0 comments on commit c876a6e

Please sign in to comment.