Skip to content

Commit

Permalink
Azure multi read options (#15630)
Browse files Browse the repository at this point in the history
* Include new dependencies

* Mostly implemented

* More azure fixes

* Tests passing

* Unit tests running

* Test running after removing storage exception

* Happy with coverage now

* Add more tests

* fix client factory

* cleanup from testing

* Remove old client

* update docs

* Exclude from spellcheck

* Add licenses

* Fix identity version

* Save work

* Add azure clients

* add licenses

* typos

* Add dependencies

* Exception is not thrown

* Fix intellij check

* Don't need to override

* specify length

* urldecode

* encode path

* Fix checks

* Revert urlencode changes

* Urlencode with azure library

* Update docs/development/extensions-core/azure.md

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* PR changes

* Update docs/development/extensions-core/azure.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Add config for multiple storage accounts

* Deprecate AzureTaskLogsConfig.maxRetries

* Clean up azure retry block

* logic update to reuse clients

* fix comments

* Create container conditionally

* Fix key auth

* save work

* Fix unit tests

* Revert old azure input type

* Separate input source

* save work

* Add support for app registrations

* Fix unit tests

* clean up spacing

* Add coverage

* fixes from testing

* cleanup some caching behavior

* Add docs

* Fix spelling issues

* fix more spelling errors'

* Fix intellij inspections

* add simple changes from pr

* save work on fixing bug

* Fix unit tests

* Add more testing

* Fix unit test

* Add tests

* Add annotation for azureStorage

* Fix up docs

* Add comment for list method

* Fix tests

* Remove uneeded toString

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* PR changes

* fix injection of StorageConnector

* Fix checkstyle

* clean up unit tests

* More pr fixes

---------

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
  • Loading branch information
3 people committed Jan 25, 2024
1 parent 867c636 commit 3e51224
Show file tree
Hide file tree
Showing 37 changed files with 1,627 additions and 129 deletions.
110 changes: 109 additions & 1 deletion docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,112 @@ Google Cloud Storage object:
The Azure input source reads objects directly from Azure Blob store or Azure Data Lake sources. You can
specify objects as a list of file URI strings or prefixes. You can split the Azure input source for use with [Parallel task](./native-batch.md) indexing and each worker task reads one chunk of the split data.


:::info
The old `azure` schema is deprecated. Update your specs to use the `azureStorage` schema described below instead.
:::

Sample specs:

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azureStorage",
"objectGlob": "**.json",
"uris": ["azureStorage://storageAccount/container/prefix1/file.json", "azureStorage://storageAccount/container/prefix2/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azureStorage",
"objectGlob": "**.parquet",
"prefixes": ["azureStorage://storageAccount/container/prefix1/", "azureStorage://storageAccount/container/prefix2/"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```


```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azureStorage",
"objectGlob": "**.json",
"objects": [
{ "bucket": "storageAccount", "path": "container/prefix1/file1.json"},
{ "bucket": "storageAccount", "path": "container/prefix2/file2.json"}
],
"properties": {
"sharedAccessStorageToken": "?sv=...<storage token secret>...",
}
},
"inputFormat": {
"type": "json"
},
...
},
...
```

|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `azureStorage`.|None|yes|
|uris|JSON array of URIs where the Azure objects to be ingested are located. Use this format: `azureStorage://STORAGE_ACCOUNT/CONTAINER/PATH_TO_FILE`|None|One of the following must be set:`uris`, `prefixes`, or `objects`.|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest. Use this format`azureStorage://STORAGE_ACCOUNT/CONTAINER/PREFIX`. Empty objects starting with any of the given prefixes are skipped.|None|One of the following must be set:`uris`, `prefixes`, or `objects`.|
|objects|JSON array of Azure objects to ingest.|None|One of the following must be set:`uris`, `prefixes`, or `objects`.|
|objectGlob|A glob for the object part of the Azure URI. In the URI `azureStorage://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `azureStorage://foo/bar/file.json` because the object part is `bar/file.json`, and the`*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (Azure blob URI starting with `azureStorage://`), `__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|
|properties|Properties object for overriding the default Azure configuration. See below for more information.|None|No (defaults will be used if not given)

Note that the Azure input source skips all empty objects only when `prefixes` is specified.

The `objects` property can one of the following:

|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|bucket|Name of the Azure Blob Storage or Azure Data Lake storage account|None|yes|
|path|The container and path where data is located.|None|yes|


The `properties` property can be one of the following:

- `sharedAccessStorageToken`
- `key`
- `appRegistrationClientId`, `appRegistrationClientSecret`, and `tenantId`
- empty


|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|sharedAccessStorageToken|The plain text string of this Azure Blob Storage Shared Access Token|None|No|
|key|The root key of Azure Blob Storage Account|None|no|
|appRegistrationClientId|The client ID of the Azure App registration to authenticate as|None|No|
|appRegistrationClientSecret|The client secret of the Azure App registration to authenticate as|None|Yes if `appRegistrationClientId` is provided|
|tenantId|The tenant ID of the Azure App registration to authenticate as|None|Yes if `appRegistrationClientId` is provided|

<details closed>
<summary>Show the deprecated 'azure' input source</summary>

Note that the deprecated `azure` input source doesn't support specifying which storage account to ingest from. We recommend using the `azureStorage` instead.

Sample specs:

```json
Expand Down Expand Up @@ -372,7 +478,7 @@ Sample specs:
|uris|JSON array of URIs where the Azure objects to be ingested are located, in the form `azure://<container>/<path-to-file>`|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest, in the form `azure://<container>/<prefix>`. Empty objects starting with one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or `objects` must be set|
|objectGlob|A glob for the object part of the S3 URI. In the URI `s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the object part is `bar/file.json`, and the`*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
|objectGlob|A glob for the object part of the Azure URI. In the URI `azure://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `azure://foo/bar/file.json`, because the object part is `bar/file.json`, and the`*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (Azure blob URI starting with `azure://`), `__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|

Note that the Azure input source skips all empty objects only when `prefixes` is specified.
Expand All @@ -384,6 +490,8 @@ The `objects` property is:
|bucket|Name of the Azure Blob Storage or Azure Data Lake container|None|yes|
|path|The path where data is located.|None|yes|

</details>

## HDFS input source

:::info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import com.google.inject.assistedinject.AssistedInject;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.storage.azure.AzureByteSource;
import org.apache.druid.storage.azure.AzureByteSourceFactory;
import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.AzureUtils;

import javax.annotation.Nonnull;
Expand All @@ -40,21 +42,32 @@ public class AzureEntity extends RetryingInputEntity
{
private final CloudObjectLocation location;
private final AzureByteSource byteSource;
private final String scheme;

@AssistedInject
AzureEntity(
@Nonnull @Assisted CloudObjectLocation location,
@Nonnull @Assisted AzureStorage azureStorage,
@Nonnull @Assisted String scheme,
@Nonnull AzureByteSourceFactory byteSourceFactory

)
{
this.location = location;
this.byteSource = byteSourceFactory.create(location.getBucket(), location.getPath());
this.scheme = scheme;
// If scheme is azureStorage, containerName is the first prefix in the path, otherwise containerName is the bucket
if (AzureStorageAccountInputSource.SCHEME.equals(this.scheme)) {
Pair<String, String> locationInfo = AzureStorageAccountInputSource.getContainerAndPathFromObjectLocation(location);
this.byteSource = byteSourceFactory.create(locationInfo.lhs, locationInfo.rhs, azureStorage);
} else {
this.byteSource = byteSourceFactory.create(location.getBucket(), location.getPath(), azureStorage);
}
}

@Override
public URI getUri()
{
return location.toUri(AzureInputSource.SCHEME);
return location.toUri(this.scheme);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
package org.apache.druid.data.input.azure;

import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.storage.azure.AzureStorage;

/**
* Factory for creating {@link AzureEntity} objects
*/
public interface AzureEntityFactory
{
AzureEntity create(CloudObjectLocation location);
AzureEntity create(
CloudObjectLocation location,
AzureStorage azureStorage,
String scheme
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class AzureInputSource extends CloudObjectInputSource

@JsonCreator
public AzureInputSource(
@JacksonInject AzureStorage storage,
@JacksonInject @Global AzureStorage storage,
@JacksonInject AzureEntityFactory entityFactory,
@JacksonInject AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
@JacksonInject AzureInputDataConfig inputDataConfig,
Expand Down Expand Up @@ -128,7 +129,7 @@ public Object getSystemFieldValue(InputEntity entity, SystemField field)
@Override
protected AzureEntity createEntity(CloudObjectLocation location)
{
return entityFactory.create(location);
return entityFactory.create(location, storage, SCHEME);
}

@Override
Expand All @@ -140,7 +141,7 @@ class SplitWidget implements CloudObjectSplitWidget
public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()).iterator(),
azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength(), storage).iterator(),
blob -> {
try {
return new LocationWithSize(
Expand Down
Loading

0 comments on commit 3e51224

Please sign in to comment.