Add FileIO implementation for Azure Blob Storage #4465
Conversation
.baseline/checkstyle/checkstyle.xml
Outdated
@@ -122,7 +122,8 @@
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*,
org.apache.spark.sql.functions.*,
org.apache.spark.sql.connector.iceberg.write.RowLevelOperation.Command.*,
org.junit.Assert.*"/>
org.junit.Assert.*,
org.assertj.core.api.Assertions.*"/>
I don't think we need to change this.
Hi Ryan - without the above change, the checkstyleIntegration task fails with the following error:
[ant:checkstyle] [ERROR] /<redacted>/upstream-iceberg/azure/src/integration/java/org/apache/iceberg/azure/blob/TestAzureBlobOutputStream.java:38:46: Using a static member import should be avoided - org.assertj.core.api.Assertions.assertThat. [AvoidStaticImport]
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':iceberg-azure:checkstyleIntegration'.
> Checkstyle rule violations were found. See the report at: file:///<redacted>/upstream-iceberg/azure/build/reports/checkstyle/integration.html
Checkstyle files with violations: 5
Checkstyle violations by severity: [error:12]
Yes, you shouldn't use a static import for those methods.
Ack, will make the necessary modifications.
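For illustration, a test assertion in the qualified form, which the existing AvoidStaticImport rule already allows (the assertion itself is just a placeholder):

import org.assertj.core.api.Assertions;

// qualified call instead of the static import of Assertions.assertThat
Assertions.assertThat(1 + 1).isEqualTo(2);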
build.gradle
Outdated
dependencies {
api project(':iceberg-api')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation platform('com.azure:azure-sdk-bom')
I would rather track dependency versions explicitly than rely on some external BOM. BOMs are fine for end users, but libraries should generally not delegate dependency versions to other projects.
Ack.
Will switch to direct dependencies.
Note: the GCP module currently also relies on a BOM. We should also make a change in the GCP module to avoid using a BOM.
Line 375 in 7c2ea01
implementation platform('com.google.cloud:libraries-bom')
If required, I can file another PR to eliminate the usage of the BOM in the GCP module.
@@ -38,6 +42,9 @@ buildscript {

plugins {
id 'nebula.dependency-recommender' version '9.0.2'
// Since 7.x gradle-docker-plugin is compiled using JDK11, thus using the latest version will fail Java8 builds
// https://bmuschko.github.io/gradle-docker-plugin/current/user-guide/#change_log
This comment doesn't make it clear why this plugin is used. Can you explain in more detail?
Ack.
try {
uri = new URI(location);
} catch (URISyntaxException e) {
throw new ValidationException("Invalid Azure URI: %s.", location);
There's no need for end punctuation in logs, and in fact in cases like this it is misleading because the trailing . could be interpreted as part of the URI. Can you remove end punctuation?
Ack
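For example, the message from the quoted line would simply drop the trailing period (keeping the same exception type used in the diff above):

throw new ValidationException("Invalid Azure URI: %s", location);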
Preconditions.checkNotNull(location, "Location cannot be null.");
final URI uri;
try {
uri = new URI(location);
We generally discourage the use of URI because it handles URI encoding in strange ways. I think it is a best practice to ignore the URI and parse manually using split and delimiters.
Ack.
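A minimal sketch of manual parsing with split, assuming a location of the form scheme://container@account-host/path; the exact layout AzureURI accepts may differ, so the names below are illustrative:

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

String[] schemeSplit = location.split("://", 2);
Preconditions.checkArgument(schemeSplit.length == 2, "Invalid Azure URI, missing scheme: %s", location);
String[] authoritySplit = schemeSplit[1].split("/", 2);
String[] containerSplit = authoritySplit[0].split("@", 2);
Preconditions.checkArgument(containerSplit.length == 2, "Invalid Azure URI, missing storage account: %s", location);
String container = containerSplit[0];
String storageAccountHost = containerSplit[1];
String path = authoritySplit.length == 2 ? authoritySplit[1] : "";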
}
this.location = location;

ValidationException.check(
ValidationException is not a substitute for IllegalArgumentException. ValidationException indicates that while an argument may be valid, it is inconsistent with other arguments or config.
For example, when creating a partition spec, "column" is a valid column reference, but null would result in an IllegalArgumentException. But if "column" is not a defined name in the schema, then a ValidationException is thrown because you can't partition by an unknown column.
Ack.
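A small sketch of the distinction described above, mirroring the partition-spec example (the method and variable names are illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

void checkPartitionColumn(Schema schema, String column) {
  // a null column name is a caller error -> IllegalArgumentException via checkArgument
  Preconditions.checkArgument(column != null, "Column name cannot be null");
  // a valid name that is not in the schema is a consistency problem -> ValidationException
  ValidationException.check(schema.findField(column) != null,
      "Cannot partition by unknown column: %s", column);
}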
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AzureBlobClientFactory {
Should this be package-private? I don't see a reason why people would need to use it directly.
Ack
Along with this, I've also made BaseAzureBlobFile package-private.
public static BlobClient createBlobClient(AzureURI azureURI, AzureProperties azureProperties) {
final String storageAccount = azureURI.storageAccount();
final BlobClientBuilder builder = new BlobClientBuilder();
In Iceberg, we don't use final for local variables. Recent JVM versions (8+) handle this without problems and it is also not very valuable because it isn't actually translated into bytecode.
Ack, I'll remove it for the local variables.
LOG.debug("Using {} endpoint for {}", endpoint, storageAccount);
builder.endpoint(endpoint);
final AuthType authType = azureProperties.authType(storageAccount);
setAuth(storageAccount, authType, azureProperties, builder);
Why is setAuth not required when using a connection string?
The general form of a connection string is as follows:
DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;
BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
Thus the connection string contains all the auth-related information and the storage account endpoints required for establishing a connection, and hence it is not necessary to set the auth explicitly.
https://docs.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string#configure-a-connection-string-for-an-azure-storage-account
Note: the above connection string does not leak any actual keys; it is the default connection string used by the Azurite emulator and is therefore safe to share on public forums.
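For reference, a sketch of how a client could be built straight from such a connection string with the Azure SDK's BlobClientBuilder; the container and blob names are placeholders, and no separate credential or endpoint call is needed because the string already carries them:

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;

BlobClient client = new BlobClientBuilder()
    .connectionString(connectionString) // e.g. the Azurite string shown above
    .containerName("my-container")      // placeholder
    .blobName("path/to/data.parquet")   // placeholder
    .buildClient();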
@Override
public long getLength() {
if (length == null) {
length = blobClient().getProperties().getBlobSize();
What happens if the blob doesn't exist? Could this throw NotFoundException to standardize?
It throws a BlobStorageException: Status code 404 (BlobNotFound) as of now. I can standardize it to throw NotFoundException.
private void openStream(long offset) {
final BlobInputStreamOptions options = new BlobInputStreamOptions().setRange(new BlobRange(offset))
.setBlockSize(azureProperties.readBlockSize(azureURI.storageAccount()));
We usually put each chained method on a separate line.
Ack
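For example, the call above reformatted with each chained method on its own line (and final dropped per the earlier comment):

BlobInputStreamOptions options = new BlobInputStreamOptions()
    .setRange(new BlobRange(offset))
    .setBlockSize(azureProperties.readBlockSize(azureURI.storageAccount()));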
final long bytesToSkip = newPos - pos;
// BlobInputStream#skip only repositions the internal pointers,
// the actual bytes are skipped when BlobInputStream#read is invoked.
final long bytesSkipped = stream.skip(bytesToSkip);
Does this read through or just change the next request?
It does not read through; it only repositions internal pointers to change the next read request.
targetContainerId createContainer.getContainerId()
}

task integrationTest(type: Test) {
Can you make sure that there is a workflow that runs these integration tests?
The java-ci workflow already takes care of this.
The check task depends on the above integrationTest task. The java-ci workflow runs the check task on each module in Iceberg, which would in turn run the integration tests for azure.
iceberg/.github/workflows/java-ci.yml
Line 68 in 7c2ea01
- run: ./gradlew check -DsparkVersions= -DhiveVersions= -DflinkVersions= -Pquick=true -x javadoc
package org.apache.iceberg.azure;

public enum AuthType { |
Does this need to be public?
Yes, this needs to be public.
AuthType is accessed in AzureProperties, AzureBlobClientFactory, and IntegrationTests.
Making it package-private would leave AzureProperties unaffected since both of them reside in the org.apache.iceberg.azure package.
However, AzureBlobClientFactory and IntegrationTests would throw a compilation error since they reside in the org.apache.iceberg.azure.blob package.
public class AzureProperties implements Serializable {

// Start of storage account configuration
public static final String STORAGE_CONNECTION_STRING = "azure.storage.%s.connection-string";
These seem long. What about abfs.%s.uri or abfs.%s.connection-string instead? Does it really need to be "connection string"?
These seem long. What about abfs.%s.uri or abfs.%s.connection-string instead?
I can replace azure.storage with abfs for this and the rest of the configs to make them shorter.
Does it really need to be "connection string"?
It would be a good idea to keep connection-string as is, since it would directly map to the connection-string config available on the Azure portal, thereby reducing the chances of confusing this config with a different one.
Thanks for working on this, @sumeetgajjar! Overall it is looking good.
@rdblue thanks for the review and your comments.
}

@Override
public void seek(long newPos) { |
Does this take care of reverse seek issues, where connections can be terminated and reopened?
Hi @rbalamohan - can you please elaborate more on the reverse seek issues?
For the reverse seek case, we close the current stream and re-open the stream from the earlier offset.
// Seeking backward.
stream.close();
openStream(newPos); // newPos is a position back in the stream.
Connection close/reopens are expensive in cloud stores. https://issues.apache.org/jira/browse/HADOOP-12444 has more details on backward seek. A good set of tickets went in to fix/improve backward seek, especially for columnar formats like ORC and Parquet.
Thanks for the feedback - I can follow a similar approach in the current implementation: simply set newPos to the given value and skip openStream in the seek method, then lazily open a stream in the subsequent read request.
Done.
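A minimal sketch of that lazy-open approach; the field and helper names (pos, next, stream, openStream) mirror this PR, but the exact code is illustrative:

import java.io.IOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Override
public void seek(long newPos) {
  Preconditions.checkArgument(newPos >= 0, "Invalid position: %s", newPos);
  this.next = newPos; // only remember the target offset; no I/O here
}

@Override
public int read() throws IOException {
  positionStream(); // open or re-open the blob stream only when data is needed
  int value = stream.read();
  if (value != -1) {
    pos += 1;
    next += 1;
  }
  return value;
}

private void positionStream() throws IOException {
  if (stream != null && next == pos) {
    return; // already positioned correctly
  }
  if (stream != null) {
    stream.close(); // avoid leaking the previous connection on a re-seek
  }
  openStream(next);
  this.pos = next;
}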
Does this support Data Lake Storage Gen2?
Hi @electrum - yes, DataLake Storage Gen2 is supported. In fact, we only support Gen2.
@rdblue gentle ping to re-review the PR.
Any plan to merge this PR?
Curious about the state of this PR and if there is a plan to merge?
Doesn't look like there is anything left to do. Will this be merged anytime soon? @rdblue?
Also curious about this one!
Will this be merged anytime soon? I'd love to have support for blob storage instead of only adlfs (as implemented in #8303).
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
What changes were proposed in this pull request?
Currently, HadoopFileIO is used to talk to Azure Blob Storage. This PR introduces AzureFileIO, which uses the native Azure SDK to communicate with Azure Blob Storage.
Does this PR introduce any user-facing change?
Yes, users can now configure the catalog io-impl property to org.apache.iceberg.azure.blob.AzureBlobFileIO to enable this feature.
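For example, a hypothetical catalog configuration enabling the new FileIO (the catalog instance, its name, and the remaining warehouse/auth properties are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;

Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.azure.blob.AzureBlobFileIO");
// ... plus warehouse location and Azure connection/auth properties ...
catalog.initialize("my_catalog", properties);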
How was this patch tested?
Closes #4257