Azure: Add FileIO that supports ADLSv2 storage #8303

Merged

merged 37 commits into master from azure-support on Aug 25, 2023
Changes from 7 commits

Commits (37):
6d965d2
Azure: add support for ADLSv2 for storage
bryanck Aug 12, 2023
8c59928
typo fix
bryanck Aug 13, 2023
e76a1a5
update labeler
bryanck Aug 13, 2023
2576250
Simpler URI parsing
bryanck Aug 13, 2023
e2fe211
PR feedback
bryanck Aug 14, 2023
42debe3
allow any domain name in URI
bryanck Aug 14, 2023
f5a67c8
Merge branch 'master' into azure-support
bryanck Aug 14, 2023
6b53fbb
allow optional container in URI
bryanck Aug 14, 2023
571b562
include notice from dependencies
bryanck Aug 14, 2023
bc36c1d
notice dedupe
bryanck Aug 14, 2023
f7b2221
simpler naming and oauth creds
bryanck Aug 15, 2023
f86811b
naming
bryanck Aug 15, 2023
dd066b9
simpler auth
bryanck Aug 15, 2023
c8a4b5a
support sas token
bryanck Aug 15, 2023
93a00c3
per-account sas token
bryanck Aug 15, 2023
9c5c73e
Merge remote-tracking branch 'upstream/master' into azure-support
bryanck Aug 15, 2023
852bbf0
fix runtime shading
bryanck Aug 16, 2023
0d9b7f0
prefix and bulk operations
bryanck Aug 17, 2023
6764f70
checkstyle
bryanck Aug 17, 2023
902a39b
simpler batch delete
bryanck Aug 17, 2023
3de84de
Merge remote-tracking branch 'upstream/master' into azure-support
bryanck Aug 17, 2023
c72f3ac
add connection string property
bryanck Aug 18, 2023
339f066
per account connection string
bryanck Aug 19, 2023
29d5700
more tests
bryanck Aug 19, 2023
60ad696
use Azurite where possible
bryanck Aug 19, 2023
e048e2f
Merge branch 'master' into azure-support
bryanck Aug 19, 2023
4da61a3
move shade fix to separate PR
bryanck Aug 20, 2023
ed7c4d4
add azure dependency to hive runtime
bryanck Aug 20, 2023
7450fca
skip stack trace log for missing Hadoop
bryanck Aug 20, 2023
217a5b6
Merge remote-tracking branch 'upstream/master' into azure-support
bryanck Aug 21, 2023
3d4cedc
PR feedback
bryanck Aug 22, 2023
3efb647
catch specific exception
bryanck Aug 22, 2023
f19d5af
null check
bryanck Aug 22, 2023
de97b78
handle missing path in prefix ops
bryanck Aug 23, 2023
9f02e72
PR feedback
bryanck Aug 23, 2023
0ea6e05
Improved URI regex
bryanck Aug 25, 2023
e758e08
Merge branch 'master' into azure-support
Fokko Aug 25, 2023
5 changes: 5 additions & 0 deletions .github/labeler.yml
@@ -72,15 +72,20 @@ PIG:
- pig/**/*
AWS:
- aws/**/*
- aws-bundle/**/*
NESSIE:
- nessie/**/*
ALIYUN:
- aliyun/**/*
GCP:
- gcp/**/*
- gcp-bundle/**/*
DELL:
- dell/**/*
SNOWFLAKE:
- snowflake/**/*
OPENAPI:
- open-api/**/*
AZURE:
- azure/**/*
- azure-bundle/**/*
519 changes: 519 additions & 0 deletions azure-bundle/LICENSE

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions azure-bundle/NOTICE
@@ -0,0 +1,6 @@

Apache Iceberg
Copyright 2017-2023 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
50 changes: 50 additions & 0 deletions azure-bundle/build.gradle
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

project(":iceberg-azure-bundle") {

apply plugin: 'com.github.johnrengelman.shadow'

tasks.jar.dependsOn tasks.shadowJar

dependencies {
implementation platform(libs.azuresdk.bom)
implementation "com.azure:azure-storage-file-datalake"
implementation "com.azure:azure-identity"
Contributor
For other integrations, we don't bundle the dependencies and only ship the Iceberg side. That keeps our bundle small and doesn't force any particular version on downstream consumers. It also avoids needing to do a lot of license and notice documentation work. Is that possible here? Is there a dependency bundle that we can use at runtime?

Contributor Author (@bryanck, Aug 14, 2023)

The azure-bundle project build was set up to bundle all of the necessary Azure dependencies in one shadow jar as an (optional) convenience to users, similar to the aws-bundle and gcp-bundle projects, which include only the necessary runtime libraries at the same versions used for the Iceberg build and shade conflicting libraries. A user can opt to include their own Azure dependencies if desired and not use this at all. For example, all you would need to run with Spark is the Spark runtime and the Azure/AWS/GCP bundle. Neither Microsoft nor Google provides such a bundle. Amazon has one for AWS, but it is very large, which causes issues with some systems.

This is separate from the azure project build which declares the Azure dependencies as compileOnly so they are not included with any runtime.
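
As a concrete illustration of the opt-in loading described above, a minimal sketch (not part of this diff) of loading the FileIO dynamically. CatalogUtil.loadFileIO and the "io-impl" catalog property (CatalogProperties.FILE_IO_IMPL) are existing Iceberg APIs; the null Hadoop conf is an assumption for a Hadoop-free classpath.

import java.util.Map;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class BundleUsageSketch {
  public static void main(String[] args) {
    // No Azure-specific properties are required just to load the implementation.
    Map<String, String> props = ImmutableMap.of();

    // loadFileIO instantiates the impl via its no-arg constructor and then
    // calls initialize(props), matching the FileIO contract shown later in this diff.
    FileIO io =
        CatalogUtil.loadFileIO("org.apache.iceberg.azure.adlsv2.ADLSv2FileIO", props, null);
    System.out.println("Loaded " + io.getClass().getName());
  }
}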

Contributor

Okay, I see. I didn't know about the other bundle projects. Looks like the LICENSE file is updated for those, but not the NOTICE. Did you check whether each bundled project has a NOTICE that we need to include?

Contributor Author

I did not, I'll do that now for all three.

Contributor Author

I added this. I'll open a separate PR for the AWS and GCP bundles.

Contributor Author

The PR for the AWS and GCP bundles is here: #8323

Contributor

Thanks! It's amazing that we can automate this now. It was such a giant pain to do this in the past!

}

shadowJar {
archiveClassifier.set(null)
zip64 true

// include the LICENSE and NOTICE files for the shaded Jar
from(projectDir) {
include 'LICENSE'
include 'NOTICE'
}

// relocate Azure-specific versions
relocate 'io.netty', 'org.apache.iceberg.azure.shaded.io.netty'
relocate 'com.fasterxml.jackson', 'org.apache.iceberg.azure.shaded.com.fasterxml.jackson'
}

jar {
enabled = false
}
}
59 changes: 59 additions & 0 deletions azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.azure;

import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.file.datalake.DataLakePathClientBuilder;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

public class AzureProperties implements Serializable {
public static final String ADLSV2_READ_BLOCK_SIZE = "adlsv2.read.block-size-bytes";
public static final String ADLSV2_WRITE_BLOCK_SIZE = "adlsv2.write.block-size-bytes";
private static final DefaultAzureCredential DEFAULT_CREDENTIAL =
new DefaultAzureCredentialBuilder().build();

private Integer adlsv2ReadBlockSize;
private Long adlsv2WriteBlockSize;

public AzureProperties() {}

public AzureProperties(Map<String, String> properties) {
if (properties.containsKey(ADLSV2_READ_BLOCK_SIZE)) {
adlsv2ReadBlockSize = Integer.parseInt(properties.get(ADLSV2_READ_BLOCK_SIZE));
}
if (properties.containsKey(ADLSV2_WRITE_BLOCK_SIZE)) {
adlsv2WriteBlockSize = Long.parseLong(properties.get(ADLSV2_WRITE_BLOCK_SIZE));
}
}

public Optional<Integer> adlsv2ReadBlockSize() {
return Optional.ofNullable(adlsv2ReadBlockSize);
}

public Optional<Long> adlsv2WriteBlockSize() {
return Optional.ofNullable(adlsv2WriteBlockSize);
}

public <T extends DataLakePathClientBuilder> void applyCredentialConfiguration(T builder) {
builder.credential(DEFAULT_CREDENTIAL);
}
}
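
A minimal usage sketch of the class above: the block sizes are optional and only set when the corresponding property keys are present. The byte values here are illustrative placeholders, not defaults from this PR.

import java.util.Map;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class AzurePropertiesSketch {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of(
            AzureProperties.ADLSV2_READ_BLOCK_SIZE, "4194304", // 4 MiB, example value
            AzureProperties.ADLSV2_WRITE_BLOCK_SIZE, "8388608"); // 8 MiB, example value

    AzureProperties azureProperties = new AzureProperties(props);

    // Both accessors return Optional and are empty when the keys are absent.
    azureProperties.adlsv2ReadBlockSize().ifPresent(size -> System.out.println("read block: " + size));
    azureProperties.adlsv2WriteBlockSize().ifPresent(size -> System.out.println("write block: " + size));
  }
}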
132 changes: 132 additions & 0 deletions azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSv2FileIO.java
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.azure.adlsv2;

import com.azure.core.http.HttpClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakePathClientBuilder;
import java.util.Map;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.util.SerializableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* FileIO implementation backed by Azure Data Lake Storage v2 (ADLSv2)
*
* <p>Locations follow the conventions used by Hadoop's Azure support, i.e.
*
* <pre>{@code abfs[s]://<container>@<storage account>.dfs.core.windows.net/<blob_path>}</pre>
*
* <p>See <a href="https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html">Hadoop Azure
* Support</a>
*/
public class ADLSv2FileIO implements FileIO {
private static final Logger LOG = LoggerFactory.getLogger(ADLSv2FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";

private static final HttpClient HTTP = HttpClient.createDefault();

private AzureProperties azureProperties;
private MetricsContext metrics = MetricsContext.nullMetrics();
private SerializableMap<String, String> properties = null;

/**
* No-arg constructor to load the FileIO dynamically.
*
* <p>All fields are initialized by calling {@link ADLSv2FileIO#initialize(Map)} later.
*/
public ADLSv2FileIO() {}

@Override
public InputFile newInputFile(String path) {
return ADLSv2InputFile.of(path, client(path), azureProperties, metrics);
}

@Override
public InputFile newInputFile(String path, long length) {
return ADLSv2InputFile.of(path, length, client(path), azureProperties, metrics);
}

@Override
public OutputFile newOutputFile(String path) {
return ADLSv2OutputFile.of(path, client(path), azureProperties, metrics);
}

@Override
public void deleteFile(String path) {
// There is no specific contract about whether delete should fail,
// and other FileIO providers ignore failure. Log the failure for now,
// since delete is not a required operation for Iceberg.
try {
client(path).delete();
} catch (Exception e) {
LOG.warn("Failed to delete path: {}", path, e);
}
}

@Override
public Map<String, String> properties() {
return properties.immutableMap();
}

@VisibleForTesting
DataLakeFileClient client(String path) {
ADLSv2Location location = new ADLSv2Location(path);
DataLakePathClientBuilder clientBuilder =
new DataLakePathClientBuilder()
.httpClient(HTTP)
.endpoint(location.storageAccountUrl())
.fileSystemName(location.container())
.pathName(location.path());

azureProperties.applyCredentialConfiguration(clientBuilder);

return clientBuilder.buildFileClient();
}

@Override
public void initialize(Map<String, String> props) {
this.properties = SerializableMap.copyOf(props);
this.azureProperties = new AzureProperties(properties);

// Report Hadoop metrics if Hadoop is available
try {
DynConstructors.Ctor<MetricsContext> ctor =
DynConstructors.builder(MetricsContext.class)
.hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
.buildChecked();
MetricsContext context = ctor.newInstance("abfs");
context.initialize(properties);
this.metrics = context;
} catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
LOG.warn(
"Unable to load metrics class: '{}', falling back to null metrics",
DEFAULT_METRICS_IMPL,
e);
}
}
}
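
ADLSv2Location is referenced by client(String) above but is not included in the commits shown here. A hypothetical sketch of the parsing that the class javadoc's abfs convention implies; the actual class (which later commits such as "allow optional container in URI" and "Improved URI regex" revised) may differ.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ADLSv2LocationSketch {
  // abfs[s]://<container>@<storage account host>/<blob_path>
  private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^@]+)@([^/]+)/(.*)$");

  private final String container;
  private final String storageAccountHost;
  private final String path;

  ADLSv2LocationSketch(String location) {
    Matcher matcher = URI_PATTERN.matcher(location);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Invalid ADLSv2 URI: " + location);
    }
    this.container = matcher.group(1);
    this.storageAccountHost = matcher.group(2);
    this.path = matcher.group(3);
  }

  String storageAccountUrl() {
    // The DataLake client endpoint is assumed to be HTTPS against the account host.
    return "https://" + storageAccountHost;
  }

  String container() {
    return container;
  }

  String path() {
    return path;
  }
}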
70 changes: 70 additions & 0 deletions azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSv2InputFile.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.azure.adlsv2;

import com.azure.storage.file.datalake.DataLakeFileClient;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;

class ADLSv2InputFile extends BaseADLSv2File implements InputFile {
private Long fileSize;

static ADLSv2InputFile of(
String location,
DataLakeFileClient fileClient,
AzureProperties azureProperties,
MetricsContext metrics) {
return new ADLSv2InputFile(location, null, fileClient, azureProperties, metrics);
}

static ADLSv2InputFile of(
String location,
long length,
DataLakeFileClient fileClient,
AzureProperties azureProperties,
MetricsContext metrics) {
return new ADLSv2InputFile(
location, length > 0 ? length : null, fileClient, azureProperties, metrics);
}

ADLSv2InputFile(
String location,
Long fileSize,
DataLakeFileClient fileClient,
AzureProperties azureProperties,
MetricsContext metrics) {
super(location, fileClient, azureProperties, metrics);
this.fileSize = fileSize;
}

@Override
public long getLength() {
if (fileSize == null) {
this.fileSize = fileClient().getProperties().getFileSize();
}
return fileSize;
}

@Override
public SeekableInputStream newStream() {
return new ADLSv2InputStream(fileClient(), fileSize, azureProperties(), metrics());
}
}
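
Putting the pieces together, a minimal end-to-end sketch using only the APIs shown in this diff. The account, container, and object path are placeholders, and credentials are assumed to resolve via DefaultAzureCredential as in AzureProperties above.

import java.io.IOException;
import org.apache.iceberg.azure.adlsv2.ADLSv2FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ADLSv2ReadSketch {
  public static void main(String[] args) throws IOException {
    ADLSv2FileIO io = new ADLSv2FileIO();
    // No properties set: block sizes stay unset and DefaultAzureCredential is used.
    io.initialize(ImmutableMap.of());

    InputFile inputFile =
        io.newInputFile("abfss://mycontainer@myaccount.dfs.core.windows.net/data/file.parquet");

    // Length is fetched lazily from file properties when not supplied up front.
    System.out.println("length: " + inputFile.getLength());

    try (SeekableInputStream stream = inputFile.newStream()) {
      int firstByte = stream.read(); // read a single byte to exercise the stream
      System.out.println("first byte: " + firstByte);
    }
  }
}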