diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 23af67a33d688..5f7737dd44e3b 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -347,6 +347,22 @@ bool IsContainerNotFound(const Storage::StorageException& e) { return false; } +const auto kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder"; +const auto kFlatNamespaceIsDirectoryMetadataKey = "is_directory"; + +bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) { + // Inspired by + // https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91 + auto hierarchical_directory_metadata = + metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey); + if (hierarchical_directory_metadata != metadata.end()) { + return hierarchical_directory_metadata->second == "true"; + } + auto flat_directory_metadata = metadata.find(kFlatNamespaceIsDirectoryMetadataKey); + return flat_directory_metadata != metadata.end() && + flat_directory_metadata->second == "true"; +} + template std::string FormatValue(typename TypeTraits::CType value) { struct StringAppender { @@ -512,11 +528,18 @@ class ObjectInputFile final : public io::RandomAccessFile { Status Init() { if (content_length_ != kNoSize) { + // When the user provides the file size we don't validate that it's a file. This is + // only a read so it's not a big deal if the user makes a mistake. DCHECK_GE(content_length_, 0); return Status::OK(); } try { + // To open an ObjectInputFile the Blob must exist and it must not represent + // a directory. Additionally we need to know the file size. 
auto properties = blob_client_->GetProperties(); + if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) { + return NotAFile(location_); + } content_length_ = properties.Value.BlobSize; metadata_ = PropertiesToMetadata(properties.Value); return Status::OK(); @@ -698,11 +721,10 @@ class ObjectAppendStream final : public io::OutputStream { ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, const std::shared_ptr& metadata, - const AzureOptions& options, int64_t size = kNoSize) + const AzureOptions& options) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), - location_(location), - content_length_(size) { + location_(location) { if (metadata && metadata->size() != 0) { metadata_ = ArrowMetadataToAzureMetadata(metadata); } else if (options.default_metadata && options.default_metadata->size() != 0) { @@ -716,17 +738,31 @@ class ObjectAppendStream final : public io::OutputStream { io::internal::CloseFromDestructor(this); } - Status Init() { - if (content_length_ != kNoSize) { - DCHECK_GE(content_length_, 0); - pos_ = content_length_; + Status Init(const bool truncate, + std::function ensure_not_flat_namespace_directory) { + if (truncate) { + content_length_ = 0; + pos_ = 0; + // We need to create an empty file overwriting any existing file, but + // fail if there is an existing directory. + RETURN_NOT_OK(ensure_not_flat_namespace_directory()); + // On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing + // directory so we don't need to check like we do on flat namespace. 
+ RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { try { auto properties = block_blob_client_->GetProperties(); + if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) { + return NotAFile(location_); + } content_length_ = properties.Value.BlobSize; pos_ = content_length_; } catch (const Storage::StorageException& exception) { if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + // No file exists but on flat namespace it's possible there is a directory + // marker or an implied directory. Ensure there is no directory before starting + // a new empty file. + RETURN_NOT_OK(ensure_not_flat_namespace_directory()); RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_)); } else { return ExceptionToStatus( @@ -743,6 +779,7 @@ class ObjectAppendStream final : public io::OutputStream { block_ids_.push_back(block.Name); } } + initialised_ = true; return Status::OK(); } @@ -789,6 +826,11 @@ class ObjectAppendStream final : public io::OutputStream { Status Flush() override { RETURN_NOT_OK(CheckClosed("flush")); + if (!initialised_) { + // If the stream has not been successfully initialized then there is nothing to + // flush. This also avoids some unhandled errors when flushing in the destructor. 
+ return Status::OK(); + } return CommitBlockList(block_blob_client_, block_ids_, metadata_); } @@ -840,10 +882,11 @@ class ObjectAppendStream final : public io::OutputStream { std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; + int64_t content_length_ = kNoSize; bool closed_ = false; + bool initialised_ = false; int64_t pos_ = 0; - int64_t content_length_ = kNoSize; std::vector block_ids_; Storage::Metadata metadata_; }; @@ -1662,20 +1705,32 @@ class AzureFileSystem::Impl { AzureFileSystem* fs) { RETURN_NOT_OK(ValidateFileLocation(location)); + const auto blob_container_client = GetBlobContainerClient(location.container); auto block_blob_client = std::make_shared( - blob_service_client_->GetBlobContainerClient(location.container) - .GetBlockBlobClient(location.path)); + blob_container_client.GetBlockBlobClient(location.path)); + + auto ensure_not_flat_namespace_directory = [this, location, + blob_container_client]() -> Status { + ARROW_ASSIGN_OR_RAISE( + auto hns_support, + HierarchicalNamespaceSupport(GetFileSystemClient(location.container))); + if (hns_support == HNSSupport::kDisabled) { + // Flat namespace so we need to GetFileInfo in case it's a directory. + ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location)) + if (status.type() == FileType::Directory) { + return NotAFile(location); + } + } + // kContainerNotFound - it doesn't exist, so no need to check if it's a directory. + // kEnabled - hierarchical namespace so Azure APIs will fail if it's a directory. We + // don't need to explicitly check. 
+ return Status::OK(); + }; std::shared_ptr stream; - if (truncate) { - RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client)); - stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_, 0); - } else { - stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_); - } - RETURN_NOT_OK(stream->Init()); + stream = std::make_shared(block_blob_client, fs->io_context(), + location, metadata, options_); + RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory)); return stream; } @@ -1690,7 +1745,7 @@ class AzureFileSystem::Impl { // on directory marker blobs. // https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870 Blobs::UploadBlockBlobFromOptions blob_options; - blob_options.Metadata.emplace("is_directory", "true"); + blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey, "true"); block_blob_client.UploadFrom(nullptr, 0, blob_options); } diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index e6bd80d1d2508..649536bb348c8 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -826,6 +826,41 @@ class TestAzureFileSystem : public ::testing::Test { AssertFileInfo(fs(), subdir3, FileType::Directory); } + void TestDisallowReadingOrWritingDirectoryMarkers() { + auto data = SetUpPreexistingData(); + auto directory_path = data.Path("directory"); + + ASSERT_OK(fs()->CreateDir(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path)); + ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path)); + + auto directory_path_with_slash = directory_path + "/"; + ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash)); + ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash)); + ASSERT_RAISES(IOError, 
fs()->OpenAppendStream(directory_path_with_slash)); + } + + void TestDisallowCreatingFileAndDirectoryWithTheSameName() { + auto data = SetUpPreexistingData(); + auto path1 = data.Path("directory1"); + ASSERT_OK(fs()->CreateDir(path1)); + ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1)); + ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1)); + AssertFileInfo(fs(), path1, FileType::Directory); + + auto path2 = data.Path("directory2"); + ASSERT_OK(fs()->OpenOutputStream(path2)); + // CreateDir returns OK even if there is already a file or directory at this + // location. Whether or not this is the desired behaviour is debatable. + ASSERT_OK(fs()->CreateDir(path2)); + AssertFileInfo(fs(), path2, FileType::File); + } + + void TestOpenOutputStreamWithMissingContainer() { + ASSERT_RAISES(IOError, fs()->OpenOutputStream("not-a-container/file", {})); + } + void TestDeleteDirSuccessEmpty() { if (HasSubmitBatchBug()) { GTEST_SKIP() << kSubmitBatchBugMessage; @@ -1559,6 +1594,19 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) { this->TestCreateDirOnMissingContainer(); } +TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryMarkers) { + this->TestDisallowReadingOrWritingDirectoryMarkers(); +} + +TYPED_TEST(TestAzureFileSystemOnAllScenarios, + DisallowCreatingFileAndDirectoryWithTheSameName) { + this->TestDisallowCreatingFileAndDirectoryWithTheSameName(); +} + +TYPED_TEST(TestAzureFileSystemOnAllScenarios, OpenOutputStreamWithMissingContainer) { + this->TestOpenOutputStreamWithMissingContainer(); +} + TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) { this->TestDeleteDirSuccessEmpty(); } @@ -2162,6 +2210,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) { .Value.Metadata; // Defaults are overwritten and not merged. EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata); + + // Metadata can be written without writing any data. 
+ ASSERT_OK_AND_ASSIGN( + output, fs_with_defaults->OpenAppendStream( + full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}}))); + ASSERT_OK(output->Close()); + blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name) + .GetBlockBlobClient(blob_path) + .GetProperties() + .Value.Metadata; + // Defaults are overwritten and not merged. + EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata); } TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {