diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 87b9822878cce..d4bb445701444 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -15,6 +15,11 @@ // specific language governing permissions and limitations // under the License. +#include +#include +#include +#include + #include "arrow/filesystem/azurefs.h" #include "arrow/filesystem/azurefs_internal.h" @@ -283,18 +288,26 @@ struct AzureLocation { template Status ExceptionToStatus(const Storage::StorageException& exception, PrefixArgs&&... prefix_args) { - return Status::IOError(std::forward(prefix_args)..., - " Azure Error: ", exception.what()); + return Status::IOError(std::forward(prefix_args)..., " Azure Error: [", + exception.ErrorCode, "] ", exception.what()); } Status PathNotFound(const AzureLocation& location) { return ::arrow::fs::internal::PathNotFound(location.all); } +Status NotADir(const AzureLocation& location) { + return ::arrow::fs::internal::NotADir(location.all); +} + Status NotAFile(const AzureLocation& location) { return ::arrow::fs::internal::NotAFile(location.all); } +Status NotEmpty(const AzureLocation& location) { + return ::arrow::fs::internal::NotEmpty(location.all); +} + Status ValidateFileLocation(const AzureLocation& location) { if (location.container.empty()) { return PathNotFound(location); @@ -305,6 +318,23 @@ Status ValidateFileLocation(const AzureLocation& location) { return internal::AssertNoTrailingSlash(location.path); } +Status InvalidDirMoveToSubdir(const AzureLocation& src, const AzureLocation& dest) { + return Status::Invalid("Cannot Move to '", dest.all, "' and make '", src.all, + "' a sub-directory of itself."); +} + +Status DestinationParentPathNotFound(const AzureLocation& dest) { + return Status::IOError("The parent directory of the destination path '", dest.all, + "' does not exist."); +} + +Status CrossContainerMoveNotImplemented(const AzureLocation& src, + const AzureLocation& dest) { + return Status::NotImplemented( + "Move of '", src.all, "' to '", dest.all, + "' requires moving data between containers, which is not implemented."); +} + bool IsContainerNotFound(const Storage::StorageException& e) { // In some situations, only the ReasonPhrase is set and the // ErrorCode is empty, so we check both. @@ -942,6 +972,185 @@ FileInfo FileInfoFromBlob(std::string_view container, return info; } +/// \brief RAII-style guard for releasing a lease on a blob or container. +/// +/// The guard should be constructed right after a successful BlobLeaseClient::Acquire() +/// call. Use std::optional to declare a guard in outer scope and construct it +/// later with std::optional::emplace(...). +/// +/// Leases expire automatically, but explicit release means concurrent clients or +/// ourselves when trying new operations on the same blob or container don't have +/// to wait for the lease to expire by itself. +/// +/// Learn more about leases at +/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob +class LeaseGuard { + public: + using SteadyClock = std::chrono::steady_clock; + + private: + /// \brief The time when the lease expires or is broken. + /// + /// The lease is not guaranteed to be valid until this time, but it is guaranteed to + /// be expired after this time. In other words, this is an overestimation of + /// the true time_point. + SteadyClock::time_point break_or_expires_at_; + const std::unique_ptr lease_client_; + bool release_attempt_pending_ = true; + + /// \brief The latest known expiry time of a lease guarded by this class + /// that failed to be released or was forgotten by calling Forget(). + static std::atomic latest_known_expiry_time_; + + /// \brief The maximum lease duration supported by Azure Storage. + static constexpr std::chrono::seconds kMaxLeaseDuration{60}; + + public: + LeaseGuard(std::unique_ptr lease_client, + std::chrono::seconds lease_duration) + : break_or_expires_at_(SteadyClock::now() + + std::min(kMaxLeaseDuration, lease_duration)), + lease_client_(std::move(lease_client)) { + DCHECK(lease_duration <= kMaxLeaseDuration); + DCHECK(this->lease_client_); + } + + ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard); + + ~LeaseGuard() { + // No point in trying any error handling here other than the debug checking. The lease + // will eventually expire on the backend without any intervention from us (just much + // later than if we released it). + [[maybe_unused]] auto status = Release(); + ARROW_LOG(DEBUG) << status; + } + + bool PendingRelease() const { + return release_attempt_pending_ && SteadyClock::now() <= break_or_expires_at_; + } + + private: + Status DoRelease() { + DCHECK(release_attempt_pending_); + try { + lease_client_->Release(); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to release the ", + lease_client_->GetLeaseId(), " lease"); + } + return Status::OK(); + } + + public: + std::string LeaseId() const { return lease_client_->GetLeaseId(); } + + bool StillValidFor(SteadyClock::duration expected_time_left) const { + return SteadyClock::now() + expected_time_left < break_or_expires_at_; + } + + /// \brief Break the lease. + /// + /// The lease will stay in the "Breaking" state for break_period seconds or + /// less if the lease is expiring before that. + /// + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state + /// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state + Status Break(Azure::Nullable break_period = {}) { + auto remaining_time_ms = [this]() { + const auto remaining_time = break_or_expires_at_ - SteadyClock::now(); + return std::chrono::duration_cast(remaining_time); + }; +#ifndef NDEBUG + if (break_period.HasValue() && !StillValidFor(*break_period)) { + ARROW_LOG(WARNING) + << "Azure Storage: requested break_period (" + << break_period.ValueOr(std::chrono::seconds{0}).count() + << "s) is too long or lease duration is too short for all the operations " + "performed so far (lease expires in " + << remaining_time_ms().count() << "ms)"; + } +#endif + Blobs::BreakLeaseOptions options; + options.BreakPeriod = break_period; + try { + lease_client_->Break(options); + break_or_expires_at_ = + std::min(break_or_expires_at_, + SteadyClock::now() + break_period.ValueOr(std::chrono::seconds{0})); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to break the ", + lease_client_->GetLeaseId(), " lease expiring in ", + remaining_time_ms().count(), "ms"); + } + return Status::OK(); + } + + /// \brief Break the lease before deleting or renaming the resource. + /// + /// Calling this is recommended when the resource for which the lease was acquired is + /// about to be deleted as there is no way of releasing the lease after that, we can + /// only forget about it. The break_period should be a conservative estimate of the time + /// it takes to delete/rename the resource. + /// + /// If break_period is too small, the delete/rename will fail with a lease conflict, + /// and if it's too large the only consequence is that a lease on a non-existent + /// resource will remain in the "Breaking" state for a while blocking others + /// from recreating the resource. + void BreakBeforeDeletion(std::chrono::seconds break_period) { + ARROW_CHECK_OK(Break(break_period)); + } + + // These functions are marked ARROW_NOINLINE because they are called from + // multiple locations, but are not performance-critical. + + ARROW_NOINLINE Status Release() { + if (!PendingRelease()) { + return Status::OK(); + } + auto status = DoRelease(); + if (!status.ok()) { + Forget(); + return status; + } + release_attempt_pending_ = false; + return Status::OK(); + } + + /// \brief Prevent any release attempts in the destructor. + /// + /// When it's known they would certainly fail. + /// \see LeaseGuard::BreakBeforeDeletion() + ARROW_NOINLINE void Forget() { + if (!PendingRelease()) { + release_attempt_pending_ = false; + return; + } + release_attempt_pending_ = false; + // Remember the latest known expiry time so we can gracefully handle lease + // acquisition failures by waiting until the latest forgotten lease. + auto latest = latest_known_expiry_time_.load(std::memory_order_relaxed); + while ( + latest < break_or_expires_at_ && + !latest_known_expiry_time_.compare_exchange_weak(latest, break_or_expires_at_)) { + } + DCHECK_GE(latest_known_expiry_time_.load(), break_or_expires_at_); + } + + ARROW_NOINLINE static void WaitUntilLatestKnownExpiryTime() { + auto remaining_time = latest_known_expiry_time_.load() - SteadyClock::now(); +#ifndef NDEBUG + int64_t remaining_time_ms = + std::chrono::duration_cast(remaining_time).count(); + ARROW_LOG(WARNING) << "LeaseGuard::WaitUntilLatestKnownExpiryTime for " + << remaining_time_ms << "ms..."; +#endif + DCHECK(remaining_time <= kMaxLeaseDuration); + if (remaining_time > SteadyClock::duration::zero()) { + std::this_thread::sleep_for(remaining_time); + } + } +}; + } // namespace class AzureFileSystem::Impl { @@ -975,6 +1184,11 @@ class AzureFileSystem::Impl { return blob_service_client_->GetBlobContainerClient(container_name); } + Blobs::BlobClient GetBlobClient(const std::string& container_name, + const std::string& blob_name) { + return GetBlobContainerClient(container_name).GetBlobClient(blob_name); + } + /// \param container_name Also known as "filesystem" in the ADLS Gen2 API. DataLake::DataLakeFileSystemClient GetFileSystemClient( const std::string& container_name) { @@ -1030,11 +1244,14 @@ class AzureFileSystem::Impl { /// \pre location.path is not empty. Result GetFileInfo(const DataLake::DataLakeFileSystemClient& adlfs_client, - const AzureLocation& location) { + const AzureLocation& location, + Azure::Nullable lease_id = {}) { auto file_client = adlfs_client.GetFileClient(location.path); + DataLake::GetPathPropertiesOptions options; + options.AccessConditions.LeaseId = std::move(lease_id); try { FileInfo info{location.all}; - auto properties = file_client.GetProperties(); + auto properties = file_client.GetProperties(options); if (properties.Value.IsDirectory) { info.set_type(FileType::Directory); } else if (internal::HasTrailingSlash(location.path)) { @@ -1115,6 +1332,22 @@ class AzureFileSystem::Impl { } } + Result GetFileInfoOfPathWithinContainer(const AzureLocation& location) { + DCHECK(!location.container.empty() && !location.path.empty()); + // There is a path to search within the container. Check HNS support to proceed. + auto adlfs_client = GetFileSystemClient(location.container); + ARROW_ASSIGN_OR_RAISE(auto hns_support, HierarchicalNamespaceSupport(adlfs_client)); + if (hns_support == HNSSupport::kContainerNotFound) { + return FileInfo{location.all, FileType::NotFound}; + } + if (hns_support == HNSSupport::kEnabled) { + return GetFileInfo(adlfs_client, location); + } + DCHECK_EQ(hns_support, HNSSupport::kDisabled); + auto container_client = GetBlobContainerClient(location.container); + return GetFileInfo(container_client, location); + } + private: /// \pref location.container is not empty. template @@ -1320,8 +1553,7 @@ class AzureFileSystem::Impl { AzureFileSystem* fs) { RETURN_NOT_OK(ValidateFileLocation(location)); auto blob_client = std::make_shared( - blob_service_client_->GetBlobContainerClient(location.container) - .GetBlobClient(location.path)); + GetBlobClient(location.container, location.path)); auto ptr = std::make_shared(blob_client, fs->io_context(), std::move(location)); @@ -1340,8 +1572,7 @@ class AzureFileSystem::Impl { ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(info.path())); RETURN_NOT_OK(ValidateFileLocation(location)); auto blob_client = std::make_shared( - blob_service_client_->GetBlobContainerClient(location.container) - .GetBlobClient(location.path)); + GetBlobClient(location.container, location.path)); auto ptr = std::make_shared(blob_client, fs->io_context(), std::move(location), info.size()); @@ -1625,13 +1856,16 @@ class AzureFileSystem::Impl { /// \pre location.path is not empty. Status DeleteDirOnFileSystem(const DataLake::DataLakeFileSystemClient& adlfs_client, const AzureLocation& location, bool recursive, - bool require_dir_to_exist) { + bool require_dir_to_exist, + Azure::Nullable lease_id = {}) { DCHECK(!location.container.empty()); DCHECK(!location.path.empty()); auto directory_client = adlfs_client.GetDirectoryClient(location.path); + DataLake::DeleteDirectoryOptions options; + options.AccessConditions.LeaseId = std::move(lease_id); try { - auto response = - recursive ? directory_client.DeleteRecursive() : directory_client.DeleteEmpty(); + auto response = recursive ? directory_client.DeleteRecursive(options) + : directory_client.DeleteEmpty(options); // Only the "*IfExists" functions ever set Deleted to false. // All the others either succeed or throw an exception. DCHECK(response.Value.Deleted); @@ -1710,17 +1944,440 @@ class AzureFileSystem::Impl { return Status::OK(); } + private: + /// \brief Create a BlobLeaseClient and acquire a lease on the container. + /// + /// \param allow_missing_container if true, a nullptr may be returned when the container + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes container not found) + Result> AcquireContainerLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing_container = false, bool retry_allowed = true) { + DCHECK(!location.container.empty()); + auto container_client = GetBlobContainerClient(location.container); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto container_url = container_client.GetUrl(); + auto lease_client = std::make_unique( + std::move(container_client), std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (IsContainerNotFound(exception)) { + if (allow_missing_container) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireContainerLease(location, lease_duration, allow_missing_container, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on container '", + location.container, "': ", container_url); + } + return lease_client; + } + + /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or + /// directory if Hierarchical Namespace is supported). + /// + /// \param allow_missing if true, a nullptr may be returned when the blob + /// doesn't exist, otherwise a PathNotFound(location) error is produced right away + /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and + /// optional (nullptr denotes blob not found) + Result> AcquireBlobLease( + const AzureLocation& location, std::chrono::seconds lease_duration, + bool allow_missing = false, bool retry_allowed = true) { + DCHECK(!location.container.empty() && !location.path.empty()); + auto path = std::string{internal::RemoveTrailingSlash(location.path)}; + auto blob_client = GetBlobClient(location.container, std::move(path)); + auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId(); + auto blob_url = blob_client.GetUrl(); + auto lease_client = std::make_unique(std::move(blob_client), + std::move(lease_id)); + try { + [[maybe_unused]] auto result = lease_client->Acquire(lease_duration); + DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId()); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + if (allow_missing) { + return nullptr; + } + return PathNotFound(location); + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "LeaseAlreadyPresent") { + if (retry_allowed) { + LeaseGuard::WaitUntilLatestKnownExpiryTime(); + return AcquireBlobLease(location, lease_duration, allow_missing, + /*retry_allowed=*/false); + } + } + return ExceptionToStatus(exception, "Failed to acquire a lease on file '", + location.all, "': ", blob_url); + } + return lease_client; + } + + /// \brief The default lease duration used for acquiring a lease on a container or blob. + /// + /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds. + /// + /// Operations consisting of an unpredictable number of sub-operations should + /// renew the lease periodically (heartbeat pattern) instead of acquiring an + /// infinite lease (very bad idea for a library like Arrow). + static constexpr auto kLeaseDuration = std::chrono::seconds{15}; + + // These are conservative estimates of how long it takes for the client + // request to reach the server counting from the moment the Azure SDK function + // issuing the request is called. See their usage for more context. + // + // If the client connection to the server is unpredictably slow, operations + // may fail, but due to the use of leases, the entire arrow::FileSystem + // operation can be retried without risk of data loss. Thus, unexpected network + // slow downs can be fixed with retries (either by some system using Arrow or + // an user interactively retrying a failed operation). + // + // If a network is permanently slow, the lease time and these numbers should be + // increased but not so much so that the client gives up an operation because the + // values say it takes more time to reach the server than the remaining lease + // time on the resources. + // + // NOTE: The initial constant values were chosen conservatively. If we learn, + // from experience, that they are causing issues, we can increase them. And if + // broadly applicable values aren't possible, we can make them configurable. + static constexpr auto kTimeNeededForContainerDeletion = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForContainerRename = std::chrono::seconds{3}; + static constexpr auto kTimeNeededForFileOrDirectoryRename = std::chrono::seconds{3}; + + public: + /// The conditions for a successful container rename are derived from the + /// conditions for a successful `Move("/$src.container", "/$dest.container")`. + /// The numbers here match the list in `Move`. + /// + /// 1. `src.container` must exist. + /// 2. If `src.container` and `dest.container` are the same, do nothing and + /// return OK. + /// 3. N/A. + /// 4. N/A. + /// 5. `dest.container` doesn't exist or is empty. + /// NOTE: one exception related to container Move is that when the + /// destination is empty we also require the source container to be empty, + /// because the only way to perform the "Move" is by deleting the source + /// instead of deleting the destination and renaming the source. + Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && src.path.empty()); + DCHECK(!dest.container.empty() && dest.path.empty()); + auto src_container_client = GetBlobContainerClient(src.container); + + // If src and dest are the same, we only have to check src exists. + if (src.container == dest.container) { + ARROW_ASSIGN_OR_RAISE(auto info, + GetContainerPropsAsFileInfo(src, src_container_client)); + if (info.type() == FileType::NotFound) { + return PathNotFound(src); + } + DCHECK(info.type() == FileType::Directory); + return Status::OK(); + } + + // Acquire a lease on the source container because (1) we need the lease + // before rename and (2) it works as a way of checking the container exists. + ARROW_ASSIGN_OR_RAISE(auto src_lease_client, + AcquireContainerLease(src, kLeaseDuration)); + LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration}; + // Check dest.container doesn't exist or is empty. + auto dest_container_client = GetBlobContainerClient(dest.container); + std::optional dest_lease_guard; + bool dest_exists = false; + bool dest_is_empty = false; + ARROW_ASSIGN_OR_RAISE( + auto dest_lease_client, + AcquireContainerLease(dest, kLeaseDuration, /*allow_missing_container*/ true)); + if (dest_lease_client) { + dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration); + dest_exists = true; + // Emptiness check after successful acquisition of the lease. + Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto dest_list_response = dest_container_client.ListBlobs(list_blobs_options); + dest_is_empty = dest_list_response.Blobs.empty(); + if (!dest_is_empty) { + return NotEmpty(dest); + } + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to check that '", dest.container, + "' is empty: ", dest_container_client.GetUrl()); + } + } + DCHECK(!dest_exists || dest_is_empty); + + if (!dest_exists) { + // Rename the source container. + Blobs::RenameBlobContainerOptions options; + options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId(); + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerRename); + blob_service_client_->RenameBlobContainer(src.container, dest.container, options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + if (exception.StatusCode == Http::HttpStatusCode::BadRequest && + exception.ErrorCode == "InvalidQueryParameterValue") { + auto param_name = exception.AdditionalInformation.find("QueryParameterName"); + if (param_name != exception.AdditionalInformation.end() && + param_name->second == "comp") { + return ExceptionToStatus( + exception, "The 'rename' operation is not supported on containers. ", + "Attempting a rename of '", src.container, "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } + return ExceptionToStatus(exception, "Failed to rename container '", src.container, + "' to '", dest.container, + "': ", blob_service_client_->GetUrl()); + } + } else if (dest_is_empty) { + // Even if we deleted the empty dest.container, RenameBlobContainer() would still + // fail because containers are not immediately deleted by the service -- they are + // deleted asynchronously based on retention policies and non-deterministic behavior + // of the garbage collection process. + // + // One way to still match POSIX rename semantics is to delete the src.container if + // and only if it's empty because the final state would be equivalent to replacing + // the dest.container with the src.container and its contents (which happens + // to also be empty). + Blobs::ListBlobsOptions list_blobs_options; + list_blobs_options.PageSizeHint = 1; + try { + auto src_list_response = src_container_client.ListBlobs(list_blobs_options); + if (!src_list_response.Blobs.empty()) { + // Reminder: dest is used here because we're semantically replacing dest + // with src. By deleting src if it's empty just like dest. + return Status::IOError("Unable to replace empty container: '", dest.all, "'"); + } + // Delete the source container now that we know it's empty. + Blobs::DeleteBlobContainerOptions options; + options.AccessConditions.LeaseId = src_lease_guard.LeaseId(); + DCHECK(dest_lease_guard.has_value()); + // Make sure lease on dest is still valid before deleting src. This is to ensure + // the destination container is not deleted by another process/client before + // Move() returns. + if (!dest_lease_guard->StillValidFor(kTimeNeededForContainerDeletion)) { + return Status::IOError("Unable to replace empty container: '", dest.all, "'. ", + "Preparation for the operation took too long and a " + "container lease expired."); + } + try { + src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerDeletion); + src_container_client.Delete(options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Failed to delete empty container: '", + src.container, "': ", src_container_client.GetUrl()); + } + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus(exception, "Unable to replace empty container: '", + dest.all, "': ", dest_container_client.GetUrl()); + } + } + return Status::OK(); + } + + Status MoveContainerToPath(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && src.path.empty()); + DCHECK(!dest.container.empty() && !dest.path.empty()); + // Check Move pre-condition 1 -- `src` must exist. + auto container_client = GetBlobContainerClient(src.container); + ARROW_ASSIGN_OR_RAISE(auto src_info, + GetContainerPropsAsFileInfo(src, container_client)); + if (src_info.type() == FileType::NotFound) { + return PathNotFound(src); + } + // Check Move pre-condition 2. + if (src.container == dest.container) { + return InvalidDirMoveToSubdir(src, dest); + } + // Instead of checking more pre-conditions, we just abort with a + // NotImplemented status. + return CrossContainerMoveNotImplemented(src, dest); + } + + Status CreateContainerFromPath(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && !src.path.empty()); + DCHECK(!dest.empty() && dest.path.empty()); + ARROW_ASSIGN_OR_RAISE(auto src_file_info, GetFileInfoOfPathWithinContainer(src)); + switch (src_file_info.type()) { + case FileType::Unknown: + case FileType::NotFound: + return PathNotFound(src); + case FileType::File: + return Status::Invalid( + "Creating files at '/' is not possible, only directories."); + case FileType::Directory: + break; + } + if (src.container == dest.container) { + return InvalidDirMoveToSubdir(src, dest); + } + return CrossContainerMoveNotImplemented(src, dest); + } + + Status MovePathWithDataLakeAPI( + const DataLake::DataLakeFileSystemClient& src_adlfs_client, + const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && !src.path.empty()); + DCHECK(!dest.container.empty() && !dest.path.empty()); + const auto src_path = std::string{internal::RemoveTrailingSlash(src.path)}; + const auto dest_path = std::string{internal::RemoveTrailingSlash(dest.path)}; + + // Ensure that src exists and, if path has a trailing slash, that it's a directory. + ARROW_ASSIGN_OR_RAISE(auto src_lease_client, AcquireBlobLease(src, kLeaseDuration)); + LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration}; + // It might be necessary to check src is a directory 0-3 times in this function, + // so we use a lazy evaluation function to avoid redundant calls to GetFileInfo(). + std::optional src_is_dir_opt{}; + auto src_is_dir_lazy = [&]() -> Result { + if (src_is_dir_opt.has_value()) { + return *src_is_dir_opt; + } + ARROW_ASSIGN_OR_RAISE( + auto src_info, GetFileInfo(src_adlfs_client, src, src_lease_guard.LeaseId())); + src_is_dir_opt = src_info.type() == FileType::Directory; + return *src_is_dir_opt; + }; + // src must be a directory if it has a trailing slash. + if (internal::HasTrailingSlash(src.path)) { + ARROW_ASSIGN_OR_RAISE(auto src_is_dir, src_is_dir_lazy()); + if (!src_is_dir) { + return NotADir(src); + } + } + // The Azure SDK and the backend don't perform many important checks, so we have to + // do them ourselves. Additionally, based on many tests on a default-configuration + // storage account, if the destination is an empty directory, the rename operation + // will most likely fail due to a timeout. Providing both leases -- to source and + // destination -- seems to have made things work. + ARROW_ASSIGN_OR_RAISE(auto dest_lease_client, + AcquireBlobLease(dest, kLeaseDuration, /*allow_missing=*/true)); + std::optional dest_lease_guard; + if (dest_lease_client) { + dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration); + // Perform all the checks on dest (and src) before proceeding with the rename. + auto dest_adlfs_client = GetFileSystemClient(dest.container); + ARROW_ASSIGN_OR_RAISE(auto dest_info, GetFileInfo(dest_adlfs_client, dest, + dest_lease_guard->LeaseId())); + if (dest_info.type() == FileType::Directory) { + ARROW_ASSIGN_OR_RAISE(auto src_is_dir, src_is_dir_lazy()); + if (!src_is_dir) { + // If src is a regular file, complain that dest is a directory + // like POSIX rename() does. + return internal::IsADir(dest.all); + } + } else { + if (internal::HasTrailingSlash(dest.path)) { + return NotADir(dest); + } + } + } else { + // If the destination has trailing slash, we would have to check that it's a + // directory, but since it doesn't exist we must return PathNotFound... + if (internal::HasTrailingSlash(dest.path)) { + // ...unless the src is a directory, in which case we can proceed with the rename. + ARROW_ASSIGN_OR_RAISE(auto src_is_dir, src_is_dir_lazy()); + if (!src_is_dir) { + return PathNotFound(dest); + } + } + } + + try { + // NOTE: The Azure SDK provides a RenameDirectory() function, but the + // implementation is the same as RenameFile() with the only difference being + // the type of the returned object (DataLakeDirectoryClient vs DataLakeFileClient). + // + // If we call RenameDirectory() and the source is a file, no error would + // be returned and the file would be renamed just fine (!). + // + // Since we don't use the returned object, we can just use RenameFile() for both + // files and directories. Ideally, the SDK would simply expose the PathClient + // that it uses internally for both files and directories. + DataLake::RenameFileOptions options; + options.DestinationFileSystem = dest.container; + options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId(); + if (dest_lease_guard.has_value()) { + options.AccessConditions.LeaseId = dest_lease_guard->LeaseId(); + } + src_lease_guard.BreakBeforeDeletion(kTimeNeededForFileOrDirectoryRename); + src_adlfs_client.RenameFile(src_path, dest_path, options); + src_lease_guard.Forget(); + } catch (const Storage::StorageException& exception) { + // https://learn.microsoft.com/en-gb/rest/api/storageservices/datalakestoragegen2/path/create + if (exception.StatusCode == Http::HttpStatusCode::NotFound) { + if (exception.ErrorCode == "PathNotFound") { + return PathNotFound(src); + } + // "FilesystemNotFound" could be triggered by the source or destination filesystem + // not existing, but since we already checked the source filesystem exists (and + // hold a lease to it), we can assume the destination filesystem is the one the + // doesn't exist. + if (exception.ErrorCode == "FilesystemNotFound" || + exception.ErrorCode == "RenameDestinationParentPathNotFound") { + return DestinationParentPathNotFound(dest); + } + } else if (exception.StatusCode == Http::HttpStatusCode::Conflict && + exception.ErrorCode == "PathAlreadyExists") { + // "PathAlreadyExists" is only produced when the destination exists and is a + // non-empty directory, so we produce the appropriate error. + return NotEmpty(dest); + } + return ExceptionToStatus(exception, "Failed to rename '", src.all, "' to '", + dest.all, "': ", src_adlfs_client.GetUrl()); + } + return Status::OK(); + } + + Status MovePathUsingBlobsAPI(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && !src.path.empty()); + DCHECK(!dest.container.empty() && !dest.path.empty()); + if (src.container != dest.container) { + ARROW_ASSIGN_OR_RAISE(auto src_file_info, GetFileInfoOfPathWithinContainer(src)); + if (src_file_info.type() == FileType::NotFound) { + return PathNotFound(src); + } + return CrossContainerMoveNotImplemented(src, dest); + } + return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + } + + Status MovePath(const AzureLocation& src, const AzureLocation& dest) { + DCHECK(!src.container.empty() && !src.path.empty()); + DCHECK(!dest.container.empty() && !dest.path.empty()); + auto src_adlfs_client = GetFileSystemClient(src.container); + ARROW_ASSIGN_OR_RAISE(auto hns_support, + HierarchicalNamespaceSupport(src_adlfs_client)); + if (hns_support == HNSSupport::kContainerNotFound) { + return PathNotFound(src); + } + if (hns_support == HNSSupport::kEnabled) { + return MovePathWithDataLakeAPI(src_adlfs_client, src, dest); + } + DCHECK_EQ(hns_support, HNSSupport::kDisabled); + return MovePathUsingBlobsAPI(src, dest); + } + Status CopyFile(const AzureLocation& src, const AzureLocation& dest) { RETURN_NOT_OK(ValidateFileLocation(src)); RETURN_NOT_OK(ValidateFileLocation(dest)); if (src == dest) { return Status::OK(); } - auto dest_blob_client = blob_service_client_->GetBlobContainerClient(dest.container) - .GetBlobClient(dest.path); - auto src_url = blob_service_client_->GetBlobContainerClient(src.container) - .GetBlobClient(src.path) - .GetUrl(); + auto dest_blob_client = GetBlobClient(dest.container, dest.path); + auto src_url = GetBlobClient(src.container, src.path).GetUrl(); try { dest_blob_client.CopyFromUri(src_url); } catch (const Storage::StorageException& exception) { @@ -1731,6 +2388,9 @@ class AzureFileSystem::Impl { } }; +std::atomic LeaseGuard::latest_known_expiry_time_ = + SteadyClock::time_point{SteadyClock::duration::zero()}; + AzureFileSystem::AzureFileSystem(std::unique_ptr&& impl) : FileSystem(impl->io_context()), impl_(std::move(impl)) { default_async_is_sync_ = false; @@ -1772,19 +2432,7 @@ Result AzureFileSystem::GetFileInfo(const std::string& path) { auto container_client = impl_->GetBlobContainerClient(location.container); return GetContainerPropsAsFileInfo(location, container_client); } - // There is a path to search within the container. Check HNS support to proceed. - auto adlfs_client = impl_->GetFileSystemClient(location.container); - ARROW_ASSIGN_OR_RAISE(auto hns_support, - impl_->HierarchicalNamespaceSupport(adlfs_client)); - if (hns_support == HNSSupport::kContainerNotFound) { - return FileInfo{location.all, FileType::NotFound}; - } - if (hns_support == HNSSupport::kEnabled) { - return impl_->GetFileInfo(adlfs_client, location); - } - DCHECK_EQ(hns_support, HNSSupport::kDisabled); - auto container_client = impl_->GetBlobContainerClient(location.container); - return impl_->GetFileInfo(container_client, location); + return impl_->GetFileInfoOfPathWithinContainer(location); } Result AzureFileSystem::GetFileInfo(const FileSelector& select) { @@ -1900,7 +2548,24 @@ Status AzureFileSystem::DeleteFile(const std::string& path) { } Status AzureFileSystem::Move(const std::string& src, const std::string& dest) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest)); + if (src_location.container.empty()) { + return Status::Invalid("Move requires a non-empty source path."); + } + if (dest_location.container.empty()) { + return Status::Invalid("Move requires a non-empty destination path."); + } + if (src_location.path.empty()) { + if (dest_location.path.empty()) { + return impl_->RenameContainer(src_location, dest_location); + } + return impl_->MoveContainerToPath(src_location, dest_location); + } + if (dest_location.path.empty()) { + return impl_->CreateContainerFromPath(src_location, dest_location); + } + return impl_->MovePath(src_location, dest_location); } Status AzureFileSystem::CopyFile(const std::string& src, const std::string& dest) { diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 55f89ba4776e2..2a131e40c05bf 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -210,6 +210,25 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { Status DeleteFile(const std::string& path) override; + /// \brief Move / rename a file or directory. + /// + /// There are no files immediately at the root directory, so paths like + /// "/segment" always refer to a container of the storage account and are + /// treated as directories. + /// + /// If `dest` exists but the operation fails for some reason, `Move` + /// guarantees `dest` is not lost. + /// + /// Conditions for a successful move: + /// 1. `src` must exist. + /// 2. `dest` can't contain a strict path prefix of `src`. More generally, + /// a directory can't be made a subdirectory of itself. + /// 3. If `dest` already exists and it's a file, `src` must also be a file. + /// `dest` is then replaced by `src`. + /// 4. All components of `dest` must exist, except for the last. + /// 5. If `dest` already exists and it's a directory, `src` must also be a + /// directory and `dest` must be empty. `dest` is then replaced by `src` + /// and its contents. Status Move(const std::string& src, const std::string& dest) override; Status CopyFile(const std::string& src, const std::string& dest) override; diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 4d123028ea86e..c39a5b7d22bdd 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -50,12 +50,15 @@ #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" #include "arrow/result.h" +#include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" +#include "arrow/util/pcg_random.h" #include "arrow/util/string.h" +#include "arrow/util/unreachable.h" #include "arrow/util/value_parsing.h" namespace arrow { @@ -335,7 +338,7 @@ TEST(AzureFileSystem, OptionsCompare) { struct PreexistingData { public: - using RNG = std::mt19937_64; + using RNG = random::pcg32_fast; public: const std::string container_name; @@ -354,7 +357,10 @@ culpa qui officia deserunt mollit anim id est laborum. explicit PreexistingData(RNG& rng) : container_name{RandomContainerName(rng)} {} // Creates a path by concatenating the container name and the stem. - std::string ContainerPath(std::string_view stem) const { + std::string ContainerPath(std::string_view stem) const { return Path(stem); } + + // Short alias to ContainerPath() + std::string Path(std::string_view stem) const { return ConcatAbstractPath(container_name, stem); } @@ -391,7 +397,7 @@ culpa qui officia deserunt mollit anim id est laborum. class TestAzureFileSystem : public ::testing::Test { protected: // Set in constructor - std::mt19937_64 rng_; + random::pcg32_fast rng_; // Set in SetUp() int64_t debug_log_start_ = 0; @@ -477,18 +483,41 @@ class TestAzureFileSystem : public ::testing::Test { Blobs::BlobContainerClient CreateContainer(const std::string& name) { auto container_client = blob_service_client_->GetBlobContainerClient(name); - (void)container_client.CreateIfNotExists(); + ARROW_UNUSED(container_client.CreateIfNotExists()); return container_client; } + DataLake::DataLakeFileSystemClient CreateFilesystem(const std::string& name) { + auto adlfs_client = datalake_service_client_->GetFileSystemClient(name); + ARROW_UNUSED(adlfs_client.CreateIfNotExists()); + return adlfs_client; + } + Blobs::BlobClient CreateBlob(Blobs::BlobContainerClient& container_client, const std::string& name, const std::string& data = "") { auto blob_client = container_client.GetBlockBlobClient(name); - (void)blob_client.UploadFrom(reinterpret_cast(data.data()), - data.size()); + ARROW_UNUSED(blob_client.UploadFrom(reinterpret_cast(data.data()), + data.size())); return blob_client; } + DataLake::DataLakeFileClient CreateFile( + DataLake::DataLakeFileSystemClient& filesystem_client, const std::string& name, + const std::string& data = "") { + auto file_client = filesystem_client.GetFileClient(name); + ARROW_UNUSED(file_client.UploadFrom(reinterpret_cast(data.data()), + data.size())); + return file_client; + } + + DataLake::DataLakeDirectoryClient CreateDirectory( + DataLake::DataLakeFileSystemClient& adlfs_client, const std::string& name) { + EXPECT_TRUE(WithHierarchicalNamespace()); + auto dir_client = adlfs_client.GetDirectoryClient(name); + dir_client.Create(); + return dir_client; + } + Blobs::Models::BlobProperties GetBlobProperties(const std::string& container_name, const std::string& blob_name) { return blob_service_client_->GetBlobContainerClient(container_name) @@ -507,8 +536,13 @@ class TestAzureFileSystem : public ::testing::Test { PreexistingData SetUpPreexistingData() { PreexistingData data(rng_); - auto container_client = CreateContainer(data.container_name); - CreateBlob(container_client, data.kObjectName, PreexistingData::kLoremIpsum); + if (WithHierarchicalNamespace()) { + auto filesystem_client = CreateFilesystem(data.container_name); + CreateFile(filesystem_client, data.kObjectName, PreexistingData::kLoremIpsum); + } else { + auto container_client = CreateContainer(data.container_name); + CreateBlob(container_client, data.kObjectName, PreexistingData::kLoremIpsum); + } return data; } @@ -854,6 +888,393 @@ class TestAzureFileSystem : public ::testing::Test { const auto directory_path = data.RandomDirectoryPath(rng_); ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false)); } + + private: + using StringMatcher = + ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher>; + + StringMatcher HasDirMoveToSubdirMessage(const std::string& src, + const std::string& dest) { + return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + src + + "' a sub-directory of itself."); + } + + StringMatcher HasCrossContainerNotImplementedMessage(const std::string& container_name, + const std::string& dest) { + return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest + + "' requires moving data between " + "containers, which is not implemented."); + } + + StringMatcher HasMissingParentDirMessage(const std::string& dest) { + return ::testing::HasSubstr("The parent directory of the destination path '" + dest + + "' does not exist."); + } + + /// \brief Expected POSIX semantics for the rename operation on multiple + /// scenarios. + /// + /// If the src doesn't exist, the error is always ENOENT, otherwise we are + /// left with the following combinations: + /// + /// 1. src's type + /// a. File + /// b. Directory + /// 2. dest's existence + /// a. NotFound + /// b. File + /// c. Directory + /// - empty + /// - non-empty + /// 3. src path has a trailing slash (or not) + /// 4. dest path has a trailing slash (or not) + /// + /// Limitations: this function doesn't consider paths so it assumes that the + /// paths don't lead requests for moves that would make the source a subdir of + /// the destination. + /// + /// \param paths_are_equal src and dest paths without trailing slashes are equal + /// \return std::nullopt if success is expected in the scenario or the errno + /// if failure is expected. + static std::optional RenameSemantics(FileType src_type, bool src_trailing_slash, + FileType dest_type, bool dest_trailing_slash, + bool dest_is_empty_dir = false, + bool paths_are_equal = false) { + DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown); + DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory) + << "dest_is_empty_dir must imply dest_type == FileType::Directory"; + switch (src_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + return {ENOENT}; + case FileType::File: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + if (src_trailing_slash) { + return {ENOTDIR}; + } + if (dest_trailing_slash) { + // A slash on the destination path requires that it exists, + // so a confirmation that it's a directory can be performed. + return {ENOENT}; + } + return {}; + case FileType::File: + if (src_trailing_slash || dest_trailing_slash) { + return {ENOTDIR}; + } + // The existing file is replaced successfuly. + return {}; + case FileType::Directory: + if (src_trailing_slash) { + return {ENOTDIR}; + } + return EISDIR; + } + break; + case FileType::Directory: + switch (dest_type) { + case FileType::Unknown: + break; + case FileType::NotFound: + // We don't have to care about the slashes when the source is a directory. + return {}; + case FileType::File: + return {ENOTDIR}; + case FileType::Directory: + if (!paths_are_equal && !dest_is_empty_dir) { + return {ENOTEMPTY}; + } + return {}; + } + break; + } + Unreachable("Invalid parameters passed to RenameSemantics"); + } + + Status CheckExpectedErrno(const std::string& src, const std::string& dest, + std::optional expected_errno, + const char* expected_errno_name, FileInfo* out_src_info) { + auto the_fs = fs(); + const bool src_trailing_slash = internal::HasTrailingSlash(src); + const bool dest_trailing_slash = internal::HasTrailingSlash(dest); + const auto src_path = std::string{internal::RemoveTrailingSlash(src)}; + const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)}; + ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path)); + bool dest_is_empty_dir = false; + if (dest_info.type() == FileType::Directory) { + FileSelector select; + select.base_dir = dest_path; + select.recursive = false; + // TODO(ARROW-40014): investigate why this can't be false here + select.allow_not_found = true; + ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select)); + if (dest_contents.empty()) { + dest_is_empty_dir = true; + } + } + auto paths_are_equal = src_path == dest_path; + auto truly_expected_errno = + RenameSemantics(out_src_info->type(), src_trailing_slash, dest_info.type(), + dest_trailing_slash, dest_is_empty_dir, paths_are_equal); + if (truly_expected_errno != expected_errno) { + if (expected_errno.has_value()) { + return Status::Invalid("expected_errno=", expected_errno_name, "=", + *expected_errno, + " used in ASSERT_MOVE is incorrect. " + "POSIX semantics for this scenario require errno=", + strerror(truly_expected_errno.value_or(0))); + } else { + DCHECK(truly_expected_errno.has_value()); + return Status::Invalid( + "ASSERT_MOVE used to assert success in a scenario for which " + "POSIX semantics requires errno=", + strerror(*truly_expected_errno)); + } + } + return Status::OK(); + } + + void AssertAfterMove(const std::string& src, const std::string& dest, FileType type) { + if (internal::RemoveTrailingSlash(src) != internal::RemoveTrailingSlash(dest)) { + AssertFileInfo(fs(), src, FileType::NotFound); + } + AssertFileInfo(fs(), dest, type); + } + + static bool WithErrno(const Status& status, int expected_errno) { + auto* detail = status.detail().get(); + return detail && + arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == expected_errno; + } + + std::optional MoveErrorMessageMatcher(const FileInfo& src_info, + const std::string& src, + const std::string& dest, + int for_errno) { + switch (for_errno) { + case ENOENT: { + auto& path = src_info.type() == FileType::NotFound ? src : dest; + return ::testing::HasSubstr("Path does not exist '" + path + "'"); + } + case ENOTEMPTY: + return ::testing::HasSubstr("Directory not empty: '" + dest + "'"); + } + return std::nullopt; + } + +#define ASSERT_MOVE(src, dest, expected_errno) \ + do { \ + auto _src = (src); \ + auto _dest = (dest); \ + std::optional _expected_errno = (expected_errno); \ + FileInfo _src_info; \ + ASSERT_OK( \ + CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, &_src_info)); \ + auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, _dest)); \ + if (_expected_errno.has_value()) { \ + if (WithErrno(_move_st, *_expected_errno)) { \ + /* If the Move failed, the source should remain unchanged. */ \ + AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, \ + _src_info.type()); \ + auto _message_matcher = \ + MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); \ + if (_message_matcher.has_value()) { \ + EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, _move_st); \ + } else { \ + SUCCEED(); \ + } \ + } else { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' did not fail with errno=" << #expected_errno; \ + } \ + } else { \ + if (!_move_st.ok()) { \ + FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) \ + << "' failed with " << _move_st.ToString(); \ + } else { \ + AssertAfterMove(_src, _dest, _src_info.type()); \ + } \ + } \ + } while (false) + +#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt) + + // Tests for Move() + + public: + void TestRenameContainer() { + EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv()); + auto data = SetUpPreexistingData(); + // Container exists, so renaming to the same name succeeds because it's a no-op. + ASSERT_MOVE_OK(data.container_name, data.container_name); + // Renaming a container that doesn't exist fails. + ASSERT_MOVE("missing-container", "missing-container", ENOENT); + ASSERT_MOVE("missing-container", data.container_name, ENOENT); + // Renaming a container to an existing non-empty container fails. + auto non_empty_container = PreexistingData::RandomContainerName(rng_); + auto non_empty_container_client = CreateContainer(non_empty_container); + CreateBlob(non_empty_container_client, "object1", PreexistingData::kLoremIpsum); + ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY); + // Renaming to an empty container fails to replace it + auto empty_container = PreexistingData::RandomContainerName(rng_); + auto empty_container_client = CreateContainer(empty_container); + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("Unable to replace empty container: '" + empty_container + + "'"), + fs()->Move(data.container_name, empty_container)); + // Renaming to a non-existing container creates it + auto missing_container = PreexistingData::RandomContainerName(rng_); + AssertFileInfo(fs(), missing_container, FileType::NotFound); + if (env->backend() == AzureBackend::kAzurite) { + // Azurite returns a 201 Created for RenameBlobContainer, but the created + // container doesn't contain the blobs from the source container and + // the source container remains undeleted after the "rename". + } else { + // See Azure SDK issue/question: + // https://github.com/Azure/azure-sdk-for-cpp/issues/5262 + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr("The 'rename' operation is not supported on containers."), + fs()->Move(data.container_name, missing_container)); + // ASSERT_MOVE_OK(data.container_name, missing_container); + // AssertFileInfo(fs(), + // ConcatAbstractPath(missing_container, + // PreexistingData::kObjectName), FileType::File); + } + // Renaming to an empty container can work if the source is also empty + auto new_empty_container = PreexistingData::RandomContainerName(rng_); + auto new_empty_container_client = CreateContainer(new_empty_container); + ASSERT_MOVE_OK(empty_container, new_empty_container); + } + + void TestMoveContainerToPath() { + auto data = SetUpPreexistingData(); + ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + HasDirMoveToSubdirMessage(data.container_name, data.ContainerPath("new-subdir")), + fs()->Move(data.container_name, data.ContainerPath("new-subdir"))); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(data.container_name, + "missing-container/new-subdir"), + fs()->Move(data.container_name, "missing-container/new-subdir")); + } + + void TestCreateContainerFromPath() { + auto data = SetUpPreexistingData(); + auto missing_path = data.RandomDirectoryPath(rng_); + ASSERT_MOVE(missing_path, "new-container", ENOENT); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + ::testing::HasSubstr("Creating files at '/' is not possible, only directories."), + fs()->Move(data.ObjectPath(), "new-file")); + auto src_dir_path = data.RandomDirectoryPath(rng_); + ASSERT_OK(fs()->CreateDir(src_dir_path, false)); + EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path)); + EXPECT_EQ(src_dir_info.type(), FileType::Directory); + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"), + fs()->Move(src_dir_path, "new-container")); + } + + void TestMovePath() { + Status st; + auto data = SetUpPreexistingData(); + // When source doesn't exist. + ASSERT_MOVE("missing-container/src-path", data.ContainerPath("dest-path"), ENOENT); + auto missing_path1 = data.RandomDirectoryPath(rng_); + ASSERT_MOVE(missing_path1, "missing-container/path", ENOENT); + + // But when source exists... + if (!WithHierarchicalNamespace()) { + // ...and containers are different, we get an error message telling cross-container + // moves are not implemented. + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + HasCrossContainerNotImplementedMessage(data.ObjectPath(), + "missing-container/path"), + fs()->Move(data.ObjectPath(), "missing-container/path")); + GTEST_SKIP() << "The rest of TestMovePath is not implemented for non-HNS scenarios"; + } + auto adlfs_client = + datalake_service_client_->GetFileSystemClient(data.container_name); + // ...and dest.container doesn't exist. + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, HasMissingParentDirMessage("missing-container/path"), + fs()->Move(data.ObjectPath(), "missing-container/path")); + AssertFileInfo(fs(), data.ObjectPath(), FileType::File); + + EXPECT_RAISES_WITH_MESSAGE_THAT( + IOError, HasMissingParentDirMessage(data.Path("missing-subdir/file")), + fs()->Move(data.ObjectPath(), data.Path("missing-subdir/file"))); + AssertFileInfo(fs(), data.ObjectPath(), FileType::File); + + // src is a file and dest does not exists + ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0")); + ASSERT_MOVE(data.Path("file0/"), data.Path("file1"), ENOTDIR); + ASSERT_MOVE(data.Path("file0"), data.Path("file1/"), ENOENT); + ASSERT_MOVE(data.Path("file0/"), data.Path("file1/"), ENOTDIR); + // "file0" exists + + // src is a file and dest exists (as a file) + CreateFile(adlfs_client, PreexistingData::kObjectName, PreexistingData::kLoremIpsum); + CreateFile(adlfs_client, "file1", PreexistingData::kLoremIpsum); + ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0")); + ASSERT_MOVE(data.Path("file1/"), data.Path("file0"), ENOTDIR); + ASSERT_MOVE(data.Path("file1"), data.Path("file0/"), ENOTDIR); + ASSERT_MOVE(data.Path("file1/"), data.Path("file0/"), ENOTDIR); + // "file0" and "file1" exist + + // src is a file and dest exists (as an empty dir) + CreateDirectory(adlfs_client, "subdir0"); + ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR); + ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR); + ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR); + ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR); + + // src is a file and dest exists (as a non-empty dir) + CreateFile(adlfs_client, "subdir0/file-at-subdir"); + ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR); + ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR); + ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR); + ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR); + // "subdir0/file-at-subdir" exists + + // src is a directory and dest does not exists + ASSERT_MOVE_OK(data.Path("subdir0"), data.Path("subdir1")); + ASSERT_MOVE_OK(data.Path("subdir1/"), data.Path("subdir2")); + ASSERT_MOVE_OK(data.Path("subdir2"), data.Path("subdir3/")); + ASSERT_MOVE_OK(data.Path("subdir3/"), data.Path("subdir4/")); + AssertFileInfo(fs(), data.Path("subdir4/file-at-subdir"), FileType::File); + // "subdir4/file-at-subdir" exists + + // src is a directory and dest exists as an empty directory + CreateDirectory(adlfs_client, "subdir0"); + CreateDirectory(adlfs_client, "subdir1"); + CreateDirectory(adlfs_client, "subdir2"); + CreateDirectory(adlfs_client, "subdir3"); + ASSERT_MOVE_OK(data.Path("subdir4"), data.Path("subdir0")); + ASSERT_MOVE_OK(data.Path("subdir0/"), data.Path("subdir1")); + ASSERT_MOVE_OK(data.Path("subdir1"), data.Path("subdir2/")); + ASSERT_MOVE_OK(data.Path("subdir2/"), data.Path("subdir3/")); + AssertFileInfo(fs(), data.Path("subdir3/file-at-subdir"), FileType::File); + // "subdir3/file-at-subdir" exists + + // src is directory and dest exists as a non-empty directory + CreateDirectory(adlfs_client, "subdir0"); + ASSERT_MOVE(data.Path("subdir0"), data.Path("subdir3"), ENOTEMPTY); + ASSERT_MOVE(data.Path("subdir0/"), data.Path("subdir3"), ENOTEMPTY); + ASSERT_MOVE(data.Path("subdir0"), data.Path("subdir3/"), ENOTEMPTY); + ASSERT_MOVE(data.Path("subdir0/"), data.Path("subdir3/"), ENOTEMPTY); + } }; void TestAzureFileSystem::TestDetectHierarchicalNamespace(bool trip_up_azurite) { @@ -940,10 +1361,9 @@ void TestAzureFileSystem::TestGetFileInfoObjectWithNestedStructure() { FileType::NotFound); if (WithHierarchicalNamespace()) { - datalake_service_client_->GetFileSystemClient(data.container_name) - .GetDirectoryClient("test-empty-object-dir") - .Create(); - + auto adlfs_client = + datalake_service_client_->GetFileSystemClient(data.container_name); + CreateDirectory(adlfs_client, "test-empty-object-dir"); AssertFileInfo(fs(), data.ContainerPath("test-empty-object-dir"), FileType::Directory); } @@ -1108,6 +1528,20 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirContentsFailureNonexisten this->TestDeleteDirContentsFailureNonexistent(); } +TYPED_TEST(TestAzureFileSystemOnAllScenarios, RenameContainer) { + this->TestRenameContainer(); +} + +TYPED_TEST(TestAzureFileSystemOnAllScenarios, MoveContainerToPath) { + this->TestMoveContainerToPath(); +} + +TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateContainerFromPath) { + this->TestCreateContainerFromPath(); +} + +TYPED_TEST(TestAzureFileSystemOnAllScenarios, MovePath) { this->TestMovePath(); } + // Tests using Azurite (the local Azure emulator) TEST_F(TestAzuriteFileSystem, GetFileInfoSelector) { @@ -1384,9 +1818,14 @@ TEST_F(TestAzuriteFileSystem, DeleteDirContentsFailureNonexistent) { TEST_F(TestAzuriteFileSystem, DeleteFileSuccess) { const auto container_name = PreexistingData::RandomContainerName(rng_); - ASSERT_OK(fs()->CreateDir(container_name)); - const auto file_name = ConcatAbstractPath(container_name, "abc"); - CreateFile(fs(), file_name, "data"); + const auto file_name = ConcatAbstractPath(container_name, "filename"); + if (WithHierarchicalNamespace()) { + auto adlfs_client = CreateFilesystem(container_name); + CreateFile(adlfs_client, "filename", "data"); + } else { + auto container = CreateContainer(container_name); + CreateBlob(container, "filename", "data"); + } arrow::fs::AssertFileInfo(fs(), file_name, FileType::File); ASSERT_OK(fs()->DeleteFile(file_name)); arrow::fs::AssertFileInfo(fs(), file_name, FileType::NotFound); @@ -1394,24 +1833,38 @@ TEST_F(TestAzuriteFileSystem, DeleteFileSuccess) { TEST_F(TestAzuriteFileSystem, DeleteFileFailureNonexistent) { const auto container_name = PreexistingData::RandomContainerName(rng_); - ASSERT_OK(fs()->CreateDir(container_name)); const auto nonexistent_file_name = ConcatAbstractPath(container_name, "nonexistent"); + if (WithHierarchicalNamespace()) { + ARROW_UNUSED(CreateFilesystem(container_name)); + } else { + ARROW_UNUSED(CreateContainer(container_name)); + } ASSERT_RAISES(IOError, fs()->DeleteFile(nonexistent_file_name)); } TEST_F(TestAzuriteFileSystem, DeleteFileFailureContainer) { const auto container_name = PreexistingData::RandomContainerName(rng_); - ASSERT_OK(fs()->CreateDir(container_name)); + if (WithHierarchicalNamespace()) { + ARROW_UNUSED(CreateFilesystem(container_name)); + } else { + ARROW_UNUSED(CreateContainer(container_name)); + } arrow::fs::AssertFileInfo(fs(), container_name, FileType::Directory); ASSERT_RAISES(IOError, fs()->DeleteFile(container_name)); } TEST_F(TestAzuriteFileSystem, DeleteFileFailureDirectory) { - const auto directory_name = - ConcatAbstractPath(PreexistingData::RandomContainerName(rng_), "directory"); - ASSERT_OK(fs()->CreateDir(directory_name)); - arrow::fs::AssertFileInfo(fs(), directory_name, FileType::Directory); - ASSERT_RAISES(IOError, fs()->DeleteFile(directory_name)); + auto container_name = PreexistingData::RandomContainerName(rng_); + if (WithHierarchicalNamespace()) { + auto adlfs_client = CreateFilesystem(container_name); + CreateDirectory(adlfs_client, "directory"); + } else { + auto container = CreateContainer(container_name); + CreateBlob(container, "directory/"); + } + auto directory_path = ConcatAbstractPath(container_name, "directory"); + arrow::fs::AssertFileInfo(fs(), directory_path, FileType::Directory); + ASSERT_RAISES(IOError, fs()->DeleteFile(directory_path)); } TEST_F(TestAzuriteFileSystem, CopyFileSuccessDestinationNonexistent) { @@ -1542,7 +1995,7 @@ std::shared_ptr NormalizerKeyValueMetadata( auto value = metadata->value(i); if (key == "Content-Hash") { std::vector output; - output.reserve(value.size() / 2); + output.resize(value.size() / 2); if (ParseHexValues(value, output.data()).ok()) { // Valid value value = std::string(value.size(), 'F'); diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index 13f43d45db6c1..8747f9683b90f 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -64,11 +64,21 @@ Status PathNotFound(std::string_view path) { .WithDetail(StatusDetailFromErrno(ENOENT)); } +Status IsADir(std::string_view path) { + return Status::IOError("Is a directory: '", path, "'") + .WithDetail(StatusDetailFromErrno(EISDIR)); +} + Status NotADir(std::string_view path) { return Status::IOError("Not a directory: '", path, "'") .WithDetail(StatusDetailFromErrno(ENOTDIR)); } +Status NotEmpty(std::string_view path) { + return Status::IOError("Directory not empty: '", path, "'") + .WithDetail(StatusDetailFromErrno(ENOTEMPTY)); +} + Status NotAFile(std::string_view path) { return Status::IOError("Not a regular file: '", path, "'"); } diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index 29a51512d0aa2..96cc5178a9f31 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -43,9 +43,15 @@ Status CopyStream(const std::shared_ptr& src, ARROW_EXPORT Status PathNotFound(std::string_view path); +ARROW_EXPORT +Status IsADir(std::string_view path); + ARROW_EXPORT Status NotADir(std::string_view path); +ARROW_EXPORT +Status NotEmpty(std::string_view path); + ARROW_EXPORT Status NotAFile(std::string_view path); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 751ef28d415e0..b693336e09921 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -449,6 +449,13 @@ std::shared_ptr StatusDetailFromErrno(int errnum) { return std::make_shared(errnum); } +std::optional ErrnoFromStatusDetail(const StatusDetail& detail) { + if (detail.type_id() == kErrnoDetailTypeId) { + return checked_cast(detail).errnum(); + } + return std::nullopt; +} + #if _WIN32 std::shared_ptr StatusDetailFromWinError(int errnum) { if (!errnum) { diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 113b1bdd93103..bba71c0d80ad4 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -264,6 +265,8 @@ std::string WinErrorMessage(int errnum); ARROW_EXPORT std::shared_ptr StatusDetailFromErrno(int errnum); +ARROW_EXPORT +std::optional ErrnoFromStatusDetail(const StatusDetail& detail); #if _WIN32 ARROW_EXPORT std::shared_ptr StatusDetailFromWinError(int errnum);