Skip to content

Commit

Permalink
Fix input stream tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Oct 14, 2023
1 parent 0a693c8 commit 8ff5684
Showing 1 changed file with 117 additions and 101 deletions.
218 changes: 117 additions & 101 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "arrow/filesystem/azurefs.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"

#include <gmock/gmock-matchers.h>
#include <gmock/gmock-more-matchers.h>
Expand Down Expand Up @@ -288,11 +289,9 @@ class TestAzureFileSystem : public ::testing::Test {
}
};

TEST_F(GcsIntegrationTest, OpenInputStreamString) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

TEST_F(TestAzureFileSystem, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));

std::array<char, 1024> buffer{};
std::int64_t size;
Expand All @@ -301,11 +300,9 @@ TEST_F(GcsIntegrationTest, OpenInputStreamString) {
EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
}

TEST_F(GcsIntegrationTest, OpenInputStreamStringBuffers) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

TEST_F(TestAzureFileSystem, OpenInputStreamStringBuffers) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));

std::string contents;
std::shared_ptr<Buffer> buffer;
Expand All @@ -317,14 +314,13 @@ TEST_F(GcsIntegrationTest, OpenInputStreamStringBuffers) {
EXPECT_EQ(contents, kLoremIpsum);
}

TEST_F(GcsIntegrationTest, OpenInputStreamInfo) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

arrow::fs::FileInfo info;
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));
TEST_F(TestAzureFileSystem, OpenInputStreamInfo) {
// TODO: Once GetFileInfo is implemented, use ASSERT_OK_AND_ASSIGN(info,
// fs_->GetFileInfo(PreexistingObjectPath())); instead of building FileInfo by hand.
arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File);

std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(info));

std::array<char, 1024> buffer{};
std::int64_t size;
Expand All @@ -333,108 +329,128 @@ TEST_F(GcsIntegrationTest, OpenInputStreamInfo) {
EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
}

TEST_F(GcsIntegrationTest, OpenInputStreamEmpty) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

const auto object_path =
internal::ConcatAbstractPath(PreexistingBucketName(), "empty-object.txt");
CreateFile(fs.get(), object_path, std::string());
// Verifies that a zero-length file can be opened as an input stream and that
// reading from it succeeds and reports 0 bytes.
TEST_F(TestAzureFileSystem, OpenInputStreamEmpty) {
const auto path_to_file = "empty-object.txt";
const auto path = PreexistingContainerPath() + path_to_file;
// Create the empty file directly through gen2_client_ (null buffer, length 0)
// rather than through the filesystem under test.
gen2_client_->GetFileSystemClient(PreexistingContainerName())
.GetFileClient(path_to_file)
.UploadFrom(nullptr, 0);

// NOTE(review): the next line references `fs` and `object_path`, neither of
// which is declared in this test, and it declares `stream` a second time. It
// looks like a pre-commit (GCS) line left behind by the diff view; confirm
// and drop it.
ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(object_path));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(path));
std::array<char, 1024> buffer{};
std::int64_t size;
// Request up to a full buffer; an empty file should yield a size of 0.
ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
EXPECT_EQ(size, 0);
}

TEST_F(GcsIntegrationTest, OpenInputStreamNotFound) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

ASSERT_RAISES(IOError, fs->OpenInputStream(NotFoundObjectPath()));
// Opening an input stream on NotFoundObjectPath() must fail with IOError.
TEST_F(TestAzureFileSystem, OpenInputStreamNotFound) {
  const auto missing_path = NotFoundObjectPath();
  ASSERT_RAISES(IOError, fs_->OpenInputStream(missing_path));
}

// OpenInputStream(FileInfo) must raise IOError both for the FileInfo returned
// for PreexistingBucketPath() and for the one returned for NotFoundObjectPath().
TEST_F(GcsIntegrationTest, OpenInputStreamInfoInvalid) {
  auto gcs_fs = GcsFileSystem::Make(TestGcsOptions());

  // Case 1: the info describes the bucket path rather than an object in it.
  ASSERT_OK_AND_ASSIGN(auto bucket_info, gcs_fs->GetFileInfo(PreexistingBucketPath()));
  ASSERT_RAISES(IOError, gcs_fs->OpenInputStream(bucket_info));

  // Case 2: the info was looked up for the not-found object path.
  ASSERT_OK_AND_ASSIGN(auto missing_info, gcs_fs->GetFileInfo(NotFoundObjectPath()));
  ASSERT_RAISES(IOError, gcs_fs->OpenInputStream(missing_info));
}
// Verifies that OpenInputStream(FileInfo) rejects entries that are not files:
// a Directory-typed entry for the container and a NotFound-typed entry for a
// missing object must both raise IOError.
TEST_F(TestAzureFileSystem, OpenInputStreamInfoInvalid) {
  // TODO: Once GetFileInfo is implemented, obtain this via
  // ASSERT_OK_AND_ASSIGN(info, fs_->GetFileInfo(PreexistingContainerPath()));
  arrow::fs::FileInfo container_info(PreexistingContainerPath(), FileType::Directory);
  ASSERT_RAISES(IOError, fs_->OpenInputStream(container_info));

  // TODO: Once GetFileInfo is implemented, obtain this via
  // ASSERT_OK_AND_ASSIGN(info, fs_->GetFileInfo(NotFoundObjectPath()));
  // The not-found case must carry the missing object's path, not the
  // container's, so it matches what GetFileInfo would return for that path.
  arrow::fs::FileInfo missing_info(NotFoundObjectPath(), FileType::NotFound);
  ASSERT_RAISES(IOError, fs_->OpenInputStream(missing_info));
}

TEST_F(GcsIntegrationTest, OpenInputStreamReadMetadata) {
auto client = GcsClient();
const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1);
const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";
const gcs::ObjectMetadata expected =
client
.InsertObject(PreexistingBucketName(), object_name,
"The quick brown fox jumps over the lazy dog",
gcs::WithObjectMetadata(gcs::ObjectMetadata()
.set_content_type("text/plain")
.set_custom_time(custom_time)
.set_cache_control("no-cache")
.upsert_metadata("key0", "value0")))
.value();

auto fs = GcsFileSystem::Make(TestGcsOptions());
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream,
fs->OpenInputStream(PreexistingBucketPath() + object_name));

auto format_time = [](std::chrono::system_clock::time_point tp) {
return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp),
absl::UTCTimeZone());
};

std::shared_ptr<const KeyValueMetadata> actual;
ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
ASSERT_OK_AND_EQ(expected.self_link(), actual->Get("selfLink"));
ASSERT_OK_AND_EQ(expected.name(), actual->Get("name"));
ASSERT_OK_AND_EQ(expected.bucket(), actual->Get("bucket"));
ASSERT_OK_AND_EQ(std::to_string(expected.generation()), actual->Get("generation"));
ASSERT_OK_AND_EQ(expected.content_type(), actual->Get("Content-Type"));
ASSERT_OK_AND_EQ(format_time(expected.time_created()), actual->Get("timeCreated"));
ASSERT_OK_AND_EQ(format_time(expected.updated()), actual->Get("updated"));
ASSERT_FALSE(actual->Contains("timeDeleted"));
ASSERT_OK_AND_EQ(format_time(custom_time), actual->Get("customTime"));
ASSERT_OK_AND_EQ("false", actual->Get("temporaryHold"));
ASSERT_OK_AND_EQ("false", actual->Get("eventBasedHold"));
ASSERT_FALSE(actual->Contains("retentionExpirationTime"));
ASSERT_OK_AND_EQ(expected.storage_class(), actual->Get("storageClass"));
ASSERT_FALSE(actual->Contains("storageClassUpdated"));
ASSERT_OK_AND_EQ(std::to_string(expected.size()), actual->Get("size"));
ASSERT_OK_AND_EQ(expected.md5_hash(), actual->Get("md5Hash"));
ASSERT_OK_AND_EQ(expected.media_link(), actual->Get("mediaLink"));
ASSERT_OK_AND_EQ(expected.content_encoding(), actual->Get("Content-Encoding"));
ASSERT_OK_AND_EQ(expected.content_disposition(), actual->Get("Content-Disposition"));
ASSERT_OK_AND_EQ(expected.content_language(), actual->Get("Content-Language"));
ASSERT_OK_AND_EQ(expected.cache_control(), actual->Get("Cache-Control"));
auto p = expected.metadata().find("key0");
ASSERT_TRUE(p != expected.metadata().end());
ASSERT_OK_AND_EQ(p->second, actual->Get("metadata.key0"));
ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entity"));
ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entityId"));
ASSERT_OK_AND_EQ(expected.crc32c(), actual->Get("crc32c"));
ASSERT_OK_AND_EQ(std::to_string(expected.component_count()),
actual->Get("componentCount"));
ASSERT_OK_AND_EQ(expected.etag(), actual->Get("etag"));
ASSERT_FALSE(actual->Contains("customerEncryption.encryptionAlgorithm"));
ASSERT_FALSE(actual->Contains("customerEncryption.keySha256"));
ASSERT_FALSE(actual->Contains("kmsKeyName"));
// Passing a full "abfss://" URI where a path is expected must be rejected
// with Invalid.
TEST_F(TestAzureFileSystem, OpenInputStreamUri) {
  const auto uri = "abfss://" + PreexistingObjectPath();
  ASSERT_RAISES(Invalid, fs_->OpenInputStream(uri));
}

TEST_F(GcsIntegrationTest, OpenInputStreamClosed) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenInputStream(PreexistingObjectPath()));
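// NOTE(review): this whole test is commented out. The inner, doubly-commented
// assertions are GCS-specific (selfLink, bucket, generation, ...) and have no
// Azure equivalents yet; port them or remove the block before merging.
// TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {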
// const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1);
// const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";
// // const gcs::ObjectMetadata expected =
// // client
// // .InsertObject(PreexistingBucketName(), object_name,
// // "The quick brown fox jumps over the lazy dog",
// // gcs::WithObjectMetadata(gcs::ObjectMetadata()
// // .set_content_type("text/plain")
// // .set_custom_time(custom_time)
// // .set_cache_control("no-cache")
// // .upsert_metadata("key0",
// // "value0")))
// // .value();

// const std::string blob_content = "The quick brown fox jumps over the lazy dog";
// auto file_client = gen2_client_->GetFileSystemClient(PreexistingContainerName())
// .GetFileClient(object_name);

// const auto expected =
// file_client
// .UploadFrom(reinterpret_cast<const uint8_t*>(blob_content.data()),
// blob_content.size())
// .Value;

// std::shared_ptr<io::InputStream> stream;
// ASSERT_OK_AND_ASSIGN(stream,
// fs_->OpenInputStream(PreexistingContainerPath() + object_name));

// // auto format_time = [](std::chrono::system_clock::time_point tp) {
// // return absl::FormatTime(absl::RFC3339_full, absl::FromChrono(tp),
// // absl::UTCTimeZone());
// // };

// const auto properties = file_client.GetProperties();

// std::shared_ptr<const KeyValueMetadata> actual;
// ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());

// for (int i = 0; i < actual->size(); i++) {
// std::cout << "key=" << actual->key(i) << ", value=" << actual->value(i) << std::endl;
// }

// std::string actual_last_modified;
// // ASSERT_OK_AND_ASSIGN(actual_last_modified, actual->Get("LastModified"));
// auto expected_last_modified = expected.LastModified;
// std::cout << "actual=" << actual_last_modified << std::endl;

// // ASSERT_OK_AND_EQ(expected.LastModified, actual->Get("LastModified"));

// // ASSERT_OK_AND_EQ(expected.self_link(), actual->Get("selfLink"));
// // ASSERT_OK_AND_EQ(expected.name(), actual->Get("name"));
// // ASSERT_OK_AND_EQ(expected.bucket(), actual->Get("bucket"));
// // ASSERT_OK_AND_EQ(std::to_string(expected.generation()), actual->Get("generation"));
// // ASSERT_OK_AND_EQ(expected.content_type(), actual->Get("Content-Type"));
// // ASSERT_OK_AND_EQ(format_time(expected.time_created()), actual->Get("timeCreated"));
// // ASSERT_OK_AND_EQ(format_time(expected.updated()), actual->Get("updated"));
// // ASSERT_FALSE(actual->Contains("timeDeleted"));
// // ASSERT_OK_AND_EQ(format_time(custom_time), actual->Get("customTime"));
// // ASSERT_OK_AND_EQ("false", actual->Get("temporaryHold"));
// // ASSERT_OK_AND_EQ("false", actual->Get("eventBasedHold"));
// // ASSERT_FALSE(actual->Contains("retentionExpirationTime"));
// // ASSERT_OK_AND_EQ(expected.storage_class(), actual->Get("storageClass"));
// // ASSERT_FALSE(actual->Contains("storageClassUpdated"));
// // ASSERT_OK_AND_EQ(std::to_string(expected.size()), actual->Get("size"));
// // ASSERT_OK_AND_EQ(expected.md5_hash(), actual->Get("md5Hash"));
// // ASSERT_OK_AND_EQ(expected.media_link(), actual->Get("mediaLink"));
// // ASSERT_OK_AND_EQ(expected.content_encoding(), actual->Get("Content-Encoding"));
// // ASSERT_OK_AND_EQ(expected.content_disposition(), actual->Get("Content-Disposition"));
// // ASSERT_OK_AND_EQ(expected.content_language(), actual->Get("Content-Language"));
// // ASSERT_OK_AND_EQ(expected.cache_control(), actual->Get("Cache-Control"));
// // auto p = expected.metadata().find("key0");
// // ASSERT_TRUE(p != expected.metadata().end());
// // ASSERT_OK_AND_EQ(p->second, actual->Get("metadata.key0"));
// // ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entity"));
// // ASSERT_EQ(expected.has_owner(), actual->Contains("owner.entityId"));
// // ASSERT_OK_AND_EQ(expected.crc32c(), actual->Get("crc32c"));
// // ASSERT_OK_AND_EQ(std::to_string(expected.component_count()),
// // actual->Get("componentCount"));
// // ASSERT_OK_AND_EQ(expected.etag(), actual->Get("etag"));
// // ASSERT_FALSE(actual->Contains("customerEncryption.encryptionAlgorithm"));
// // ASSERT_FALSE(actual->Contains("customerEncryption.keySha256"));
// // ASSERT_FALSE(actual->Contains("kmsKeyName"));
// }

TEST_F(TestAzureFileSystem, OpenInputStreamClosed) {
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(PreexistingObjectPath()));
ASSERT_OK(stream->Close());
std::array<char, 16> buffer{};
ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data()));
Expand Down

0 comments on commit 8ff5684

Please sign in to comment.