Skip to content

Commit

Permalink
FileInfoGenerator for sending files in batches
Browse files Browse the repository at this point in the history
Summary:
Today the sender computes the list of files to be sent at the
beginning of the transfer, transfers all of the files and
finishes. When we are transferring, for example, a large DB,
we might end up with new files while the transfer is
in progress, which requires some application-side changes to
start another transfer and send these changes. This diff tries
to simplify this by giving the Sender the ability to check
for more files before completing the transfer.

Differential Revision: D17215618

fbshipit-source-id: 3cad04f92bba00efa0510b132c78c3f0989e32ba
  • Loading branch information
sarangbh authored and facebook-github-bot committed Sep 13, 2019
1 parent 22ef0bd commit 1557fce
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 9 deletions.
7 changes: 7 additions & 0 deletions Sender.cpp
Expand Up @@ -293,8 +293,15 @@ ErrorCode Sender::start() {
dirQueue_->setOpenFilesDuringDiscovery(options_.open_files_during_discovery);
dirQueue_->setDirectReads(options_.odirect_reads);
if (!transferRequest_.fileInfo.empty() ||
transferRequest_.fileInfoGenerator ||
transferRequest_.disableDirectoryTraversal) {
dirQueue_->setFileInfo(transferRequest_.fileInfo);
if (transferRequest_.fileInfoGenerator) {
dirQueue_->setFileInfoGenerator([this]() {
dirQueue_->waitForPreviousTransfer();
return transferRequest_.fileInfoGenerator();
});
}
}
transferHistoryController_ =
std::make_unique<TransferHistoryController>(*dirQueue_);
Expand Down
8 changes: 6 additions & 2 deletions Sender.h
Expand Up @@ -8,12 +8,15 @@
*/
#pragma once

#include <wdt/WdtBase.h>
#include <wdt/util/ClientSocket.h>
#include <chrono>
#include <iostream>
#include <memory>

#include <gtest/gtest_prod.h>

#include <wdt/WdtBase.h>
#include <wdt/util/ClientSocket.h>

namespace facebook {
namespace wdt {

Expand Down Expand Up @@ -102,6 +105,7 @@ class Sender : public WdtBase {
private:
friend class SenderThread;
friend class QueueAbortChecker;
FRIEND_TEST(SenderTest, FileInfoGenerator);

/// Validate the transfer request
ErrorCode validateTransferRequest() override;
Expand Down
19 changes: 19 additions & 0 deletions TARGETS
Expand Up @@ -830,6 +830,7 @@ cpp_library(
],
external_deps = [
("openssl", None, "crypto"),
("googletest", None, "gtest_headers"),
"gflags",
"glog",
],
Expand Down Expand Up @@ -987,3 +988,21 @@ cpp_binary(
"glog",
],
)

# Unit test target built from test/SenderTest.cpp; links against :wdtlib
# plus the folly, googletest and boost_filesystem libraries the test uses.
cpp_unittest(
name = "wdt_sender_test",
srcs = ["test/SenderTest.cpp"],
auto_headers = AutoHeaders.RECURSIVE_GLOB, # https://fburl.com/424819295
compiler_flags = WDT_COMPILER_FLAGS,
deps = [
":wdtlib",
"//folly:file_util",
"//folly:random",
],
external_deps = [
("googletest", None, "gtest"),
("boost", None, "boost_filesystem"),
"gflags",
"glog",
],
)
13 changes: 13 additions & 0 deletions WdtTransferRequest.h
Expand Up @@ -10,6 +10,8 @@
#include <wdt/ErrorCodes.h>
#include <wdt/Protocol.h>
#include <wdt/WdtOptions.h>
#include <folly/Optional.h>
#include <functional>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -176,6 +178,17 @@ struct WdtTransferRequest {
/// Only used for the sender and when not using directory discovery
std::vector<WdtFileInfo> fileInfo;

/// Only used for the sender and when not using directory discovery.
/// Sender repeatedly invokes this function to get the list of files to
/// send and stops when the function returns folly::none. File transfer
/// is done in batches. After initiating the transfer for an initial list of
/// files (either via fileInfo or a call to fileInfoGenerator), we wait
/// until the transfer is complete before we read the next batch, if any, by
/// invoking fileInfoGenerator again.
using FileInfoGenerator =
std::function<folly::Optional<std::vector<WdtFileInfo>>()>;
FileInfoGenerator fileInfoGenerator{};

/// Use fileInfo even if empty (don't use the directory exploring)
bool disableDirectoryTraversal{false};

Expand Down
110 changes: 110 additions & 0 deletions test/SenderTest.cpp
@@ -0,0 +1,110 @@
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include <algorithm>

#include <boost/filesystem.hpp>
#include <folly/FileUtil.h>
#include <folly/Random.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include <wdt/Receiver.h>
#include <wdt/Sender.h>
#include <wdt/Wdt.h>

namespace facebook {
namespace wdt {

namespace {
/// Creates a fresh, empty, uniquely named directory for the test and returns
/// its path. The directory is placed under the system temporary directory
/// (as reported by boost::filesystem::temp_directory_path(), which honors
/// TMPDIR on POSIX) instead of a hard-coded "/tmp", so the test also works in
/// sandboxes that redirect temporary storage.
boost::filesystem::path createTmpDir() {
  namespace fs = boost::filesystem;
  fs::path dir = fs::temp_directory_path();
  dir /= fs::unique_path("wdt_sender_test-%%%-%%%%-%%%");
  // A name collision is unlikely, but make sure we never start with stale
  // contents left behind by an earlier run.
  if (fs::exists(dir)) {
    fs::remove_all(dir);
  }
  fs::create_directories(dir);
  return dir;
}

void createFile(const std::string& path, size_t size) {
auto data = std::vector<char>(size, 0);
std::generate(data.begin(), data.end(),
[]() -> char { return (folly::Random::rand32() % 10) + '0'; });
folly::writeFile(data, path.data());
}

} // namespace

// Exercises batched sending through WdtTransferRequest::fileInfoGenerator:
// the first file is supplied via req.fileInfo and every following file is
// handed out one at a time by the generator. Each generator call checks the
// sender's reported data bytes against the total size of the files handed out
// so far, and at the end the received files are compared with the originals.
TEST(SenderTest, FileInfoGenerator) {
auto senderDir = createTmpDir();
auto receiverDir = createTmpDir();

// Create numFiles files of random size (< maxFileSize bytes) in senderDir.
// cumulativeSize[k] holds the total size of files 0..k-1 (so [0] == 0).
std::vector<WdtFileInfo> fileInfo;
std::vector<size_t> cumulativeSize;
cumulativeSize.push_back(0);
const size_t numFiles = 3;
const uint32_t maxFileSize = 1000000;
for (size_t i = 0; i < numFiles; i++) {
auto file = senderDir / std::to_string(i);
size_t fileSz = folly::Random::rand32() % maxFileSize;
createFile(file.c_str(), fileSz);
// NOTE(review): -1 / false are presumably "size unknown" / "no direct
// reads" -- confirm against the WdtFileInfo constructor.
fileInfo.push_back({std::to_string(i), -1, false});
cumulativeSize.push_back(cumulativeSize.back() + fileSz);
}

// Start the receiver first; its init() result is reused as the sender's
// transfer request below.
// NOTE(review): arguments 0 and 3 are presumably the start port and the
// number of ports -- confirm against the Receiver constructor.
auto receiver = std::make_unique<Receiver>(0, 3, receiverDir.c_str());
auto req = receiver->init();
receiver->transferAsync();

// `sender` is declared before the generator because the lambda captures it
// by reference; it is only assigned after the request is fully set up.
std::unique_ptr<Sender> sender;
size_t nextFile = 0;
req.disableDirectoryTraversal = true;
req.directory = senderDir.c_str();
// First batch: file 0 is passed directly through fileInfo.
req.fileInfo = std::vector<WdtFileInfo>{fileInfo[nextFile++]};
req.fileInfoGenerator = [&]() -> folly::Optional<std::vector<WdtFileInfo>> {
// At this point `nextFile` files have been handed out; the bytes reported by
// the sender are expected to equal their combined size.
auto stats = sender->getGlobalTransferStats();
EXPECT_EQ(cumulativeSize[nextFile], stats.getDataBytes());
if (nextFile < fileInfo.size()) {
return std::vector<WdtFileInfo>{fileInfo[nextFile++]};
}
// No more files: returning folly::none ends the batching.
return folly::none;
};
sender = std::make_unique<Sender>(req);
sender->transfer();
receiver->finish();

// Verify that every file arrived with identical contents.
for (size_t i = 0; i < numFiles; i++) {
auto sentPath = senderDir / std::to_string(i);
std::string sent;
EXPECT_TRUE(folly::readFile(sentPath.c_str(), sent));

auto recvPath = receiverDir / std::to_string(i);
std::string recv;
EXPECT_TRUE(folly::readFile(recvPath.c_str(), recv));

EXPECT_EQ(sent, recv);
}

// Clean up the temporary directories created for this test.
boost::filesystem::remove_all(senderDir);
boost::filesystem::remove_all(receiverDir);
}

} // namespace wdt
} // namespace facebook

/// Test binary entry point: initializes gtest, gflags, glog and wdt (in that
/// order) and then runs every registered test, returning gtest's result as
/// the process exit code.
int main(int argc, char *argv[]) {
  // Send glog output to stderr; assigned before the command line is parsed.
  FLAGS_logtostderr = true;

  testing::InitGoogleTest(&argc, argv);
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);
  facebook::wdt::Wdt::initializeWdt("wdt");

  return RUN_ALL_TESTS();
}
38 changes: 34 additions & 4 deletions util/DirectorySourceQueue.cpp
Expand Up @@ -87,7 +87,14 @@ void DirectorySourceQueue::setFileInfo(
exploreDirectory_ = false;
}

const std::vector<WdtFileInfo> &DirectorySourceQueue::getFileInfo() const {
// Installs the callback that buildQueueSynchronously() polls for further
// batches of files, and switches off directory exploration so that only
// explicitly listed files are enqueued.
void DirectorySourceQueue::setFileInfoGenerator(
    WdtTransferRequest::FileInfoGenerator gen) {
  exploreDirectory_ = false;
  fileInfoGenerator_ = std::move(gen);
}

std::vector<WdtFileInfo> DirectorySourceQueue::getFileInfo() const {
std::lock_guard<std::mutex> lock(mutex_);
return fileInfo_;
}

Expand Down Expand Up @@ -200,7 +207,19 @@ bool DirectorySourceQueue::buildQueueSynchronously() {
} else {
WLOG(INFO) << "Using list of file info. Number of files "
<< fileInfo_.size();
res = enqueueFiles();
res = enqueueFiles(fileInfo_);

// also check if there is a generator to get more files
while (res && fileInfoGenerator_) {
auto files = fileInfoGenerator_();
if (!files) {
break;
}
res = enqueueFiles(*files);

std::lock_guard<std::mutex> lock(mutex_);
fileInfo_.insert(fileInfo_.end(), files->begin(), files->end());
}
}
{
std::lock_guard<std::mutex> lock(mutex_);
Expand Down Expand Up @@ -550,8 +569,8 @@ std::vector<string> &DirectorySourceQueue::getFailedDirectories() {
return failedDirectories_;
}

bool DirectorySourceQueue::enqueueFiles() {
for (auto &info : fileInfo_) {
bool DirectorySourceQueue::enqueueFiles(std::vector<WdtFileInfo>& fileInfo) {
for (auto &info : fileInfo) {
if (threadCtx_->getAbortChecker()->shouldAbort()) {
WLOG(ERROR) << "Directory transfer thread aborted";
return false;
Expand Down Expand Up @@ -664,9 +683,13 @@ std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource(
std::unique_ptr<ByteSource> source;
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
numWaiters_++;
// notify if someone's waiting for previous batch to finish
conditionPrevTransfer_.notify_all();
while (sourceQueue_.empty() && !initFinished_) {
conditionNotEmpty_.wait(lock);
}
numWaiters_--;
if (!failedSourceStats_.empty() || !failedDirectories_.empty()) {
status = ERROR;
} else {
Expand Down Expand Up @@ -698,5 +721,12 @@ std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource(
failedSourceStats_.emplace_back(std::move(source->getTransferStats()));
}
}

void DirectorySourceQueue::waitForPreviousTransfer() {
std::unique_lock<std::mutex> lock(mutex_);
while (!sourceQueue_.empty() || numWaiters_ < numClientThreads_) {
conditionPrevTransfer_.wait(lock);
}
}
}
}
30 changes: 27 additions & 3 deletions util/DirectorySourceQueue.h
Expand Up @@ -158,11 +158,13 @@ class DirectorySourceQueue : public SourceQueue {
*/
void setFileInfo(const std::vector<WdtFileInfo> &fileInfo);

void setFileInfoGenerator(WdtTransferRequest::FileInfoGenerator gen);

/// @param blockSizeMbytes block size in Mbytes
void setBlockSizeMbytes(int64_t blockSizeMbytes);

/// Get the file info in this directory queue
const std::vector<WdtFileInfo> &getFileInfo() const;
std::vector<WdtFileInfo> getFileInfo() const;

/**
* Sets whether to follow symlink or not
Expand Down Expand Up @@ -229,6 +231,15 @@ class DirectorySourceQueue : public SourceQueue {
*/
bool setRootDir(const std::string &newRootDir);

/**
* Allows the caller to block until all the previous transfers have
* finished, before invoking fileInfoGenerator_ to get the next batch.
* NOTE: This uses numClientThreads_ to get the number of clients pulling
* from the queue and the size of queue to determine if transfers have
* finished.
*/
void waitForPreviousTransfer();

private:
/**
* Resolves a symlink.
Expand All @@ -248,7 +259,7 @@ class DirectorySourceQueue : public SourceQueue {
* Stat the input files and populate queue
* @return true on success, false on error
*/
bool enqueueFiles();
bool enqueueFiles(std::vector<WdtFileInfo>& fileInfo);

/**
* initial creation from either explore or enqueue files, uses
Expand Down Expand Up @@ -303,12 +314,20 @@ class DirectorySourceQueue : public SourceQueue {
/// List of files to enqueue instead of recursing over rootDir_.
std::vector<WdtFileInfo> fileInfo_;

/// protects initCalled_/initFinished_/sourceQueue_/failedSourceStats_
/// A generator function to invoke to get more files to send
WdtTransferRequest::FileInfoGenerator fileInfoGenerator_;

/// protects
/// initCalled_/initFinished_/sourceQueue_/failedSourceStats_/numWaiters_
mutable std::mutex mutex_;

/// condition variable indicating sourceQueue_ is not empty
mutable std::condition_variable conditionNotEmpty_;

/// condition variable indicating previous batch transfer has finished i.e.
/// queue is empty and all client threads are waiting.
mutable std::condition_variable conditionPrevTransfer_;

/// Indicates whether init() has been called to prevent multiple calls
bool initCalled_{false};

Expand Down Expand Up @@ -354,6 +373,11 @@ class DirectorySourceQueue : public SourceQueue {
SourceComparator>
sourceQueue_;

/**
* number of threads waiting on the queue
*/
int64_t numWaiters_{0};

/// Transfer stats for sources which are not transferred
std::vector<TransferStats> failedSourceStats_;

Expand Down

0 comments on commit 1557fce

Please sign in to comment.