Skip to content

Commit

Permalink
ARROW-5378: [C++] Local filesystem implementation
Browse files Browse the repository at this point in the history
Author: Antoine Pitrou <antoine@python.org>

Closes #4379 from pitrou/ARROW-5378-local-filesystem and squashes the following commits:

c36e3dc <Antoine Pitrou> ARROW-5378:  Local filesystem implementation
  • Loading branch information
pitrou authored and wesm committed May 30, 2019
1 parent 1b798a3 commit 30b4736
Show file tree
Hide file tree
Showing 20 changed files with 1,497 additions and 155 deletions.
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ set(ARROW_SRCS
csv/parser.cc
csv/reader.cc
filesystem/filesystem.cc
filesystem/localfs.cc
filesystem/mockfs.cc
filesystem/path-util.cc
filesystem/util-internal.cc
json/options.cc
json/chunked-builder.cc
json/chunker.cc
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
arrow_install_all_headers("arrow/filesystem")

add_arrow_test(filesystem-test)
add_arrow_test(localfs-test)
305 changes: 284 additions & 21 deletions cpp/src/arrow/filesystem/filesystem-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ TEST(PathUtil, ConcatAbstractPath) {
ASSERT_EQ("abc", ConcatAbstractPath("", "abc"));
ASSERT_EQ("abc/def", ConcatAbstractPath("abc", "def"));
ASSERT_EQ("abc/def/ghi", ConcatAbstractPath("abc/def", "ghi"));

ASSERT_EQ("abc/def", ConcatAbstractPath("abc/", "def"));
ASSERT_EQ("abc/def/ghi", ConcatAbstractPath("abc/def/", "ghi"));

ASSERT_EQ("/abc", ConcatAbstractPath("/", "abc"));
ASSERT_EQ("/abc/def", ConcatAbstractPath("/abc", "def"));
ASSERT_EQ("/abc/def", ConcatAbstractPath("/abc/", "def"));
}

TEST(PathUtil, JoinAbstractPath) {
Expand All @@ -128,6 +135,15 @@ TEST(PathUtil, JoinAbstractPath) {
ASSERT_EQ("", JoinAbstractPath(parts.begin(), parts.begin()));
}

TEST(PathUtil, EnsureTrailingSlash) {
ASSERT_EQ("", EnsureTrailingSlash(""));
ASSERT_EQ("/", EnsureTrailingSlash("/"));
ASSERT_EQ("abc/", EnsureTrailingSlash("abc"));
ASSERT_EQ("abc/", EnsureTrailingSlash("abc/"));
ASSERT_EQ("/abc/", EnsureTrailingSlash("/abc"));
ASSERT_EQ("/abc/", EnsureTrailingSlash("/abc/"));
}

////////////////////////////////////////////////////////////////////////////
// Generic MockFileSystem tests

Expand Down Expand Up @@ -179,34 +195,14 @@ class TestMockFS : public ::testing::Test {
}

void CreateFile(const std::string& path, const std::string& data) {
std::shared_ptr<io::OutputStream> stream;
ASSERT_OK(fs_->OpenAppendStream(path, &stream));
ASSERT_OK(WriteString(stream.get(), data));
ASSERT_OK(stream->Close());
::arrow::fs::CreateFile(fs_.get(), path, data);
}

protected:
TimePoint time_;
std::shared_ptr<MockFileSystem> fs_;
};

void AssertFileStats(const FileStats& st, const std::string& path, FileType type) {
ASSERT_EQ(st.path(), path);
ASSERT_EQ(st.type(), type);
}

void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
TimePoint mtime) {
AssertFileStats(st, path, type);
ASSERT_EQ(st.mtime(), mtime);
}

void AssertFileStats(const FileStats& st, const std::string& path, FileType type,
TimePoint mtime, int64_t size) {
AssertFileStats(st, path, type, mtime);
ASSERT_EQ(st.size(), size);
}

TEST_F(TestMockFS, Empty) {
CheckDirs({});
CheckFiles({});
Expand Down Expand Up @@ -317,6 +313,10 @@ TEST_F(TestMockFS, GetTargetStatsSelector) {
AssertFileStats(stats[0], "AB", FileType::Directory, time_);
AssertFileStats(stats[1], "AB/CD", FileType::Directory, time_);
AssertFileStats(stats[2], "ab", FileType::File, time_, 4);

// Invalid path
s.base_dir = "//foo//bar//baz//";
ASSERT_RAISES(Invalid, fs_->GetTargetStats(s, &stats));
}

TEST_F(TestMockFS, OpenOutputStream) {
Expand All @@ -342,6 +342,269 @@ TEST_F(TestMockFS, OpenAppendStream) {
CheckFiles({{"ab", time_, "some data"}});
}

////////////////////////////////////////////////////////////////////////////
// Concrete SubTreeFileSystem tests

class TestSubTreeFileSystem : public TestMockFS {
public:
void SetUp() override {
TestMockFS::SetUp();
ASSERT_OK(fs_->CreateDir("sub/tree"));
subfs_ = std::make_shared<SubTreeFileSystem>("sub/tree", fs_);
}

void CreateFile(const std::string& path, const std::string& data) {
::arrow::fs::CreateFile(subfs_.get(), path, data);
}

protected:
std::shared_ptr<SubTreeFileSystem> subfs_;
};

TEST_F(TestSubTreeFileSystem, CreateDir) {
ASSERT_OK(subfs_->CreateDir("AB"));
ASSERT_OK(subfs_->CreateDir("AB/CD/EF")); // Recursive
// Non-recursive, parent doesn't exist
ASSERT_RAISES(IOError, subfs_->CreateDir("AB/GH/IJ", false /* recursive */));
ASSERT_OK(subfs_->CreateDir("AB/GH", false /* recursive */));
ASSERT_OK(subfs_->CreateDir("AB/GH/IJ", false /* recursive */));
// Can't create root dir
ASSERT_RAISES(IOError, subfs_->CreateDir(""));
CheckDirs({{"sub", time_},
{"sub/tree", time_},
{"sub/tree/AB", time_},
{"sub/tree/AB/CD", time_},
{"sub/tree/AB/CD/EF", time_},
{"sub/tree/AB/GH", time_},
{"sub/tree/AB/GH/IJ", time_}});
CheckFiles({});
}

TEST_F(TestSubTreeFileSystem, DeleteDir) {
ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));
ASSERT_OK(subfs_->CreateDir("AB/GH/IJ"));
ASSERT_OK(subfs_->DeleteDir("AB/CD"));
ASSERT_OK(subfs_->DeleteDir("AB/GH/IJ"));
CheckDirs({{"sub", time_},
{"sub/tree", time_},
{"sub/tree/AB", time_},
{"sub/tree/AB/GH", time_}});
CheckFiles({});
ASSERT_RAISES(IOError, subfs_->DeleteDir("AB/CD"));
ASSERT_OK(subfs_->DeleteDir("AB"));
CheckDirs({{"sub", time_}, {"sub/tree", time_}});
CheckFiles({});

// Can't delete root dir
ASSERT_RAISES(IOError, subfs_->DeleteDir(""));
CheckDirs({{"sub", time_}, {"sub/tree", time_}});
CheckFiles({});
}

TEST_F(TestSubTreeFileSystem, DeleteFile) {
ASSERT_OK(subfs_->CreateDir("AB"));

CreateFile("ab", "");
CheckFiles({{"sub/tree/ab", time_, ""}});
ASSERT_OK(subfs_->DeleteFile("ab"));
CheckFiles({});

CreateFile("AB/cd", "");
CheckFiles({{"sub/tree/AB/cd", time_, ""}});
ASSERT_OK(subfs_->DeleteFile("AB/cd"));
CheckFiles({});

ASSERT_RAISES(IOError, subfs_->DeleteFile("non-existent"));
ASSERT_RAISES(IOError, subfs_->DeleteFile(""));
}

TEST_F(TestSubTreeFileSystem, MoveFile) {
CreateFile("ab", "");
CheckFiles({{"sub/tree/ab", time_, ""}});
ASSERT_OK(subfs_->Move("ab", "cd"));
CheckFiles({{"sub/tree/cd", time_, ""}});

ASSERT_OK(subfs_->CreateDir("AB"));
ASSERT_OK(subfs_->Move("cd", "AB/ef"));
CheckFiles({{"sub/tree/AB/ef", time_, ""}});

ASSERT_RAISES(IOError, subfs_->Move("AB/ef", ""));
ASSERT_RAISES(IOError, subfs_->Move("", "xxx"));
CheckFiles({{"sub/tree/AB/ef", time_, ""}});
CheckDirs({{"sub", time_}, {"sub/tree", time_}, {"sub/tree/AB", time_}});
}

TEST_F(TestSubTreeFileSystem, MoveDir) {
ASSERT_OK(subfs_->CreateDir("AB/CD/EF"));
ASSERT_OK(subfs_->Move("AB/CD", "GH"));
CheckDirs({{"sub", time_},
{"sub/tree", time_},
{"sub/tree/AB", time_},
{"sub/tree/GH", time_},
{"sub/tree/GH/EF", time_}});

ASSERT_RAISES(IOError, subfs_->Move("AB", ""));
}

TEST_F(TestSubTreeFileSystem, CopyFile) {
CreateFile("ab", "data");
CheckFiles({{"sub/tree/ab", time_, "data"}});
ASSERT_OK(subfs_->CopyFile("ab", "cd"));
CheckFiles({{"sub/tree/ab", time_, "data"}, {"sub/tree/cd", time_, "data"}});

ASSERT_OK(subfs_->CreateDir("AB"));
ASSERT_OK(subfs_->CopyFile("cd", "AB/ef"));
CheckFiles({{"sub/tree/AB/ef", time_, "data"},
{"sub/tree/ab", time_, "data"},
{"sub/tree/cd", time_, "data"}});

ASSERT_RAISES(IOError, subfs_->CopyFile("ab", ""));
ASSERT_RAISES(IOError, subfs_->CopyFile("", "xxx"));
CheckFiles({{"sub/tree/AB/ef", time_, "data"},
{"sub/tree/ab", time_, "data"},
{"sub/tree/cd", time_, "data"}});
}

TEST_F(TestSubTreeFileSystem, OpenInputStream) {
std::shared_ptr<io::InputStream> stream;
std::shared_ptr<Buffer> buffer;
CreateFile("ab", "data");

ASSERT_OK(subfs_->OpenInputStream("ab", &stream));
ASSERT_OK(stream->Read(4, &buffer));
AssertBufferEqual(*buffer, "data");
ASSERT_OK(stream->Close());

ASSERT_RAISES(IOError, subfs_->OpenInputStream("non-existent", &stream));
ASSERT_RAISES(IOError, subfs_->OpenInputStream("", &stream));
}

TEST_F(TestSubTreeFileSystem, OpenInputFile) {
std::shared_ptr<io::RandomAccessFile> stream;
std::shared_ptr<Buffer> buffer;
CreateFile("ab", "some data");

ASSERT_OK(subfs_->OpenInputFile("ab", &stream));
ASSERT_OK(stream->ReadAt(5, 4, &buffer));
AssertBufferEqual(*buffer, "data");
ASSERT_OK(stream->Close());

ASSERT_RAISES(IOError, subfs_->OpenInputFile("non-existent", &stream));
ASSERT_RAISES(IOError, subfs_->OpenInputFile("", &stream));
}

TEST_F(TestSubTreeFileSystem, OpenOutputStream) {
std::shared_ptr<io::OutputStream> stream;

ASSERT_OK(subfs_->OpenOutputStream("ab", &stream));
ASSERT_OK(stream->Write("data"));
ASSERT_OK(stream->Close());
CheckFiles({{"sub/tree/ab", time_, "data"}});

ASSERT_OK(subfs_->CreateDir("AB"));
ASSERT_OK(subfs_->OpenOutputStream("AB/cd", &stream));
ASSERT_OK(stream->Write("other"));
ASSERT_OK(stream->Close());
CheckFiles({{"sub/tree/AB/cd", time_, "other"}, {"sub/tree/ab", time_, "data"}});

ASSERT_RAISES(IOError, subfs_->OpenOutputStream("non-existent/xxx", &stream));
ASSERT_RAISES(IOError, subfs_->OpenOutputStream("AB", &stream));
ASSERT_RAISES(IOError, subfs_->OpenOutputStream("", &stream));
CheckFiles({{"sub/tree/AB/cd", time_, "other"}, {"sub/tree/ab", time_, "data"}});
}

TEST_F(TestSubTreeFileSystem, OpenAppendStream) {
std::shared_ptr<io::OutputStream> stream;

ASSERT_OK(subfs_->OpenAppendStream("ab", &stream));
ASSERT_OK(stream->Write("some"));
ASSERT_OK(stream->Close());
CheckFiles({{"sub/tree/ab", time_, "some"}});

ASSERT_OK(subfs_->OpenAppendStream("ab", &stream));
ASSERT_OK(stream->Write(" data"));
ASSERT_OK(stream->Close());
CheckFiles({{"sub/tree/ab", time_, "some data"}});
}

TEST_F(TestSubTreeFileSystem, GetTargetStatsSingle) {
FileStats st;
ASSERT_OK(subfs_->CreateDir("AB/CD"));

ASSERT_OK(subfs_->GetTargetStats("AB", &st));
AssertFileStats(st, "AB", FileType::Directory, time_);
ASSERT_OK(subfs_->GetTargetStats("AB/CD", &st));
AssertFileStats(st, "AB/CD", FileType::Directory, time_);

CreateFile("ab", "data");
ASSERT_OK(subfs_->GetTargetStats("ab", &st));
AssertFileStats(st, "ab", FileType::File, time_, 4);

ASSERT_OK(subfs_->GetTargetStats("non-existent", &st));
AssertFileStats(st, "non-existent", FileType::NonExistent);
}

TEST_F(TestSubTreeFileSystem, GetTargetStatsVector) {
std::vector<FileStats> stats;

ASSERT_OK(subfs_->CreateDir("AB/CD"));
CreateFile("ab", "data");
CreateFile("AB/cd", "other data");

ASSERT_OK(subfs_->GetTargetStats({"ab", "AB", "AB/cd", "non-existent"}, &stats));
ASSERT_EQ(stats.size(), 4);
AssertFileStats(stats[0], "ab", FileType::File, time_, 4);
AssertFileStats(stats[1], "AB", FileType::Directory, time_);
AssertFileStats(stats[2], "AB/cd", FileType::File, time_, 10);
AssertFileStats(stats[3], "non-existent", FileType::NonExistent);
}

TEST_F(TestSubTreeFileSystem, GetTargetStatsSelector) {
std::vector<FileStats> stats;
Selector selector;

ASSERT_OK(subfs_->CreateDir("AB/CD"));
CreateFile("ab", "data");
CreateFile("AB/cd", "data2");
CreateFile("AB/CD/ef", "data34");

selector.base_dir = "AB";
selector.recursive = false;
ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
ASSERT_EQ(stats.size(), 2);
AssertFileStats(stats[0], "AB/CD", FileType::Directory, time_);
AssertFileStats(stats[1], "AB/cd", FileType::File, time_, 5);

selector.recursive = true;
ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
ASSERT_EQ(stats.size(), 3);
AssertFileStats(stats[0], "AB/CD", FileType::Directory, time_);
AssertFileStats(stats[1], "AB/CD/ef", FileType::File, time_, 6);
AssertFileStats(stats[2], "AB/cd", FileType::File, time_, 5);

selector.base_dir = "";
selector.recursive = false;
ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
ASSERT_EQ(stats.size(), 2);
AssertFileStats(stats[0], "AB", FileType::Directory, time_);
AssertFileStats(stats[1], "ab", FileType::File, time_, 4);

selector.recursive = true;
ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
ASSERT_EQ(stats.size(), 5);
AssertFileStats(stats[0], "AB", FileType::Directory, time_);
AssertFileStats(stats[1], "AB/CD", FileType::Directory, time_);
AssertFileStats(stats[2], "AB/CD/ef", FileType::File, time_, 6);
AssertFileStats(stats[3], "AB/cd", FileType::File, time_, 5);
AssertFileStats(stats[4], "ab", FileType::File, time_, 4);

selector.base_dir = "non-existent";
ASSERT_RAISES(IOError, subfs_->GetTargetStats(selector, &stats));
selector.allow_non_existent = true;
ASSERT_OK(subfs_->GetTargetStats(selector, &stats));
ASSERT_EQ(stats.size(), 0);
}

} // namespace internal
} // namespace fs
} // namespace arrow
Loading

0 comments on commit 30b4736

Please sign in to comment.