IMPALA-9137: Blacklist node if a DataStreamService RPC to the node fails
Introduces a new optional field to FragmentInstanceExecStatusPB:
AuxErrorInfoPB. AuxErrorInfoPB contains optional metadata associated
with a failed fragment instance. Currently, AuxErrorInfoPB contains a
single field, RPCErrorInfoPB, which is set only if the fragment failed
because an RPC to another impalad failed. The RPCErrorInfoPB contains
the destination node of the failed RPC and the posix error code of the
failure.

Coordinator::UpdateBackendExecStatus(ReportExecStatusRequestPB, ...)
uses the information in RPCErrorInfoPB (if one is set) to blacklist
the target node. While RPCErrorInfoPB::dest_node can be set to the
address of the Coordinator, the Coordinator never blacklists itself.
The Coordinator blacklists the node only if the RPC failed with a
specific error code (currently ENOTCONN, ECONNREFUSED, or ESHUTDOWN).
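
As a rough illustration only (not code from this patch), a report carrying the
new error info can be built and checked with the generated protobuf API along
these lines; the generated header name, hostname, and port are placeholders:

#include <cerrno>
#include <cstdint>
#include <set>

#include "gen-cpp/control_service.pb.h"  // assumed name of the generated header

// Sketch: a report whose fragment instance failed because an RPC to
// host-2:27000 was refused, plus the Coordinator's blacklisting check.
bool SketchBuildAndCheckAuxErrorInfo() {
  impala::ReportExecStatusRequestPB report;
  impala::FragmentInstanceExecStatusPB* instance_status =
      report.add_instance_exec_status();
  impala::RPCErrorInfoPB* rpc_error_info =
      instance_status->mutable_aux_error_info()->mutable_rpc_error_info();
  rpc_error_info->mutable_dest_node()->set_hostname("host-2");  // placeholder
  rpc_error_info->mutable_dest_node()->set_port(27000);         // placeholder
  rpc_error_info->set_posix_error_code(ECONNREFUSED);

  // Only connection-level errno values lead to blacklisting.
  static const std::set<int32_t> kBlacklistableErrnos = {
      ENOTCONN, ESHUTDOWN, ECONNREFUSED};
  return instance_status->aux_error_info().has_rpc_error_info()
      && kBlacklistableErrnos.count(
             instance_status->aux_error_info().rpc_error_info().posix_error_code())
          > 0;
}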

Testing:
* Ran core tests
* Added new test to test_blacklist.py

Change-Id: I733cca13847fde43c8ea2ae574d3ae04bd06419c
Reviewed-on: http://gerrit.cloudera.org:8080/14677
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
sahilTakiar authored and Impala Public Jenkins committed Dec 20, 2019
1 parent ed5e7da commit 8a4fece
Showing 11 changed files with 230 additions and 0 deletions.
73 changes: 73 additions & 0 deletions be/src/runtime/coordinator.cc
@@ -17,6 +17,7 @@

#include "runtime/coordinator.h"

#include <cerrno>
#include <unordered_set>

#include <thrift/protocol/TDebugProtocol.h>
@@ -236,6 +237,15 @@ void Coordinator::InitBackendStates() {
schedule_, query_ctx(), backend_idx, filter_mode_, entry.second));
backend_state->Init(fragment_stats_, host_profiles_, obj_pool());
backend_states_[backend_idx++] = backend_state;
// was_inserted is true if the pair was successfully inserted into the map, false
// otherwise.
bool was_inserted = addr_to_backend_state_
.emplace(backend_state->krpc_impalad_address(), backend_state)
.second;
if (UNLIKELY(!was_inserted)) {
DCHECK(false) << "Network address " << backend_state->krpc_impalad_address()
<< " associated with multiple BackendStates";
}
}
backend_resource_state_ =
obj_pool()->Add(new BackendResourceState(backend_states_, schedule_));
@@ -827,6 +837,11 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
// We may start receiving status reports before all exec rpcs are complete.
// Can't apply state transition until no more exec rpcs will be sent.
exec_rpcs_complete_barrier_.Wait();

// Iterate through all instance exec statuses, and use each fragment's AuxErrorInfo
// to possibly blacklist any "faulty" nodes.
UpdateBlacklistWithAuxErrorInfo(request);

// Transition the status if we're not already in a terminal state. This won't block
// because either this transitions to an ERROR state or the query is already in
// a terminal state.
@@ -855,6 +870,64 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
return IsExecuting() ? Status::OK() : Status::CANCELLED;
}

void Coordinator::UpdateBlacklistWithAuxErrorInfo(
const ReportExecStatusRequestPB& request) {
// If the Backend failed due to an RPC failure, blacklist the destination node of
// the failed RPC. Only blacklist one node per ReportExecStatusRequestPB to avoid
// blacklisting nodes too aggressively. Currently, only blacklist the first node
// that contains a valid RPCErrorInfoPB object.
for (auto instance_exec_status : request.instance_exec_status()) {
if (instance_exec_status.has_aux_error_info()
&& instance_exec_status.aux_error_info().has_rpc_error_info()) {
RPCErrorInfoPB rpc_error_info =
instance_exec_status.aux_error_info().rpc_error_info();
DCHECK(rpc_error_info.has_dest_node());
DCHECK(rpc_error_info.has_posix_error_code());
const NetworkAddressPB& dest_node = rpc_error_info.dest_node();

auto dest_node_and_be_state =
addr_to_backend_state_.find(FromNetworkAddressPB(dest_node));

// If the target address of the RPC is not known to the Coordinator, it cannot
// be blacklisted.
if (dest_node_and_be_state == addr_to_backend_state_.end()) {
string err_msg = "Query failed due to a failed RPC to an unknown target address "
+ NetworkAddressPBToString(dest_node);
DCHECK(false) << err_msg;
LOG(ERROR) << err_msg;
continue;
}

// The execution parameters of the destination node for the failed RPC.
const BackendExecParams* dest_node_exec_params =
dest_node_and_be_state->second->exec_params();

// The Coordinator for the query should never be blacklisted.
if (dest_node_exec_params->is_coord_backend) {
VLOG_QUERY << "Query failed due to a failed RPC to the Coordinator";
continue;
}

// A set of RPC-related posix error codes that should cause the target node
// of the failed RPC to be blacklisted.
static const set<int32_t> blacklistable_rpc_error_codes = {
ENOTCONN, // 107: Transport endpoint is not connected
ESHUTDOWN, // 108: Cannot send after transport endpoint shutdown
ECONNREFUSED // 111: Connection refused
};

if (blacklistable_rpc_error_codes.find(rpc_error_info.posix_error_code())
!= blacklistable_rpc_error_codes.end()) {
LOG(INFO) << "Blacklisting " << NetworkAddressPBToString(dest_node)
<< " because an RPC to it failed.";
ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
dest_node_exec_params->be_desc);
break;
}
}
}
}

int64_t Coordinator::GetMaxBackendStateLagMs(TNetworkAddress* address) {
if (exec_rpcs_complete_barrier_.pending() > 0) {
// Exec() hadn't completed for all the backends, so we can't rely on
17 changes: 17 additions & 0 deletions be/src/runtime/coordinator.h
@@ -248,6 +254,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
std::vector<BackendState*> backend_states_;

/// A map from the TNetworkAddress of a backend to the BackendState running at that
/// address. All values are non-nullptr and owned by obj_pool(). The address
/// is the kRPC address (Coordinator::BackendState::krpc_impalad_address) of the
/// Backend. This map is distinct from QuerySchedule::per_backend_exec_params(),
/// which uses the Thrift address as the key rather than the kRPC address.
boost::unordered_map<TNetworkAddress, BackendState*> addr_to_backend_state_;

/// Protects the population of backend_states_ vector (not the BackendState objects).
/// Used when accessing backend_states_ if it's not guaranteed that
/// InitBackendStates() has completed.
@@ -541,6 +548,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Checks the exec_state_ of the query and returns true if the query is executing.
bool IsExecuting();

/// Helper function for UpdateBackendExecStatus that iterates through the
/// FragmentInstanceExecStatusPB for each fragment and uses AuxErrorInfoPB to check if
/// any nodes should be blacklisted. AuxErrorInfoPB contains additional error
/// information about why the fragment failed, beyond what is available in the
/// ReportExecStatusRequestPB::overall_status field. This method uses information in
/// AuxErrorInfoPB to classify specific nodes as "faulty" and then blacklists them. A
/// node might be considered "faulty" if, for example, an RPC to that node failed, or a
/// fragment on that node failed due to a disk IO error.
void UpdateBlacklistWithAuxErrorInfo(const ReportExecStatusRequestPB& request);

/// BackendState and BackendResourceState are private to the Coordinator class, so mark
/// all tests in CoordinatorBackendStateTest as friends.
friend class CoordinatorBackendStateTest;
4 changes: 4 additions & 0 deletions be/src/runtime/fragment-instance-state.cc
@@ -290,6 +290,10 @@ void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
stateful_report->set_report_seq_no(report_seq_no_);
runtime_state()->GetUnreportedErrors(stateful_report->mutable_error_log());
}
// If set in the RuntimeState, set the AuxErrorInfoPB field.
if (runtime_state()->HasAuxErrorInfo()) {
runtime_state()->GetAuxErrorInfo(instance_status->mutable_aux_error_info());
}
}

void FragmentInstanceState::ReportSuccessful(
4 changes: 4 additions & 0 deletions be/src/runtime/krpc-data-stream-sender.cc
@@ -443,6 +443,10 @@ void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms));
return;
}
// If the RPC failed due to a network error, set the RPC error info in RuntimeState.
if (controller_status.IsNetworkError()) {
parent_->state_->SetRPCErrorInfo(address_, controller_status.posix_code());
}
MarkDone(FromKuduStatus(controller_status, prepend));
}

19 changes: 19 additions & 0 deletions be/src/runtime/runtime-state.cc
@@ -311,6 +311,25 @@ void RuntimeState::ReleaseResources() {
released_resources_ = true;
}

void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code) {
boost::lock_guard<SpinLock> l(aux_error_info_lock_);
if (aux_error_info_ == nullptr) {
aux_error_info_.reset(new AuxErrorInfoPB());
RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
network_addr->set_hostname(dest_node.hostname);
network_addr->set_port(dest_node.port);
rpc_error_info->set_posix_error_code(posix_error_code);
}
}

void RuntimeState::GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
boost::lock_guard<SpinLock> l(aux_error_info_lock_);
if (aux_error_info_ != nullptr) {
aux_error_info->CopyFrom(*aux_error_info_);
}
}

const std::string& RuntimeState::GetEffectiveUser() const {
return impala::GetEffectiveUser(query_ctx().session);
}
27 changes: 27 additions & 0 deletions be/src/runtime/runtime-state.h
@@ -302,6 +302,26 @@ class RuntimeState {
/// Release resources and prepare this object for destruction. Can only be called once.
void ReleaseResources();

/// If the fragment instance associated with this RuntimeState failed due to an RPC
/// failure, use this method to set the network address of the RPC's target node and
/// the posix error code of the failed RPC. The target node address and posix error code
/// will be included in the AuxErrorInfo returned by GetAuxErrorInfo. This method is
/// idempotent.
void SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code);

/// Returns true if this RuntimeState has any auxiliary error information, false
/// otherwise. Currently, only SetRPCErrorInfo() sets aux error info.
bool HasAuxErrorInfo() {
boost::lock_guard<SpinLock> l(aux_error_info_lock_);
return aux_error_info_ != nullptr;
}

/// Sets the given AuxErrorInfoPB with all relevant aux error info from the fragment
/// instance associated with this RuntimeState. If no aux error info for this
/// RuntimeState has been set, this method does nothing. Currently, only
/// SetRPCErrorInfo() sets aux error info.
void GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info);

static const char* LLVM_CLASS_NAME;

private:
@@ -414,6 +434,13 @@
/// nodes that share this runtime state.
boost::scoped_ptr<RuntimeFilterBank> filter_bank_;

/// Lock protecting aux_error_info_.
SpinLock aux_error_info_lock_;

/// Auxiliary error information, only set if the fragment instance failed (e.g.
/// query_status_ != Status::OK()). Owned by this RuntimeState.
std::unique_ptr<AuxErrorInfoPB> aux_error_info_;

/// prohibit copies
RuntimeState(const RuntimeState&);

13 changes: 13 additions & 0 deletions be/src/util/network-util.cc
@@ -167,6 +167,19 @@ string TNetworkAddressToString(const TNetworkAddress& address) {
return ss.str();
}

string NetworkAddressPBToString(const NetworkAddressPB& address) {
stringstream ss;
ss << address.hostname() << ":" << dec << address.port();
return ss.str();
}

TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) {
TNetworkAddress t_address;
t_address.__set_hostname(address.hostname());
t_address.__set_port(address.port());
return t_address;
}

/// Pick a random port in the range of ephemeral ports
/// https://tools.ietf.org/html/rfc6335
int FindUnusedEphemeralPort() {
7 changes: 7 additions & 0 deletions be/src/util/network-util.h
@@ -16,6 +16,7 @@
// under the License.

#include "common/status.h"
#include "gen-cpp/common.pb.h"
#include "gen-cpp/StatestoreService_types.h"
#include "gen-cpp/Types_types.h"
#include <vector>
@@ -64,6 +65,12 @@ bool IsWildcardAddress(const std::string& ipaddress);
/// Utility method to print address as address:port
std::string TNetworkAddressToString(const TNetworkAddress& address);

/// Utility method to print a NetworkAddressPB as address:port.
std::string NetworkAddressPBToString(const NetworkAddressPB& address);

/// Utility method to convert a NetworkAddressPB to a TNetworkAddress.
TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address);

/// Utility method to convert TNetworkAddress to Kudu sock addr.
/// Note that 'address' has to contain a resolved IP address.
Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
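
A minimal usage sketch of the two new helpers (hostname, port, and the include
path are illustrative):

#include <string>

#include "util/network-util.h"  // assumed include path within be/src

// Sketch: build a NetworkAddressPB, convert it to the Thrift form, render both.
std::string SketchAddressRoundTrip() {
  impala::NetworkAddressPB pb_addr;
  pb_addr.set_hostname("impalad-1");  // placeholder hostname
  pb_addr.set_port(27000);            // placeholder port
  impala::TNetworkAddress t_addr = impala::FromNetworkAddressPB(pb_addr);
  // Both calls render the address as "impalad-1:27000".
  return impala::NetworkAddressPBToString(pb_addr) + " == "
      + impala::TNetworkAddressToString(t_addr);
}
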
6 changes: 6 additions & 0 deletions common/protobuf/common.proto
@@ -21,6 +21,12 @@ syntax="proto2";

package impala;

// Refer to Types.thrift for documentation.
message NetworkAddressPB {
required string hostname = 1;
required int32 port = 2;
}

// Proto-serialized version of Impala's Status object.
message StatusPB {
optional int32 status_code = 1;
23 changes: 23 additions & 0 deletions common/protobuf/control_service.proto
@@ -124,6 +124,25 @@ message StatefulStatusPB {
map<int32, ErrorLogEntryPB> error_log = 2;
}

// RPC error metadata that can be associated with an AuxErrorInfoPB object. Created if
// an RPC to another node failed.
message RPCErrorInfoPB {
// The address of the RPC's target node.
required NetworkAddressPB dest_node = 1;

// The posix error code of the failed RPC.
required int32 posix_error_code = 2;
}

// Error metadata that can be associated with a failed fragment instance. Used to store
// extra info about errors encountered during fragment execution. This information is
// used by the Coordinator to blacklist potentially unhealthy nodes.
message AuxErrorInfoPB {
// Set if the fragment instance failed because an RPC to another node failed. Only set
// if the RPC failed due to a network error.
optional RPCErrorInfoPB rpc_error_info = 1;
}

message FragmentInstanceExecStatusPB {
// Sequence number prevents out-of-order or duplicated updates from being applied.
optional int64 report_seq_no = 1;
@@ -144,6 +163,10 @@ message FragmentInstanceExecStatusPB {
// The non-idempotent parts of the report, and any prior reports that are not known to
// have been received by the coordinator.
repeated StatefulStatusPB stateful_report = 6;

// Metadata associated with a failed fragment instance. Only set for failed fragment
// instances.
optional AuxErrorInfoPB aux_error_info = 7;
}

message ReportExecStatusRequestPB {
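
For reference, an illustrative sketch of a populated AuxErrorInfoPB in protobuf
text format, parsed into the generated type; the host, port, and errno value
(111 is ECONNREFUSED on Linux) are placeholders:

#include <google/protobuf/text_format.h>

#include "gen-cpp/control_service.pb.h"  // assumed name of the generated header

// Sketch: parse a text-format AuxErrorInfoPB describing a refused RPC.
bool SketchParseAuxErrorInfo(impala::AuxErrorInfoPB* out) {
  static const char* kText =
      "rpc_error_info {"
      "  dest_node { hostname: 'host-2' port: 27000 }"
      "  posix_error_code: 111"
      "}";
  return google::protobuf::TextFormat::ParseFromString(kText, out);
}
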
37 changes: 37 additions & 0 deletions tests/custom_cluster/test_blacklist.py
@@ -20,6 +20,7 @@
import pytest
import re

from beeswaxd.BeeswaxService import QueryState
from tests.common.skip import SkipIfNotHdfsMinicluster
from time import sleep

@@ -113,3 +114,39 @@ def test_restart_impalad(self, cursor):
assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
result.runtime_profile
assert re.search("NumBackends: 3", result.runtime_profile), result.runtime_profile

@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
def test_kill_impalad_with_running_queries(self, cursor):
"""Verifies that when an Impala executor is killed while running a query, the
Coordinator blacklists the killed executor."""

# Run a query asynchronously. Normally, this query should take a few seconds to
# complete.
query = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
where t1.l_orderkey = t2.l_orderkey"
handle = self.execute_query_async(query)

# Wait for the query to start running
self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)

# Kill one of the Impala executors
killed_impalad = self.cluster.impalads[2]
killed_impalad.kill()

# Try to fetch results from the query. Fetch requests should fail because one of the
# impalads running the query was killed. When the query fails, the Coordinator should
# add the killed Impala executor to the blacklist (since an RPC to that node failed).
try:
self.client.fetch(query, handle)
assert False, "Query was expected to fail"
except Exception as e:
# The query should fail due to an RPC error.
assert "TransmitData() to " in str(e) or "EndDataStream() to " in str(e)

# Run another query which should succeed and verify the impalad was blacklisted.
result = self.execute_query("select count(*) from tpch.lineitem")
match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
assert match is not None and match.group(1) == "%s:%s" % \
(killed_impalad.hostname, killed_impalad.service.be_port), \
result.runtime_profile
