From 0a693c84723f1062a451442888ff6f94980d5995 Mon Sep 17 00:00:00 2001
From: Thomas Newton
Date: Sat, 14 Oct 2023 18:17:09 +0100
Subject: [PATCH] Paste in input stream tests from gcsfd_test.cc

---
Reviewer note (commentary placed after the "---" separator; not part of
the change itself):
  * Line structure of this patch was restored; the hunk still carries
    154 added lines and 6 context lines, matching the hunk header.
  * Template arguments that had been lost from the declarations below
    were restored from how each variable is used:
      std::shared_ptr<io::InputStream>        (result of OpenInputStream)
      std::shared_ptr<Buffer>                 (result of Read(16))
      std::shared_ptr<const KeyValueMetadata> (result of ReadMetadata)
      std::array<char, N>                     (passed to Read / std::string)
    The array extents (1024 and 16) are assumed -- TODO confirm against the
    GCS test file these were pasted from.
  * Indentation of added and context lines was reconstructed in
    clang-format style; verify the context lines against the target file.
  * NOTE(review): the added tests still name GcsIntegrationTest and
    GcsFileSystem; presumably they are ported to TestAzureFileSystem in a
    follow-up change -- confirm.

 cpp/src/arrow/filesystem/azurefs_test.cc | 154 +++++++++++++++++++++++
 1 file changed, 154 insertions(+)

diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index 7b09bf3256c5c..6398b7e6f1e68 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -288,6 +288,160 @@ class TestAzureFileSystem : public ::testing::Test {
   }
 };
 
+TEST_F(GcsIntegrationTest, OpenInputStreamString) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+  EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamStringBuffers) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));
+
+  std::string contents;
+  std::shared_ptr<Buffer> buffer;
+  do {
+    ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16));
+    contents.append(buffer->ToString());
+  } while (buffer && buffer->size() != 0);
+
+  EXPECT_EQ(contents, kLoremIpsum);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamInfo) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(info));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+  EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamEmpty) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  const auto object_path =
+      internal::ConcatAbstractPath(PreexistingBucketName(), "empty-object.txt");
+  CreateFile(fs.get(), object_path, std::string());
+
+  ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(object_path));
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+  EXPECT_EQ(size, 0);
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamNotFound) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath()));
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamInfoInvalid) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingBucketPath()));
+  ASSERT_RAISES(IOError, fs->OpenInputStream(info));
+
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
+  ASSERT_RAISES(IOError, fs->OpenInputStream(info));
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamUri) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+  ASSERT_RAISES(Invalid, fs->OpenInputStream("gs://" + PreexistingObjectPath()));
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamReadMetadata) {
+  auto client = GcsClient();
+  const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1);
+  const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";
+  const gcs::ObjectMetadata expected =
+      client
+          .InsertObject(PreexistingBucketName(), object_name,
+                        "The quick brown fox jumps over the lazy dog",
+                        gcs::WithObjectMetadata(gcs::ObjectMetadata()
+                                                    .set_content_type("text/plain")
+                                                    .set_custom_time(custom_time)
+                                                    .set_cache_control("no-cache")
+                                                    .upsert_metadata("key0", "value0")))
+          .value();
+
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream,
+                       fs->OpenInputStream(PreexistingBucketPath() + object_name));
+
+  auto format_time = [](std::chrono::system_clock::time_point tp) {
+    return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp),
+                            absl::UTCTimeZone());
+  };
+
+  std::shared_ptr<const KeyValueMetadata> actual;
+  ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
+  ASSERT_OK_AND_EQ(expected.self_link(), actual->Get("selfLink"));
+  ASSERT_OK_AND_EQ(expected.name(), actual->Get("name"));
+  ASSERT_OK_AND_EQ(expected.bucket(), actual->Get("bucket"));
+  ASSERT_OK_AND_EQ(std::to_string(expected.generation()), actual->Get("generation"));
+  ASSERT_OK_AND_EQ(expected.content_type(), actual->Get("Content-Type"));
+  ASSERT_OK_AND_EQ(format_time(expected.time_created()), actual->Get("timeCreated"));
+  ASSERT_OK_AND_EQ(format_time(expected.updated()), actual->Get("updated"));
+  ASSERT_FALSE(actual->Contains("timeDeleted"));
+  ASSERT_OK_AND_EQ(format_time(custom_time), actual->Get("customTime"));
+  ASSERT_OK_AND_EQ("false", actual->Get("temporaryHold"));
+  ASSERT_OK_AND_EQ("false", actual->Get("eventBasedHold"));
+  ASSERT_FALSE(actual->Contains("retentionExpirationTime"));
+  ASSERT_OK_AND_EQ(expected.storage_class(), actual->Get("storageClass"));
+  ASSERT_FALSE(actual->Contains("storageClassUpdated"));
+  ASSERT_OK_AND_EQ(std::to_string(expected.size()), actual->Get("size"));
+  ASSERT_OK_AND_EQ(expected.md5_hash(), actual->Get("md5Hash"));
+  ASSERT_OK_AND_EQ(expected.media_link(), actual->Get("mediaLink"));
+  ASSERT_OK_AND_EQ(expected.content_encoding(), actual->Get("Content-Encoding"));
+  ASSERT_OK_AND_EQ(expected.content_disposition(), actual->Get("Content-Disposition"));
+  ASSERT_OK_AND_EQ(expected.content_language(), actual->Get("Content-Language"));
+  ASSERT_OK_AND_EQ(expected.cache_control(), actual->Get("Cache-Control"));
+  auto p = expected.metadata().find("key0");
+  ASSERT_TRUE(p != expected.metadata().end());
+  ASSERT_OK_AND_EQ(p->second, actual->Get("metadata.key0"));
+  ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entity"));
+  ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entityId"));
+  ASSERT_OK_AND_EQ(expected.crc32c(), actual->Get("crc32c"));
+  ASSERT_OK_AND_EQ(std::to_string(expected.component_count()),
+                   actual->Get("componentCount"));
+  ASSERT_OK_AND_EQ(expected.etag(), actual->Get("etag"));
+  ASSERT_FALSE(actual->Contains("customerEncryption.encryptionAlgorithm"));
+  ASSERT_FALSE(actual->Contains("customerEncryption.keySha256"));
+  ASSERT_FALSE(actual->Contains("kmsKeyName"));
+}
+
+TEST_F(GcsIntegrationTest, OpenInputStreamClosed) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(PreexistingObjectPath()));
+  ASSERT_OK(stream->Close());
+  std::array<char, 16> buffer{};
+  ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data()));
+  ASSERT_RAISES(Invalid, stream->Read(buffer.size()));
+  ASSERT_RAISES(Invalid, stream->Tell());
+}
+
 TEST_F(TestAzureFileSystem, OpenInputFileMixedReadVsReadAt) {
   // Create a file large enough to make the random access tests non-trivial.
   auto constexpr kLineWidth = 100;