Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40037: WIP Check for directory marker metadata/ #5

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 75 additions & 20 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,22 @@ bool IsContainerNotFound(const Storage::StorageException& e) {
return false;
}

// Blob metadata keys looked up by MetadataIndicatesIsDirectory(); a value of "true"
// under either key identifies the blob as a directory rather than a file.
// The first is checked for hierarchical namespace accounts, the second is the key
// written onto directory marker blobs on flat namespace accounts.
constexpr const char* kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder";
constexpr const char* kFlatNamespaceIsDirectoryMetadataKey = "is_directory";

// Returns true when the blob metadata marks the blob as a directory.
//
// The hierarchical namespace key takes precedence: if it is present, its value alone
// decides the result and the flat namespace key is not consulted. Otherwise the flat
// namespace key must be present and equal to "true".
//
// Inspired by
// https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91
bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) {
  const auto absent = metadata.end();

  const auto hns_entry = metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey);
  if (hns_entry == absent) {
    // No hierarchical namespace marker: fall back to the flat namespace marker.
    const auto flat_entry = metadata.find(kFlatNamespaceIsDirectoryMetadataKey);
    if (flat_entry == absent) {
      return false;
    }
    return flat_entry->second == "true";
  }
  return hns_entry->second == "true";
}

template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
struct StringAppender {
Expand Down Expand Up @@ -512,11 +528,18 @@ class ObjectInputFile final : public io::RandomAccessFile {

Status Init() {
if (content_length_ != kNoSize) {
// When the user provides the file size we don't validate that it's a file. This is
// only a read so it's not a big deal if the user makes a mistake.
DCHECK_GE(content_length_, 0);
return Status::OK();
}
try {
// To open an ObjectInputFile the Blob must exist and it must not represent
// a directory. Additionally we need to know the file size.
auto properties = blob_client_->GetProperties();
if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
return NotAFile(location_);
}
content_length_ = properties.Value.BlobSize;
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
Expand Down Expand Up @@ -698,11 +721,10 @@ class ObjectAppendStream final : public io::OutputStream {
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options, int64_t size = kNoSize)
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
content_length_(size) {
location_(location) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
Expand All @@ -716,17 +738,31 @@ class ObjectAppendStream final : public io::OutputStream {
io::internal::CloseFromDestructor(this);
}

Status Init() {
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
pos_ = content_length_;
Status Init(const bool truncate,
std::function<Status()> ensure_not_flat_namespace_directory) {
if (truncate) {
content_length_ = 0;
pos_ = 0;
// We need to create an empty file overwriting any existing file, but
// fail if there is an existing directory.
RETURN_NOT_OK(ensure_not_flat_namespace_directory());
// On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing
// directory so we don't need to check like we do on flat namespace.
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
try {
auto properties = block_blob_client_->GetProperties();
if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
return NotAFile(location_);
}
content_length_ = properties.Value.BlobSize;
pos_ = content_length_;
} catch (const Storage::StorageException& exception) {
if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
// No file exists but on flat namespace it's possible there is a directory
// marker or an implied directory. Ensure there is no directory before starting
// a new empty file.
RETURN_NOT_OK(ensure_not_flat_namespace_directory());
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
return ExceptionToStatus(
Expand All @@ -743,6 +779,7 @@ class ObjectAppendStream final : public io::OutputStream {
block_ids_.push_back(block.Name);
}
}
initialised_ = true;
return Status::OK();
}

Expand Down Expand Up @@ -789,6 +826,11 @@ class ObjectAppendStream final : public io::OutputStream {

// Commits the block IDs accumulated so far, together with the stream's metadata,
// to the blob. Returns early with the CheckClosed("flush") error if that check fails.
Status Flush() override {
  RETURN_NOT_OK(CheckClosed("flush"));
  if (initialised_) {
    return CommitBlockList(block_blob_client_, block_ids_, metadata_);
  }
  // Init() never completed successfully, so there is nothing to flush. Returning
  // OK here also avoids some unhandled errors when flushing in the destructor.
  return Status::OK();
}

Expand Down Expand Up @@ -840,10 +882,11 @@ class ObjectAppendStream final : public io::OutputStream {
std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
int64_t content_length_ = kNoSize;

bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::vector<std::string> block_ids_;
Storage::Metadata metadata_;
};
Expand Down Expand Up @@ -1662,20 +1705,32 @@ class AzureFileSystem::Impl {
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));

const auto blob_container_client = GetBlobContainerClient(location.container);
auto block_blob_client = std::make_shared<Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlockBlobClient(location.path));
blob_container_client.GetBlockBlobClient(location.path));

auto ensure_not_flat_namespace_directory = [this, location,
blob_container_client]() -> Status {
ARROW_ASSIGN_OR_RAISE(
auto hns_support,
HierarchicalNamespaceSupport(GetFileSystemClient(location.container)));
if (hns_support == HNSSupport::kDisabled) {
// Flat namespace so we need to GetFileInfo in case it's a directory.
ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location))
if (status.type() == FileType::Directory) {
return NotAFile(location);
}
}
// kContainerNotFound - it doesn't exist, so no need to check if it's a directory.
// kEnabled - hierarchical namespace so Azure APIs will fail if it's a directory. We
// don't need to explicitly check.
return Status::OK();
};

std::shared_ptr<ObjectAppendStream> stream;
if (truncate) {
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client));
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_, 0);
} else {
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
}
RETURN_NOT_OK(stream->Init());
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory));
return stream;
}

Expand All @@ -1690,7 +1745,7 @@ class AzureFileSystem::Impl {
// on directory marker blobs.
// https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870
Blobs::UploadBlockBlobFromOptions blob_options;
blob_options.Metadata.emplace("is_directory", "true");
blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey, "true");
block_blob_client.UploadFrom(nullptr, 0, blob_options);
}

Expand Down
60 changes: 60 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,41 @@ class TestAzureFileSystem : public ::testing::Test {
AssertFileInfo(fs(), subdir3, FileType::Directory);
}

void TestDisallowReadingOrWritingDirectoryMarkers() {
auto data = SetUpPreexistingData();
auto directory_path = data.Path("directory");

ASSERT_OK(fs()->CreateDir(directory_path));
ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path));

auto directory_path_with_slash = directory_path + "/";
ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash));
}

// Checks what happens when a file and a directory compete for the same path,
// in both creation orders.
void TestDisallowCreatingFileAndDirectoryWithTheSameName() {
  auto data = SetUpPreexistingData();

  // Directory first: opening a stream at the same path must fail and the
  // location must still be reported as a directory afterwards.
  const auto dir_first = data.Path("directory1");
  ASSERT_OK(fs()->CreateDir(dir_first));
  ASSERT_RAISES(IOError, fs()->OpenOutputStream(dir_first));
  ASSERT_RAISES(IOError, fs()->OpenAppendStream(dir_first));
  AssertFileInfo(fs(), dir_first, FileType::Directory);

  // File first: the location must still be reported as a file after CreateDir.
  const auto file_first = data.Path("directory2");
  ASSERT_OK(fs()->OpenOutputStream(file_first));
  // CreateDir returns OK even if there is already a file or directory at this
  // location. Whether or not this is the desired behaviour is debatable.
  ASSERT_OK(fs()->CreateDir(file_first));
  AssertFileInfo(fs(), file_first, FileType::File);
}

// Opening an output stream under the container name "not-a-container" must raise
// IOError.
void TestOpenOutputStreamWithMissingContainer() {
  const std::string path_in_missing_container = "not-a-container/file";
  ASSERT_RAISES(IOError, fs()->OpenOutputStream(path_in_missing_container, {}));
}

void TestDeleteDirSuccessEmpty() {
if (HasSubmitBatchBug()) {
GTEST_SKIP() << kSubmitBatchBugMessage;
Expand Down Expand Up @@ -1559,6 +1594,19 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) {
this->TestCreateDirOnMissingContainer();
}

// Registers TestDisallowReadingOrWritingDirectoryMarkers() as a typed test in the
// TestAzureFileSystemOnAllScenarios suite.
TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryMarkers) {
this->TestDisallowReadingOrWritingDirectoryMarkers();
}

// Registers TestDisallowCreatingFileAndDirectoryWithTheSameName() as a typed test in
// the TestAzureFileSystemOnAllScenarios suite.
TYPED_TEST(TestAzureFileSystemOnAllScenarios,
DisallowCreatingFileAndDirectoryWithTheSameName) {
this->TestDisallowCreatingFileAndDirectoryWithTheSameName();
}

// Registers TestOpenOutputStreamWithMissingContainer() as a typed test in the
// TestAzureFileSystemOnAllScenarios suite.
TYPED_TEST(TestAzureFileSystemOnAllScenarios, OpenOutputStreamWithMissingContainer) {
this->TestOpenOutputStreamWithMissingContainer();
}

TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
this->TestDeleteDirSuccessEmpty();
}
Expand Down Expand Up @@ -2162,6 +2210,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);

// Metadata can be written without writing any data.
ASSERT_OK_AND_ASSIGN(
output, fs_with_defaults->OpenAppendStream(
full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}})));
ASSERT_OK(output->Close());
blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata);
}

TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
Expand Down
Loading