diff --git a/CMakeLists.txt b/CMakeLists.txt index 21610561..22624dbf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2) # There is no C per se in WDT but if you use CXX only here many checks fail # Version is Major.Minor.YYMMDDX for up to 10 releases per day # Minor currently is also the protocol version - has to match with Protocol.cpp -project("WDT" LANGUAGES C CXX VERSION 1.14.1507270) +project("WDT" LANGUAGES C CXX VERSION 1.15.1507280) # On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2) set(CMAKE_CXX_STANDARD 11) diff --git a/Receiver.cpp b/Receiver.cpp index 6d304838..5d784f30 100644 --- a/Receiver.cpp +++ b/Receiver.cpp @@ -128,13 +128,8 @@ Receiver::Receiver(const WdtTransferRequest &transferRequest) { } } -Receiver::Receiver(int port, int numSockets) - : Receiver(WdtTransferRequest(port, numSockets)) { -} - Receiver::Receiver(int port, int numSockets, const std::string &destDir) - : Receiver(port, numSockets) { - setDir(destDir); + : Receiver(WdtTransferRequest(port, numSockets, destDir)) { } WdtTransferRequest Receiver::init() { diff --git a/Receiver.h b/Receiver.h index 43c24b07..1f4d2b70 100644 --- a/Receiver.h +++ b/Receiver.h @@ -40,15 +40,9 @@ class Receiver : public WdtBase { explicit Receiver(const WdtTransferRequest &transferRequest); /** - * Constructor that needs start port and number of ports. + * Constructor with start port, number of ports and directory to write to. * If the start port is specified as zero, it auto configures the ports */ - Receiver(int startPort, int numPorts); - - /** - * Constructor which also takes the directory where receiver - * will be writing files to. - */ Receiver(int port, int numSockets, const std::string &destDir); /// Setup before starting (@see WdtBase.h) diff --git a/WdtBase.cpp b/WdtBase.cpp index cf49ca05..68025ac7 100644 --- a/WdtBase.cpp +++ b/WdtBase.cpp @@ -122,7 +122,9 @@ WdtTransferRequest::WdtTransferRequest(const vector& ports) { this->ports = ports; } -WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts) { +WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts, + const string& directory) { + this->directory = directory; int portNum = startPort; for (int i = 0; i < numPorts; i++) { ports.push_back(portNum); @@ -147,7 +149,7 @@ WdtTransferRequest::WdtTransferRequest(const string& uriString) { errorCode = URI_PARSE_ERROR; } string portsStr(wdtUri.getQueryParam(PORTS_PARAM)); - StringPiece portsList(portsStr); // pointers into portsStr + StringPiece portsList(portsStr); // pointers into portsStr do { StringPiece portNum = portsList.split_step(','); int port; @@ -262,6 +264,8 @@ void WdtBase::setTransferId(const std::string& transferId) { } void WdtBase::setProtocolVersion(int64_t protocolVersion) { + WDT_CHECK(protocolVersion > 0) << "Protocol version can't be <= 0 " + << protocolVersion; WDT_CHECK(Protocol::negotiateProtocol(protocolVersion) == protocolVersion) << "Can not support wdt version " << protocolVersion; protocolVersion_ = protocolVersion; diff --git a/WdtBase.h b/WdtBase.h index 76c6cc13..43b84e96 100644 --- a/WdtBase.h +++ b/WdtBase.h @@ -127,7 +127,7 @@ struct WdtTransferRequest { * Constructor with start port and num ports. Fills the vector with * ports from [startPort, startPort + numPorts) */ - WdtTransferRequest(int startPort, int numPorts); + WdtTransferRequest(int startPort, int numPorts, const std::string& directory); /// Constructor to construct the request object from a url string explicit WdtTransferRequest(const std::string& uriString); diff --git a/WdtConfig.h b/WdtConfig.h index b2e43c97..b6b356f0 100644 --- a/WdtConfig.h +++ b/WdtConfig.h @@ -5,10 +5,10 @@ #pragma once #define WDT_VERSION_MAJOR 1 -#define WDT_VERSION_MINOR 14 -#define WDT_VERSION_BUILD 1507270 +#define WDT_VERSION_MINOR 15 +#define WDT_VERSION_BUILD 1507280 // Add -fbcode to version str -#define WDT_VERSION_STR "1.14.1507270-fbcode" +#define WDT_VERSION_STR "1.15.1507280-fbcode" // Tie minor and proto version #define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR diff --git a/WdtResourceControllerTest.cpp b/WdtResourceControllerTest.cpp index fd7aec82..3df6ffdf 100644 --- a/WdtResourceControllerTest.cpp +++ b/WdtResourceControllerTest.cpp @@ -40,11 +40,10 @@ class WdtResourceControllerTest : public WdtResourceController { } WdtTransferRequest makeTransferRequest(const string &transferId) { - WdtTransferRequest request(startPort, numPorts); + WdtTransferRequest request(startPort, numPorts, directory); request.hostName = hostName; request.transferId = transferId; request.protocolVersion = protocolVersion; - request.directory = directory; return request; } }; @@ -356,7 +355,7 @@ void WdtResourceControllerTest::RequestSerializationTest() { EXPECT_EQ(dummy, transferRequest); } { - WdtTransferRequest transferRequest(0, 1); + WdtTransferRequest transferRequest(0, 1, "dir1/dir2"); // Lets not populate anything else transferRequest.hostName = "localhost"; string serializedString = transferRequest.generateUrl(true); @@ -366,7 +365,7 @@ void WdtResourceControllerTest::RequestSerializationTest() { EXPECT_EQ(transferRequest, dummy); } { - WdtTransferRequest transferRequest(0, 8); + WdtTransferRequest transferRequest(0, 8, "/dir3/dir4"); Receiver receiver(transferRequest); transferRequest = receiver.init(); ASSERT_TRUE(!receiver.getTransferId().empty()); diff --git a/wdtCmdLine.cpp b/wdtCmdLine.cpp index 773827f7..5f5d4a4f 100644 --- a/wdtCmdLine.cpp +++ b/wdtCmdLine.cpp @@ -8,6 +8,7 @@ */ #include "Sender.h" #include "Receiver.h" +#include "Protocol.h" #include "WdtResourceController.h" #include #include @@ -43,13 +44,16 @@ DEFINE_string( DEFINE_bool(parse_transfer_log, false, "If true, transfer log is parsed and fixed"); -DEFINE_string(transfer_id, "", "Transfer id (optional, should match"); +DEFINE_string(transfer_id, "", + "Transfer id. Receiver will generate one to be used (via URL) on" + " the sender if not set explictly"); DEFINE_int32( - protocol_version, 0, + protocol_version, facebook::wdt::Protocol::protocol_version, "Protocol version to use, this is used to simulate protocol negotiation"); DEFINE_string(connection_url, "", - "Provide the connection string to connect to receiver"); + "Provide the connection string to connect to receiver" + " (incl. transfer_id and other parameters)"); DECLARE_bool(logtostderr); // default of standard glog is off - let's set it on @@ -118,27 +122,37 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Running WDT " << Protocol::getFullVersion(); ErrorCode retCode = OK; + + // Odd ball case of log parsing if (FLAGS_parse_transfer_log) { // Log parsing mode TransferLogManager transferLogManager; transferLogManager.setRootDir(FLAGS_directory); if (!transferLogManager.parseAndPrint()) { LOG(ERROR) << "Transfer log parsing failed"; - retCode = ERROR; - } - } else if (FLAGS_destination.empty() && FLAGS_connection_url.empty()) { - Receiver receiver(FLAGS_start_port, FLAGS_num_ports, FLAGS_directory); - receiver.setTransferId(FLAGS_transfer_id); - if (FLAGS_protocol_version > 0) { - receiver.setProtocolVersion(FLAGS_protocol_version); + return ERROR; } - WdtTransferRequest transferRequest = receiver.init(); - if (transferRequest.errorCode == ERROR) { + return OK; + } + + // General case : Sender or Receiver + const auto &options = WdtOptions::get(); + WdtTransferRequest req(options.start_port, options.num_ports, + FLAGS_directory); + req.transferId = FLAGS_transfer_id; + if (FLAGS_protocol_version > 0) { + req.protocolVersion = FLAGS_protocol_version; + } + + if (FLAGS_destination.empty() && FLAGS_connection_url.empty()) { + Receiver receiver(req); + WdtTransferRequest augmentedReq = receiver.init(); + if (augmentedReq.errorCode == ERROR) { LOG(ERROR) << "Error setting up receiver"; - return transferRequest.errorCode; + return augmentedReq.errorCode; } LOG(INFO) << "Starting receiver with connection url "; - std::cout << transferRequest.generateUrl() << std::endl; + std::cout << augmentedReq.generateUrl() << std::endl; std::cout.flush(); setUpAbort(receiver); if (!FLAGS_run_as_daemon) { @@ -167,40 +181,23 @@ int main(int argc, char *argv[]) { fileInfo.emplace_back(fields[0], filesize); } } - std::vector ports; - const auto &options = WdtOptions::get(); - for (int i = 0; i < options.num_ports; i++) { - ports.push_back(options.start_port + i); - } - std::unique_ptr sender; - if (FLAGS_connection_url.empty()) { - sender.reset( - new Sender(FLAGS_destination, FLAGS_directory, ports, fileInfo)); - if (FLAGS_protocol_version > 0) { - sender->setProtocolVersion(FLAGS_protocol_version); - } - sender->setTransferId(FLAGS_transfer_id); - } else { - // If you are using a connection url it is - // expected that you set protocol version, ports - // and transfer id in the url - WdtTransferRequest transferRequest(FLAGS_connection_url); - LOG(INFO) << transferRequest.generateUrl(true); - if (transferRequest.directory.empty()) { - transferRequest.directory = FLAGS_directory; - } - sender.reset(new Sender(transferRequest)); + req.hostName = FLAGS_destination; + if (!FLAGS_connection_url.empty()) { + LOG(INFO) << "Input url: " << FLAGS_connection_url; + // TODO: merge instead + req = WdtTransferRequest(FLAGS_connection_url); + req.directory = FLAGS_directory; // re-set it for now } - WdtTransferRequest processedRequest = sender->init(); + Sender sender(req); + WdtTransferRequest processedRequest = sender.init(); LOG(INFO) << "Starting sender with details " << processedRequest.generateUrl(true); ADDITIONAL_SENDER_SETUP - setUpAbort(*sender); - sender->setIncludeRegex(FLAGS_include_regex); - sender->setExcludeRegex(FLAGS_exclude_regex); - sender->setPruneDirRegex(FLAGS_prune_dir_regex); - // TODO fix that - std::unique_ptr report = sender->transfer(); + setUpAbort(sender); + sender.setIncludeRegex(FLAGS_include_regex); + sender.setExcludeRegex(FLAGS_exclude_regex); + sender.setPruneDirRegex(FLAGS_prune_dir_regex); + std::unique_ptr report = sender.transfer(); retCode = report->getSummary().getErrorCode(); } cancelAbort(); diff --git a/wdt_download_resumption_test.sh b/wdt_download_resumption_test.sh index 17a6d535..feaa620d 100755 --- a/wdt_download_resumption_test.sh +++ b/wdt_download_resumption_test.sh @@ -16,7 +16,7 @@ checkLastCmdStatusExpectingFailure() { if [ $LAST_STATUS -eq 0 ] ; then sudo iptables-restore < $DIR/iptable echo "expecting wdt failure, but transfer was successful, failing test" - exit 1 + exit 1 fi } @@ -165,7 +165,7 @@ TEST_COUNT=$((TEST_COUNT + 1)) echo "Download resumption with network error test(3)" startNewTransfer -sleep 10 +sleep 10 killCurrentTransfer STARTING_PORT=$((STARTING_PORT + threads)) startNewTransfer diff --git a/wdt_e2e_simple_test.sh b/wdt_e2e_simple_test.sh index e77ee6e1..26b8653c 100755 --- a/wdt_e2e_simple_test.sh +++ b/wdt_e2e_simple_test.sh @@ -60,34 +60,28 @@ echo "done with setup" (cd $DIR/src ; touch a; ln -s doesntexist badlink; dd if=/dev/zero of=c bs=1024 count=1; mkdir d; ln -s ../d d/e; ln -s ../c d/a) (cd $DIR/extsrc; mkdir TestDir; mkdir TestDir/test; cd TestDir; echo "Text1" >> file1; cd test; echo "Text2" >> file1; ln -s $DIR/extsrc/TestDir; cp -R $DIR/extsrc/TestDir $DIR/src) -# Can't have both client and server send to stdout in parallel or log lines -# get mangled/are missing - so we redirect the server one -echo "$WDTBIN -minloglevel=1 -directory $DIR/dst > $DIR/server.log 2>&1 &" -$WDTBIN -minloglevel=1 -directory $DIR/dst > $DIR/server.log 2>&1 & -# client now retries connects so no need wait for server to be up -pidofreceiver=$! -# To test only 1 socket (single threaded send/receive) -#$WDTBIN -num_sockets=1 -directory $DIR/src -destination ::1 -# Normal +CMD="$WDTBIN -minloglevel=1 -directory $DIR/dst 2> $DIR/server.log | head -1 | \ + xargs -I URL time $WDTBIN -directory $DIR/src -connection_url URL 2>&1 | \ + tee $DIR/client.log" +echo "First transfer: $CMD" +eval $CMD +STATUS=$? +# TODO check for $? / crash... though diff will indirectly find that case -echo "$WDTBIN -directory $DIR/src -destination $HOSTNAME 2>&1 | tee $DIR/client.log" -time $WDTBIN -directory $DIR/src -destination $HOSTNAME 2>&1 | tee $DIR/client.log - -# 2nd Receiver: -echo "$WDTBIN -directory $DIR/dst_symlinks >> $DIR/server.log 2>&1 &" -$WDTBIN -directory $DIR/dst_symlinks >> $DIR/server.log 2>&1 & - - -echo "$WDTBIN -follow_symlinks -directory $DIR/src -destination $HOSTNAME 2>&1 | tee -a $DIR/client.log" -time $WDTBIN -follow_symlinks -directory $DIR/src -destination $HOSTNAME 2>&1 | tee -a $DIR/client.log +CMD="$WDTBIN -minloglevel=1 -directory $DIR/dst_symlinks 2>> $DIR/server.log |\ + head -1 | xargs -I URL time $WDTBIN -follow_symlinks -directory $DIR/src \ + -connection_url URL 2>&1 | tee $DIR/client.log" +echo "Second transfer: $CMD" +eval $CMD +# TODO check for $? / crash... though diff will indirectly find that case if [ $DO_VERIFY -eq 1 ] ; then echo "Verifying for run without follow_symlinks" echo "Checking for difference `date`" - NUM_FILES=`(cd $DIR/dst ; ( find . -type f | wc -l))` + NUM_FILES=`(cd $DIR/dst && ( find . -type f | wc -l))` echo "Transfered `du -ks $DIR/dst` kbytes across $NUM_FILES files" (cd $DIR/src ; ( find . -type f -print0 | xargs -0 md5sum | sort ) \ @@ -103,7 +97,7 @@ if [ $DO_VERIFY -eq 1 ] ; then echo "Verifying for run with follow_symlinks" echo "Checking for difference `date`" - NUM_FILES=`(cd $DIR/dst_symlinks; ( find . -type f | wc -l))` + NUM_FILES=`(cd $DIR/dst_symlinks && ( find . -type f | wc -l))` echo "Transfered `du -ks $DIR/dst_symlinks` kbytes across $NUM_FILES files" (cd $DIR/src ; ( find -L . -type f -print0 | xargs -0 md5sum | sort ) \ @@ -117,10 +111,8 @@ if [ $DO_VERIFY -eq 1 ] ; then if [ $STATUS -eq 0 ] ; then STATUS=$SYMLINK_STATUS fi -#(cd $DIR; ls -lR src/ dst/ ) else echo "Skipping independant verification" - STATUS=0 fi diff --git a/wdt_e2e_test.sh b/wdt_e2e_test.sh index 9227a598..391c1949 100755 --- a/wdt_e2e_test.sh +++ b/wdt_e2e_test.sh @@ -90,7 +90,8 @@ printf "(Sockets,Average rate, Max_rate, Save local?, Delay)=%s,%s,%s,%s,%s\n" " #WDTBIN_OPTS="-buffer_size=$BS -num_sockets=8 -minloglevel 2 -sleep_ms 1 -max_retries 999" WDTBIN_OPTS="-minloglevel=0 -sleep_millis 1 -max_retries 999 -full_reporting "\ "-avg_mbytes_per_sec=$avg_rate -max_mbytes_per_sec=$max_rate "\ -"-num_ports=$threads -throttler_log_time_millis=200 -enable_checksum=true" +"-num_ports=$threads -throttler_log_time_millis=200 -enable_checksum=true "\ +"-transfer_id=$$" WDTBIN="_bin/wdt/wdt $WDTBIN_OPTS" BASEDIR=/dev/shm/tmpWDT @@ -107,6 +108,7 @@ mkdir $DIR/extsrc #cp -R wdt folly /usr/bin /usr/lib /usr/lib64 /usr/libexec /usr/share $DIR/src #cp -R wdt folly /usr/bin /usr/lib /usr/lib64 /usr/libexec $DIR/src cp -R wdt folly /usr/bin /usr/lib $DIR/src +#cp -R wdt folly /usr/bin $DIR/src #cp -R wdt folly $DIR/src # Removing symlinks which point to the same source tree for link in `find -L $DIR/src -xtype l` diff --git a/wdt_max_send_test.sh b/wdt_max_send_test.sh index 9db240a6..cd7811c0 100755 --- a/wdt_max_send_test.sh +++ b/wdt_max_send_test.sh @@ -34,10 +34,12 @@ fi REMOTE=::1 SKIP_WRITES="true" +# TODO: switch to url + # Without throttling: #WDTBIN_OPTS="-sleep_millis 1 -max_retries 3 -num_sockets 13" # With, still gets almost same max (21G) with throttling set high enough -WDTBIN_OPTS="-sleep_millis 1 -max_retries 3 -num_ports 13 +WDTBIN_OPTS="-sleep_millis 1 -max_retries 3 -num_ports 13 -transfer_id=$$ --avg_mbytes_per_sec=26000 --max_mbytes_per_sec=26001 --enable_checksum=false" CLIENT_PROFILE_FORMAT="%Uuser %Ssystem %Eelapsed %PCPU (%Xtext+%Ddata \ %Mmax)k\n%Iinputs+%Ooutputs (%Fmajor+%Rminor)pagefaults %Wswaps\nCLIENT_PROFILE %U \ diff --git a/wdt_network_test.sh b/wdt_network_test.sh index 13608cda..00a77a9e 100755 --- a/wdt_network_test.sh +++ b/wdt_network_test.sh @@ -52,16 +52,17 @@ STARTING_PORT=22400 ERROR_COUNT=25 TEST_COUNT=0 +WDTBIN_BASE="_bin/wdt/wdt --transfer_id $$" WDTBIN_OPTS="-ipv4 -ipv6=false -start_port=$STARTING_PORT \ -avg_mbytes_per_sec=60 -max_mbytes_per_sec=65 -run_as_daemon=false \ -full_reporting -read_timeout_millis=495 -write_timeout_millis=495 \ -progress_report_interval_millis=-1 -abort_check_interval_millis=100 \ -max_transfer_retries=5" -WDTBIN="_bin/wdt/wdt -num_ports=$threads $WDTBIN_OPTS" +WDTBIN="$WDTBIN_BASE -num_ports=$threads $WDTBIN_OPTS" WDTBIN_SERVER="$WDTBIN -protocol_version=$RECEIVER_PROTOCOL_VERSION" WDTBIN_CLIENT="$WDTBIN -protocol_version=$SENDER_PROTOCOL_VERSION" -WDTBIN_MORE_THREADS="_bin/wdt/wdt -num_ports=$((threads + 1)) $WDTBIN_OPTS" -WDTBIN_LESS_THREADS="_bin/wdt/wdt -num_ports=$((threads - 1)) $WDTBIN_OPTS" +WDTBIN_MORE_THREADS="$WDTBIN_BASE -num_ports=$((threads + 1)) $WDTBIN_OPTS" +WDTBIN_LESS_THREADS="$WDTBIN_BASE -num_ports=$((threads - 1)) $WDTBIN_OPTS" BASEDIR=/dev/shm/tmpWDT mkdir -p $BASEDIR @@ -85,7 +86,8 @@ echo "Testing with different start ports in sender and receiver" $WDTBIN_SERVER -directory $DIR/dst${TEST_COUNT} > \ $DIR/server${TEST_COUNT}.log 2>&1 & pidofreceiver=$! -_bin/wdt/wdt -ipv6=false -num_ports=$threads -start_port=$((STARTING_PORT + 1)) \ +$WDTBIN_BASE -ipv6=false -num_ports=$threads \ +-start_port=$((STARTING_PORT + 1)) \ -destination $HOSTNAME -directory $DIR/src -full_reporting \ |& tee -a $DIR/client${TEST_COUNT}.log checkLastCmdStatus @@ -99,7 +101,8 @@ echo "Testing with less number of threads in client" $WDTBIN_SERVER -directory $DIR/dst${TEST_COUNT} > \ $DIR/server${TEST_COUNT}.log 2>&1 & pidofreceiver=$! -_bin/wdt/wdt -ipv6=false -num_ports=$((threads - 1)) -start_port=$STARTING_PORT \ +$WDTBIN_BASE -ipv6=false -num_ports=$((threads - 1)) \ +-start_port=$STARTING_PORT \ -destination $HOSTNAME -directory $DIR/src -full_reporting \ |& tee -a $DIR/client${TEST_COUNT}.log checkLastCmdStatus @@ -112,7 +115,8 @@ echo "Testing with more number of threads in client" $WDTBIN_SERVER -directory $DIR/dst${TEST_COUNT} > \ $DIR/server${TEST_COUNT}.log 2>&1 & pidofreceiver=$! -_bin/wdt/wdt -ipv6=false -num_ports=$((threads + 1)) -start_port=$STARTING_PORT \ +$WDTBIN_BASE -ipv6=false -num_ports=$((threads + 1)) \ +-start_port=$STARTING_PORT \ -destination $HOSTNAME -directory $DIR/src -full_reporting \ |& tee -a $DIR/client${TEST_COUNT}.log checkLastCmdStatus @@ -164,7 +168,7 @@ sleep 5 sudo iptables-save > $DIR/iptable echo "blocking $STARTING_PORT" sudo iptables -A INPUT -p tcp --sport $STARTING_PORT -j DROP -wait $pidofsender +wait $pidofsender checkLastCmdStatus wait $pidofreceiver checkLastCmdStatus @@ -183,7 +187,7 @@ sleep 5 sudo iptables-save > $DIR/iptable echo "blocking $STARTING_PORT" sudo iptables -A INPUT -p tcp --dport $((STARTING_PORT + 1)) -j DROP -wait $pidofsender +wait $pidofsender checkLastCmdStatus wait $pidofreceiver checkLastCmdStatus @@ -203,7 +207,7 @@ sleep 5 sudo iptables-save > $DIR/iptable echo "blocking $STARTING_PORT" sudo iptables -A INPUT -p tcp --dport $((STARTING_PORT + 1)) -j DROP -wait $pidofsender +wait $pidofsender checkLastCmdStatus wait $pidofreceiver checkLastCmdStatus @@ -223,7 +227,7 @@ sleep 5 sudo iptables-save > $DIR/iptable echo "blocking $STARTING_PORT" sudo iptables -A INPUT -p tcp --dport $((STARTING_PORT + 1)) -j DROP -wait $pidofsender +wait $pidofsender checkLastCmdStatus wait $pidofreceiver checkLastCmdStatus