Take into account network latency when syncing #55

Merged · 6 commits · Mar 16, 2022
1 change: 1 addition & 0 deletions .cicd/test.sh
@@ -15,6 +15,7 @@ else # Linux
    TEST_COMMAND="'\"'$MOUNTED_DIR/$1'\"' ${@: 2}"
    COMMANDS="echo \"$ $TEST_COMMAND\" && eval $TEST_COMMAND"
    . "$HELPERS_DIR/file-hash.sh" "$CICD_DIR/platforms/$PLATFORM_TYPE/$IMAGE_TAG.dockerfile"
+   # --cap-add=NET_ADMIN needed to run tc (traffic control in the Linux kernel) inside Docker for the p2p_high_latency_test.py test.
    DOCKER_RUN_COMMAND="docker run --rm --init -v \"\$(pwd):$MOUNTED_DIR\" $(buildkite-intrinsics) -e JOBS -e BUILDKITE_API_KEY '$FULL_TAG' bash -c '$COMMANDS'"
    set +e # defer error handling to end
    echo "$ $DOCKER_RUN_COMMAND"
2 changes: 1 addition & 1 deletion plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
@@ -29,7 +29,7 @@ namespace eosio {
      chain_id_type              chain_id; ///< used to identify chain
      fc::sha256                 node_id; ///< used to identify peers and prevent self-connect
      chain::public_key_type     key; ///< authentication key; may be a producer or peer key, or empty
-     tstamp                     time{0};
+     int64_t                    time{0}; ///< time message created in nanoseconds from epoch
      fc::sha256                 token; ///< digest of time to prove we own the private key of the key above
      chain::signature_type      sig; ///< signature for the digest
      string                     p2p_address;
43 changes: 26 additions & 17 deletions plugins/net_plugin/net_plugin.cpp
@@ -49,8 +49,6 @@ namespace eosio {
   using connection_ptr = std::shared_ptr<connection>;
   using connection_wptr = std::weak_ptr<connection>;

-  using io_work_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
-
   template <typename Strand>
   void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) {
      if( !strand.running_in_this_thread() ) {
@@ -137,6 +135,9 @@
         in_sync
      };

+     static constexpr int64_t block_interval_ns =
+           std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

      mutable std::mutex sync_mtx;
      uint32_t           sync_known_lib_num{0};
      uint32_t           sync_last_requested_num{0};
@@ -1631,6 +1632,7 @@
      }
      sync_next_expected_num = std::max( lib_num + 1, sync_next_expected_num );

+     // p2p_high_latency_test.py test depends on this exact log statement.
      fc_ilog( logger, "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}",
               ("cc", sync_last_requested_num)( "t", target )( "p", c->peer_name() ) );

@@ -1663,15 +1665,29 @@

      sync_reset_lib_num(c, false);

+     auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+     int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds
+     if( network_latency_ns < 0 ) {
+        peer_wlog(c, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
+        network_latency_ns = 0;
+     }
+     // number of blocks the syncing node is behind the peer node
+     uint32_t nblk_behind_by_net_latency = static_cast<uint32_t>(network_latency_ns / block_interval_ns);
+     // 2x for the time it takes the message to reach back to the peer node; +1 to compensate for integer-division truncation
+     uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency + 1;
+     // the message in the log below is used by the p2p_high_latency_test.py test
+     peer_dlog(c, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
+               ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));

      //--------------------------------
      // sync need checks; (lib == last irreversible block)
      //
      // 0. my head block id == peer head id means we are all caught up block wise
      // 1. my head block num < peer lib - start sync locally
-     // 2. my lib > peer head num - send an last_irr_catch_up notice if not the first generation
+     // 2. my lib > peer head num + nblk_combined_latency - send last_irr_catch_up notice if not the first generation
      //
-     // 3  my head block num < peer head block num - update sync state and send a catchup request
-     // 4  my head block num >= peer block num send a notice catchup if this is not the first generation
+     // 3  my head block num + nblk_combined_latency < peer head block num - update sync state and send a catchup request
+     // 4  my head block num >= peer block num + nblk_combined_latency send a notice catchup if this is not the first generation
      //    4.1 if peer appears to be on a different fork ( our_id_for( msg.head_num ) != msg.head_id )
      //        then request peer's blocks
      //
@@ -1700,7 +1716,7 @@
         }
         return;
      }
-     if (lib_num > msg.head_num ) {
+     if (lib_num > msg.head_num + nblk_combined_latency) {
         fc_ilog( logger, "handshake from ${ep}, lib ${lib}, head ${head}, head id ${id}.. sync 2",
                  ("ep", c->peer_name())("lib", msg.last_irreversible_block_num)("head", msg.head_num)
                  ("id", msg.head_id.str().substr(8,16)) );
@@ -1716,14 +1732,14 @@
         return;
      }

-     if (head < msg.head_num ) {
+     if (head + nblk_combined_latency < msg.head_num ) {
         fc_ilog( logger, "handshake from ${ep}, lib ${lib}, head ${head}, head id ${id}.. sync 3",
                  ("ep", c->peer_name())("lib", msg.last_irreversible_block_num)("head", msg.head_num)
                  ("id", msg.head_id.str().substr(8,16)) );
         c->syncing = false;
         verify_catchup(c, msg.head_num, msg.head_id);
         return;
-     } else {
+     } else if(head >= msg.head_num + nblk_combined_latency) {
         fc_ilog( logger, "handshake from ${ep}, lib ${lib}, head ${head}, head id ${id}.. sync 4",
                  ("ep", c->peer_name())("lib", msg.last_irreversible_block_num)("head", msg.head_num)
                  ("id", msg.head_id.str().substr(8,16)) );
@@ -1753,6 +1769,8 @@
            }
         } );
         return;
+     } else {
+        peer_dlog( c, "Block discrepancy is within network latency range.");
      }
   }

@@ -3276,15 +3294,6 @@
         }
      }

-     namespace sc = std::chrono;
-     sc::system_clock::duration msg_time(msg.time);
-     auto time = sc::system_clock::now().time_since_epoch();
-     if(time - msg_time > peer_authentication_interval) {
-        fc_elog( logger, "Peer ${peer} sent a handshake with a timestamp skewed by more than ${time}.",
-                 ("peer", msg.p2p_address)("time", "1 second")); // TODO Add to_variant for std::chrono::system_clock::duration
-        return false;
-     }
-

Member Author commented:
Adding a note to this PR here for future documentation of why this was removed. I removed this code because it could never have worked: time is in microseconds whereas msg.time is in nanoseconds, so time - msg_time is always negative.

Also, there is no way to do what this check was trying to do: you don't know how much network latency is involved, so you have no idea how much clock skew is involved.
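For illustration only — a minimal, self-contained sketch of that unit mismatch (assuming a standard library where system_clock ticks in microseconds, as the comment above describes; this is not code from the PR):

#include <chrono>
#include <cstdint>
#include <iostream>

int main() {
   namespace sc = std::chrono;
   // A sender writes msg.time as nanoseconds since the epoch:
   int64_t msg_time_ns = sc::duration_cast<sc::nanoseconds>(
         sc::system_clock::now().time_since_epoch()).count();

   // The removed check constructed a system_clock::duration directly from that
   // count, so on a platform where system_clock::duration is microseconds the
   // nanosecond count is reinterpreted as microseconds — roughly 1000x too large.
   sc::system_clock::duration msg_time(msg_time_ns);

   auto now = sc::system_clock::now().time_since_epoch();
   // now - msg_time is then hugely negative, so it can never exceed a positive
   // threshold like peer_authentication_interval, and the check never fired.
   std::cout << "now - msg_time in seconds: "
             << sc::duration_cast<sc::seconds>(now - msg_time).count() << "\n";
}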

      if(msg.sig != chain::signature_type() && msg.token != sha256()) {
         sha256 hash = fc::sha256::hash(msg.time);
         if(hash != msg.token) {
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
@@ -58,6 +58,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_client.js ${CMAKE_CURRENT_BINARY_DIR}/ship_client.js COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/large-lib-test.py ${CMAKE_CURRENT_BINARY_DIR}/large-lib-test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY)

#To run plugin_test with all log from blockchain displayed, put --verbose after --, i.e. plugin_test -- --verbose
add_test(NAME plugin_test COMMAND plugin_test --report_level=detailed --color_output)
@@ -116,6 +117,10 @@ set_property(TEST nodeos_run_check_lr_test PROPERTY LABELS long_running_tests)
add_test(NAME nodeos_remote_lr_test COMMAND tests/nodeos_run_remote_test.py -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST nodeos_remote_lr_test PROPERTY LABELS long_running_tests)

+# needs iproute-tc or iproute2 depending on platform
+#add_test(NAME p2p_high_latency_test COMMAND tests/p2p_high_latency_test.py -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
+#set_property(TEST p2p_high_latency_test PROPERTY LABELS nonparallelizable_tests)

#add_test(NAME distributed_transactions_lr_test COMMAND tests/distributed-transactions-test.py -d 2 -p 21 -n 21 -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
#set_property(TEST distributed_transactions_lr_test PROPERTY LABELS long_running_tests)

112 changes: 112 additions & 0 deletions tests/p2p_high_latency_test.py
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
from testUtils import Utils, WaitSpec
from Cluster import Cluster
from WalletMgr import WalletMgr
from TestHelper import TestHelper
import signal
import platform
import subprocess
import time
import re

###############################################################
# p2p connection in a high-latency network for a one-producer, one-syncing-node cluster.
#
# This test simulates p2p connections over a high-latency network. The cluster consists of one
# producer and one syncing node, with a latency of 1100 ms introduced on their p2p connection.
# The expected behavior is that the producer recognizes the network latency and does not send a
# lib catchup to the syncing node. Since the syncing node is always behind, sending a lib catchup
# would be useless: the producer/peer node would get caught in an infinite loop of sending lib
# catchups to the syncing node.
###############################################################
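# Worked example of the handshake latency math this test exercises (a sketch, assuming the
# 500 ms EOSIO block interval, config::block_interval_ms): with a one-way latency of 1100 ms,
#    nblk_behind_by_net_latency = 1100 ms / 500 ms = 2   (integer division)
#    nblk_combined_latency      = 2 * 2 + 1        = 5
# so the peer tolerates up to 5 blocks of head discrepancy before treating this node as
# needing a lib catchup.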

def readlogs(node_num, net_latency):
    filename = 'var/lib/node_0{}/stderr.txt'.format(node_num)
    f = subprocess.Popen(['tail', '-F', filename],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    latRegex = re.compile(r'\d+ms')
    t_end = time.time() + 80  # the cluster runs for 80 seconds while logs are processed
    while time.time() <= t_end:
        line = f.stdout.readline().decode("utf-8")
        print(line)
        if 'info' in line and 'Catching up with chain, our last req is ' in line:
            Utils.Print("Syncing node is catching up with chain; it should not, due to net latency")
            return False
        if 'debug' in line and 'Network latency' in line and float(latRegex.search(line).group()[:-2]) < 0.8 * net_latency:
            Utils.Print("Network latency is lower than expected.")
            return False

    return True

def exec(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate()
    process.wait()
    process.stdout.close()
    process.stderr.close()
    return err, process.returncode

Print=Utils.Print

args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--clean-run"})
Utils.Debug=args.v

producers=1
syncingNodes=1
totalNodes=producers+syncingNodes
cluster=Cluster(walletd=True)
dumpErrorDetails=args.dump_error_details
keepLogs=args.keep_logs
dontKill=args.leave_running
killAll=args.clean_run

testSuccessful=False
killEosInstances=not dontKill

specificExtraNodeosArgs={}
producerNodeId=0
syncingNodeId=1

specificExtraNodeosArgs[producerNodeId]=" --p2p-listen-endpoint 0.0.0.0:{}".format(9876+producerNodeId)
specificExtraNodeosArgs[syncingNodeId]="--p2p-peer-address 0.0.0.0:{}".format(9876+producerNodeId)

try:
    TestHelper.printSystemInfo("BEGIN")
    cluster.killall(allInstances=killAll)
    cluster.cleanup()
    traceNodeosArgs = " --plugin eosio::trace_api_plugin --trace-no-abis --plugin eosio::producer_plugin --produce-time-offset-us 0 --last-block-time-offset-us 0 --cpu-effort-percent 100 \
 --last-block-cpu-effort-percent 100 --producer-threads 1 --plugin eosio::net_plugin --net-threads 1"
    if cluster.launch(pnodes=1, totalNodes=totalNodes, totalProducers=1, useBiosBootFile=False, specificExtraNodeosArgs=specificExtraNodeosArgs, extraNodeosArgs=traceNodeosArgs) is False:
        Utils.cmdError("launcher")
        Utils.errorExit("Failed to stand up eos cluster.")

    cluster.waitOnClusterSync(blockAdvancing=5)
    Utils.Print("Cluster in Sync")
    cluster.biosNode.kill(signal.SIGTERM)
    Utils.Print("Bios node killed")
    latency = 1100  # 1100 milliseconds
    # add latency to all inbound and outbound traffic
    Utils.Print("adding {}ms latency to network.".format(latency))
    if platform.system() == 'Darwin':
        cmd = 'sudo dnctl pipe 1 config delay {} && \
 echo "dummynet out proto tcp from any to any pipe 1" | sudo pfctl -f - && \
 sudo pfctl -e'.format(latency)
    else:
        cmd = 'tc qdisc add dev lo root netem delay {}ms'.format(latency)
    err, ReturnCode = exec(cmd)
    if ReturnCode != 0:
        print(err.decode("utf-8"))  # print error details of the network-slowdown setup commands
        Utils.errorExit("failed to initialize network latency, exited with error code {}".format(ReturnCode))
    # process logs to make sure the syncing node doesn't get into lib catchup mode
    testSuccessful = readlogs(syncingNodeId, latency)
    if platform.system() == 'Darwin':
        cmd = 'sudo pfctl -f /etc/pf.conf && \
 sudo dnctl -q flush && sudo pfctl -d'
    else:
        cmd = 'tc qdisc del dev lo root netem'
    err, ReturnCode = exec(cmd)
    if ReturnCode != 0:
        print(err.decode("utf-8"))  # print error details of the network-slowdown teardown commands
        Utils.errorExit("failed to remove network latency, exited with error code {}".format(ReturnCode))
finally:
    TestHelper.shutdown(cluster, None, testSuccessful, killEosInstances, False, keepLogs, killAll, dumpErrorDetails)

exit(0)