From 7402412fb0245d19b865b2f14c8ec637bfeb6367 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 1 Oct 2023 17:18:28 +0100 Subject: [PATCH] First test builds successfully --- cpp/src/arrow/filesystem/azurefs.cc | 150 ++++++++------------- cpp/src/arrow/filesystem/azurefs_test.cc | 159 +++++++++++++---------- 2 files changed, 146 insertions(+), 163 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 0ffc2c17d03a8..12c325415762c 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "arrow/buffer.h" @@ -26,12 +27,11 @@ #include "arrow/filesystem/util_internal.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" -#include "arrow/util/future.h" #include "arrow/util/string.h" - namespace arrow { namespace fs { @@ -81,7 +81,8 @@ struct AzurePath { // Expected input here => s = synapsemlfs/testdir/testfile.txt, // http://127.0.0.1/accountName/pathToBlob auto src = internal::RemoveTrailingSlash(s); - if (arrow::internal::StartsWith(src, "https://127.0.0.1") || arrow::internal::StartsWith(src, "http://127.0.0.1")) { + if (arrow::internal::StartsWith(src, "https://127.0.0.1") || + arrow::internal::StartsWith(src, "http://127.0.0.1")) { RETURN_NOT_OK(FromLocalHostString(&src)); } auto input_path = std::string(src.data()); @@ -174,11 +175,9 @@ std::shared_ptr GetObjectMetadata(const ObjectResult& re class ObjectInputFile final : public io::RandomAccessFile { public: ObjectInputFile( - std::shared_ptr& path_client, std::shared_ptr& file_client, const io::IOContext& io_context, const AzurePath& path, int64_t size = kNoSize) - : path_client_(std::move(path_client)), - file_client_(std::move(file_client)), + : file_client_(std::move(file_client)), io_context_(io_context), path_(path), content_length_(size) {} @@ -189,7 +188,7 @@ class ObjectInputFile final : public io::RandomAccessFile { return Status::OK(); } try { - auto properties = path_client_->GetProperties(); + auto properties = file_client_->GetProperties(); if (properties.Value.IsDirectory) { return ::arrow::fs::internal::NotAFile(path_.full_path); } @@ -225,12 +224,11 @@ class ObjectInputFile final : public io::RandomAccessFile { } Future> ReadMetadataAsync( - const io::IOContext& io_context) override { + const io::IOContext& io_context) override { return metadata_; } Status Close() override { - path_client_ = nullptr; file_client_ = nullptr; closed_ = true; return Status::OK(); @@ -312,7 +310,6 @@ class ObjectInputFile final : public io::RandomAccessFile { } protected: - std::shared_ptr path_client_; std::shared_ptr file_client_; const io::IOContext io_context_; AzurePath path_; @@ -331,108 +328,73 @@ class ObjectInputFile final : public io::RandomAccessFile { class AzureFileSystem::Impl { public: io::IOContext io_context_; - bool is_hierarchical_namespace_enabled_; + std::shared_ptr service_client_; AzureOptions options_; explicit Impl(AzureOptions options, io::IOContext io_context) : io_context_(io_context), options_(std::move(options)) {} Status Init() { - // TODO: GH-18014 Delete this once we have a proper implementation. This just - // initializes a pointless Azure blob service client with a fake endpoint to ensure - // the build will fail if the Azure SDK build is broken. - auto default_credential = std::make_shared(); - auto service_client = Azure::Storage::Blobs::BlobServiceClient( - "http://fake-blob-storage-endpoint", default_credential); - if (options_.backend == AzureBackend::Azurite) { - // gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled - // throws error in azurite - is_hierarchical_namespace_enabled_ = false; - } + service_client_ = + std::make_shared( + options_.account_dfs_url, options_.storage_credentials_provider); return Status::OK(); } const AzureOptions& options() const { return options_; } Result> OpenInputFile(const std::string& s, - AzureBlobFileSystem* fs) { + AzureFileSystem* fs) { ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); if (path.empty()) { return ::arrow::fs::internal::PathNotFound(path.full_path); } - if (!is_hierarchical_namespace_enabled_) { - if (path.path_to_file_parts.size() > 1) { - return Status::IOError( - "Invalid Azure Blob Storage path provided," - " hierarchical namespace not enabled in storage account"); - } - } - ARROW_ASSIGN_OR_RAISE(auto response, FileExists(dfs_endpoint_url_ + path.full_path)); - if (!response) { - return ::arrow::fs::internal::PathNotFound(path.full_path); - } - std::shared_ptr path_client; - ARROW_ASSIGN_OR_RAISE( - path_client, InitPathClient( - options_, dfs_endpoint_url_ + path.full_path, path.container, - path.path_to_file)); - - std::shared_ptr file_client; - ARROW_ASSIGN_OR_RAISE( - file_client, InitPathClient( - options_, dfs_endpoint_url_ + path.full_path, path.container, - path.path_to_file)); - - auto ptr = std::make_shared(path_client, file_client, - fs->io_context(), path); + + // TODO: Wrap this is a try catch and return + // `::arrow::fs::internal::NotAFile(blob_client.GetUrl());` if it fails because the + // file does not exist. + auto file_client = + std::make_shared( + std::move(service_client_->GetFileSystemClient(path.container) + .GetFileClient(path.path_to_file))); + // auto file_client = + // std::make_shared(options_.account_dfs_url, + // path.container, path.path_to_file, options_.storage_credentials_provider); + + auto ptr = std::make_shared(file_client, fs->io_context(), path); RETURN_NOT_OK(ptr->Init()); return ptr; } -}; - -Result> OpenInputFile(const FileInfo& info, - AzureBlobFileSystem* fs) { - if (info.type() == FileType::NotFound) { - return ::arrow::fs::internal::PathNotFound(info.path()); - } - if (info.type() != FileType::File && info.type() != FileType::Unknown) { - return ::arrow::fs::internal::NotAFile(info.path()); - } - - ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path())); - if (!is_hierarchical_namespace_enabled_) { - if (path.path_to_file_parts.size() > 1) { - return Status::IOError( - "Invalid Azure Blob Storage path provided, hierarchical namespace" - " not enabled in storage account"); - } - } - ARROW_ASSIGN_OR_RAISE(auto response, FileExists(dfs_endpoint_url_ + info.path())); - if (!response) { - return ::arrow::fs::internal::PathNotFound(info.path()); - } - std::shared_ptr path_client; - ARROW_ASSIGN_OR_RAISE( - path_client, - InitPathClient( - options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file)); - - std::shared_ptr file_client; - ARROW_ASSIGN_OR_RAISE( - file_client, - InitPathClient( - options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file)); - - auto ptr = std::make_shared(path_client, file_client, fs->io_context(), - path, info.size()); - RETURN_NOT_OK(ptr->Init()); - return ptr; -} - -protected: -AzureOptions options_; + // Result> OpenInputFile(const FileInfo& info, + // AzureFileSystem* fs) { + // if (info.type() == FileType::NotFound) { + // return ::arrow::fs::internal::PathNotFound(info.path()); + // } + // if (info.type() != FileType::File && info.type() != FileType::Unknown) { + // return ::arrow::fs::internal::NotAFile(info.path()); + // } + + // ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path())); + + // std::shared_ptr path_client; + // ARROW_ASSIGN_OR_RAISE( + // path_client, + // InitPathClient( + // options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file)); + + // std::shared_ptr file_client; + // ARROW_ASSIGN_OR_RAISE( + // file_client, + // InitPathClient( + // options_, dfs_endpoint_url_ + info.path(), path.container, path.path_to_file)); + + // auto ptr = std::make_shared(path_client, file_client, fs->io_context(), + // path, info.size()); + // RETURN_NOT_OK(ptr->Init()); + // return ptr; + // } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -491,7 +453,8 @@ Result> AzureFileSystem::OpenInputStream( Result> AzureFileSystem::OpenInputStream( const FileInfo& info) { - return impl_->OpenInputFile(info, this); + // return impl_->OpenInputFile(info, this); + return Status::NotImplemented("The Azure FileSystem is not fully implemented"); } Result> AzureFileSystem::OpenInputFile( @@ -501,7 +464,8 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenInputFile( const FileInfo& info) { - return impl_->OpenInputFile(info, this); + return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + // return impl_->OpenInputFile(info, this); } Result> AzureFileSystem::OpenOutputStream( diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 486e9af730e85..2178381387924 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -40,6 +40,7 @@ #include #include +#include #include #include "arrow/testing/gtest_util.h" @@ -160,6 +161,8 @@ class TestAzureFileSystem : public ::testing::Test { std::shared_ptr fs_; std::shared_ptr gen2_client_; AzureOptions options_; + std::mt19937_64 generator_; + std::string container_name_; void MakeFileSystem() { const std::string& account_name = GetAzuriteEnv()->account_name(); @@ -177,17 +180,10 @@ class TestAzureFileSystem : public ::testing::Test { ASSERT_OK(GetAzuriteEnv()->status()); MakeFileSystem(); - auto file_system_client = gen2_client_->GetFileSystemClient("container"); + generator_ = std::mt19937_64(std::random_device()()); + container_name_ = RandomChars(32); + auto file_system_client = gen2_client_->GetFileSystemClient(container_name_); file_system_client.CreateIfNotExists(); - file_system_client = gen2_client_->GetFileSystemClient("empty-container"); - file_system_client.CreateIfNotExists(); - auto file_client = - std::make_shared( - options_.account_blob_url + "container/somefile", - options_.storage_credentials_provider); - std::string s = "some data"; - file_client->UploadFrom( - const_cast(reinterpret_cast(s.data())), s.size()); } void TearDown() override { @@ -230,68 +226,91 @@ class TestAzureFileSystem : public ::testing::Test { reinterpret_cast(expected.data()), expected.size()); AssertBufferEqual(*buf_data, *expected_data); } -}; -// TEST_F(TestAzureFileSystem, FromAccountKey) { -// auto options = AzureOptions::FromAccountKey(GetAzuriteEnv()->account_name(), -// GetAzuriteEnv()->account_key()) -// .ValueOrDie(); -// ASSERT_EQ(options.credentials_kind, -// arrow::fs::AzureCredentialsKind::StorageCredentials); -// ASSERT_NE(options.storage_credentials_provider, nullptr); -// } + std::string PreexistingContainerName() const { return container_name_; } -// TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) { -// auto fs = GcsFileSystem::Make(TestGcsOptions()); + std::string PreexistingContainerPath() const { + return PreexistingContainerName() + '/'; + } -// // Create a file large enough to make the random access tests non-trivial. -// auto constexpr kLineWidth = 100; -// auto constexpr kLineCount = 4096; -// std::vector lines(kLineCount); -// int lineno = 0; -// std::generate_n(lines.begin(), lines.size(), -// [&] { return RandomLine(++lineno, kLineWidth); }); + std::string RandomLine(int lineno, std::size_t width) { + auto line = std::to_string(lineno) + ": "; + line += RandomChars(width - line.size() - 1); + line += '\n'; + return line; + } -// const auto path = -// PreexistingBucketPath() + "OpenInputFileMixedReadVsReadAt/object-name"; -// std::shared_ptr output; -// ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); -// for (auto const& line : lines) { -// ASSERT_OK(output->Write(line.data(), line.size())); -// } -// ASSERT_OK(output->Close()); + uint8_t RandomInteger() { + return std::uniform_int_distribution()(generator_); + } -// std::shared_ptr file; -// ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path)); -// for (int i = 0; i != 32; ++i) { -// SCOPED_TRACE("Iteration " + std::to_string(i)); -// // Verify sequential reads work as expected. -// std::array buffer{}; -// std::int64_t size; -// { -// ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); -// EXPECT_EQ(lines[2 * i], actual->ToString()); -// } -// { -// ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data())); -// EXPECT_EQ(size, kLineWidth); -// auto actual = std::string{buffer.begin(), buffer.end()}; -// EXPECT_EQ(lines[2 * i + 1], actual); -// } - -// // Verify random reads interleave too. -// auto const index = RandomIndex(kLineCount); -// auto const position = index * kLineWidth; -// ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data())); -// EXPECT_EQ(size, kLineWidth); -// auto actual = std::string{buffer.begin(), buffer.end()}; -// EXPECT_EQ(lines[index], actual); - -// // Verify random reads using buffers work. -// ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth)); -// EXPECT_EQ(lines[index], b->ToString()); -// } -// } + std::size_t RandomIndex(std::size_t end) { + return std::uniform_int_distribution(0, end - 1)(generator_); + } + + std::string RandomChars(std::size_t count) { + auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789"); + std::uniform_int_distribution d(0, fillers.size() - 1); + std::string s; + std::generate_n(std::back_inserter(s), count, [&] { return fillers[d(generator_)]; }); + return s; + } +}; + +TEST_F(TestAzureFileSystem, OpenInputFileMixedReadVsReadAt) { + // Create a file large enough to make the random access tests non-trivial. + auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path_to_file = "OpenInputFileMixedReadVsReadAt/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + + // TODO: Switch to using Azure filesystem to write once its implemented. + auto file_client = gen2_client_->GetFileSystemClient("container").GetFileClient(path); + int64_t total_size = 0; + for (auto const& line : lines) { + auto bufferStream = Azure::Core::IO::MemoryBodyStream( + reinterpret_cast(line.data()), line.size()); + file_client.Append(bufferStream, 0); + total_size += line.size(); + } + file_client.Flush(total_size); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify sequential reads work as expected. + std::array buffer{}; + std::int64_t size; + { + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[2 * i], actual->ToString()); + } + { + ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[2 * i + 1], actual); + } + + // Verify random reads interleave too. + auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[index], actual); + + // Verify random reads using buffers work. + ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth)); + EXPECT_EQ(lines[index], b->ToString()); + } +} // TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) { // auto fs = GcsFileSystem::Make(TestGcsOptions()); @@ -304,7 +323,7 @@ class TestAzureFileSystem : public ::testing::Test { // std::generate_n(lines.begin(), lines.size(), // [&] { return RandomLine(++lineno, kLineWidth); }); -// const auto path = PreexistingBucketPath() + "OpenInputFileRandomSeek/object-name"; +// const auto path = PreexistingContainerPath() + "OpenInputFileRandomSeek/object-name"; // std::shared_ptr output; // ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); // for (auto const& line : lines) { @@ -329,7 +348,7 @@ class TestAzureFileSystem : public ::testing::Test { // auto fs = GcsFileSystem::Make(TestGcsOptions()); // // Create a test file. -// const auto path = PreexistingBucketPath() + "OpenInputFileIoContext/object-name"; +// const auto path = PreexistingContainerPath() + "OpenInputFileIoContext/object-name"; // std::shared_ptr output; // ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); // const std::string contents = "The quick brown fox jumps over the lazy dog"; @@ -369,7 +388,7 @@ class TestAzureFileSystem : public ::testing::Test { // auto fs = GcsFileSystem::Make(TestGcsOptions()); // arrow::fs::FileInfo info; -// ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingBucketPath())); +// ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingContainerPath())); // ASSERT_RAISES(IOError, fs->OpenInputFile(info)); // ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));