Remove memset(0) from arena-allocated memory.
Callers are updated to properly initialize the memory.

This behavior can be overridden using the GRPC_ARENA_INIT_STRATEGY
environment variable.
soheilhy committed Nov 5, 2018
1 parent 5e6c449 commit 48e4a81
Showing 44 changed files with 1,408 additions and 1,242 deletions.
10 changes: 10 additions & 0 deletions doc/environment_variables.md
@@ -140,3 +140,13 @@ some configuration as environment variables that can be set.
 * grpc_cfstream
   set to 1 to turn on CFStream experiment. With this experiment gRPC uses CFStream API to make TCP
   connections. The option is only available on iOS platform and when macro GRPC_CFSTREAM is defined.
+
+* GRPC_ARENA_INIT_STRATEGY
+  Selects the initialization strategy for blocks allocated in the arena. Valid
+  values are:
+  - no_init (default): Do not initialize the arena block.
+  - zero_init: Initialize the arena blocks with 0.
+  - non_zero_init: Initialize the arena blocks with a non-zero value.
+
+  NOTE: This environment variable is experimental and will be removed. Thus, it
+  should not be relied upon.
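
To make the three strategies concrete, here is a minimal, self-contained sketch of how an allocator could select and apply one. It is illustrative only: the ArenaInitStrategy and InitBlock names and the 0xAB poison byte are invented for this sketch and are not the code this commit adds to gRPC's arena implementation.

#include <cstdlib>
#include <cstring>

// Hypothetical names for illustration; not gRPC's actual arena code.
enum class ArenaInitStrategy { kNoInit, kZeroInit, kNonZeroInit };

// Reads GRPC_ARENA_INIT_STRATEGY; unknown values fall back to no_init.
static ArenaInitStrategy StrategyFromEnv() {
  const char* s = std::getenv("GRPC_ARENA_INIT_STRATEGY");
  if (s == nullptr) return ArenaInitStrategy::kNoInit;  // default
  if (std::strcmp(s, "zero_init") == 0) return ArenaInitStrategy::kZeroInit;
  if (std::strcmp(s, "non_zero_init") == 0) {
    return ArenaInitStrategy::kNonZeroInit;
  }
  return ArenaInitStrategy::kNoInit;
}

// Applied to each freshly allocated arena block.
static void InitBlock(void* p, std::size_t n, ArenaInitStrategy strategy) {
  switch (strategy) {
    case ArenaInitStrategy::kNoInit:
      break;  // leave memory uninitialized: the new, faster default
    case ArenaInitStrategy::kZeroInit:
      std::memset(p, 0, n);  // restores the old zeroing behavior
      break;
    case ArenaInitStrategy::kNonZeroInit:
      std::memset(p, 0xAB, n);  // poison: flushes out missed initializations
      break;
  }
}

The non_zero_init mode is the debugging aid here: code that silently depended on zeroed arena memory tends to fail fast when handed poisoned blocks, while zero_init offers a safe rollback path if a missed initialization slips through.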
270 changes: 153 additions & 117 deletions src/core/ext/filters/client_channel/client_channel.cc

Large diffs are not rendered by default.

@@ -286,10 +286,9 @@ HealthCheckClient::CallState::CallState(
       health_check_client_(std::move(health_check_client)),
       pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
       arena_(gpr_arena_create(health_check_client_->connected_subchannel_
-                                  ->GetInitialCallSizeEstimate(0))) {
-  memset(&call_combiner_, 0, sizeof(call_combiner_));
+                                  ->GetInitialCallSizeEstimate(0))),
+      payload_(context_) {
   grpc_call_combiner_init(&call_combiner_);
-  memset(context_, 0, sizeof(context_));
   gpr_atm_rel_store(&seen_response_, static_cast<gpr_atm>(0));
 }

@@ -97,7 +97,7 @@ class HealthCheckClient

   gpr_arena* arena_;
   grpc_call_combiner call_combiner_;
-  grpc_call_context_element context_[GRPC_CONTEXT_COUNT];
+  grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};

   // The streaming call to the backend. Always non-NULL.
   grpc_subchannel_call* call_;
12 changes: 6 additions & 6 deletions src/core/ext/filters/client_channel/lb_policy.h
@@ -63,29 +63,29 @@ class LoadBalancingPolicy
   /// State used for an LB pick.
   struct PickState {
     /// Initial metadata associated with the picking call.
-    grpc_metadata_batch* initial_metadata;
+    grpc_metadata_batch* initial_metadata = nullptr;
     /// Bitmask used for selective cancelling. See
     /// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in
     /// grpc_types.h.
-    uint32_t initial_metadata_flags;
+    uint32_t initial_metadata_flags = 0;
     /// Storage for LB token in \a initial_metadata, or nullptr if not used.
     grpc_linked_mdelem lb_token_mdelem_storage;
     /// Closure to run when pick is complete, if not completed synchronously.
     /// If null, pick will fail if a result is not available synchronously.
-    grpc_closure* on_complete;
+    grpc_closure* on_complete = nullptr;
     /// Will be set to the selected subchannel, or nullptr on failure or when
     /// the LB policy decides to drop the call.
     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
     /// Will be populated with context to pass to the subchannel call, if
     /// needed.
-    grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
+    grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {};
     /// Upon success, \a *user_data will be set to whatever opaque information
     /// may need to be propagated from the LB policy, or nullptr if not needed.
     // TODO(roth): As part of revamping our metadata APIs, try to find a
     // way to clean this up and C++-ify it.
-    void** user_data;
+    void** user_data = nullptr;
     /// Next pointer. For internal use by LB policy.
-    PickState* next;
+    PickState* next = nullptr;
   };

   // Not copyable nor movable.
@@ -37,16 +37,27 @@ static void destroy_channel_elem(grpc_channel_element* elem) {}
 namespace {

 struct call_data {
+  call_data(const grpc_call_element_args& args) {
+    if (args.context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
+      // Get stats object from context and take a ref.
+      client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
+                         args.context[GRPC_GRPCLB_CLIENT_STATS].value)
+                         ->Ref();
+      // Record call started.
+      client_stats->AddCallStarted();
+    }
+  }
+
   // Stats object to update.
   grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
   // State for intercepting send_initial_metadata.
   grpc_closure on_complete_for_send;
   grpc_closure* original_on_complete_for_send;
-  bool send_initial_metadata_succeeded;
+  bool send_initial_metadata_succeeded = false;
   // State for intercepting recv_initial_metadata.
   grpc_closure recv_initial_metadata_ready;
   grpc_closure* original_recv_initial_metadata_ready;
-  bool recv_initial_metadata_succeeded;
+  bool recv_initial_metadata_succeeded = false;
 };

 }  // namespace
@@ -70,16 +81,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {

 static grpc_error* init_call_elem(grpc_call_element* elem,
                                   const grpc_call_element_args* args) {
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  // Get stats object from context and take a ref.
   GPR_ASSERT(args->context != nullptr);
-  if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
-    calld->client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
-                              args->context[GRPC_GRPCLB_CLIENT_STATS].value)
-                              ->Ref();
-    // Record call started.
-    calld->client_stats->AddCallStarted();
-  }
+  new (elem->call_data) call_data(*args);
   return GRPC_ERROR_NONE;
 }

@@ -97,6 +100,7 @@ static void destroy_call_elem(grpc_call_element* elem,
     // TODO(roth): Eliminate this once filter stack is converted to C++.
     calld->client_stats.reset();
   }
+  calld->~call_data();
 }

 static void start_transport_stream_op_batch(
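
The recurring pattern of this commit, visible above and in the subchannel and deadline-filter hunks below: since the arena now hands back raw, no-longer-zeroed bytes and never frees objects individually, call data is constructed with placement new and torn down with an explicit destructor call. A standalone sketch of the idiom, with malloc standing in for gpr_arena_alloc and invented names:

#include <cstdlib>  // malloc/free stand in for the arena allocator
#include <new>      // placement new

struct CallData {
  explicit CallData(int id) : id(id) {}  // fields initialized explicitly...
  int id;
  bool seen_initial_metadata = false;    // ...or via default member initializers
};

int main() {
  // Stand-in for gpr_arena_alloc(): returns raw, uninitialized bytes.
  void* storage = std::malloc(sizeof(CallData));
  // Placement new runs the constructor in place of the old memset(0).
  CallData* calld = new (storage) CallData(42);
  // ... use calld ...
  // The arena never frees per object, so invoke the destructor manually.
  calld->~CallData();
  std::free(storage);  // a real arena releases whole blocks at teardown
  return 0;
}

Default member initializers, like the `= false` and `= nullptr` fields added throughout this commit, keep the constructors small and make it impossible to forget a field.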
15 changes: 9 additions & 6 deletions src/core/ext/filters/client_channel/subchannel.cc
@@ -162,12 +162,16 @@ struct grpc_subchannel {
 };

 struct grpc_subchannel_call {
+  grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection,
+                       const grpc_core::ConnectedSubchannel::CallArgs& args)
+      : connection(connection), deadline(args.deadline) {}
+
   grpc_core::ConnectedSubchannel* connection;
-  grpc_closure* schedule_closure_after_destroy;
+  grpc_closure* schedule_closure_after_destroy = nullptr;
   // state needed to support channelz interception of recv trailing metadata.
   grpc_closure recv_trailing_metadata_ready;
   grpc_closure* original_recv_trailing_metadata;
-  grpc_metadata_batch* recv_trailing_metadata;
+  grpc_metadata_batch* recv_trailing_metadata = nullptr;
   grpc_millis deadline;
 };
@@ -905,6 +909,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
   grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
                           c->schedule_closure_after_destroy);
   connection->Unref(DEBUG_LOCATION, "subchannel_call");
+  c->~grpc_subchannel_call();
 }

 void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
@@ -1102,14 +1107,12 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
                                             grpc_subchannel_call** call) {
   const size_t allocation_size =
       GetInitialCallSizeEstimate(args.parent_data_size);
-  *call = static_cast<grpc_subchannel_call*>(
-      gpr_arena_alloc(args.arena, allocation_size));
+  *call = new (gpr_arena_alloc(args.arena, allocation_size))
+      grpc_subchannel_call(this, args);
   grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
   RefCountedPtr<ConnectedSubchannel> connection =
       Ref(DEBUG_LOCATION, "subchannel_call");
   connection.release();  // Ref is passed to the grpc_subchannel_call object.
-  (*call)->connection = this;
-  (*call)->deadline = args.deadline;
   const grpc_call_element_args call_args = {
       callstk,  /* call_stack */
       nullptr,  /* server_transport_data */
42 changes: 19 additions & 23 deletions src/core/ext/filters/deadline/deadline_filter.cc
@@ -27,6 +27,7 @@
 #include <grpc/support/time.h>

 #include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/gprpp/memory.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/surface/channel_init.h"
@@ -152,7 +153,11 @@ static void inject_recv_trailing_metadata_ready(
 // Callback and associated state for starting the timer after call stack
 // initialization has been completed.
 struct start_timer_after_init_state {
-  bool in_call_combiner;
+  start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
+      : elem(elem), deadline(deadline) {}
+  ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }
+
+  bool in_call_combiner = false;
   grpc_call_element* elem;
   grpc_millis deadline;
   grpc_closure closure;
@@ -171,20 +176,16 @@ static void start_timer_after_init(void* arg, grpc_error* error) {
                            "scheduling deadline timer");
     return;
   }
-  start_timer_if_needed(state->elem, state->deadline);
-  gpr_free(state);
+  grpc_core::Delete(state);
   GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
                           "done scheduling deadline timer");
 }

-void grpc_deadline_state_init(grpc_call_element* elem,
-                              grpc_call_stack* call_stack,
-                              grpc_call_combiner* call_combiner,
-                              grpc_millis deadline) {
-  grpc_deadline_state* deadline_state =
-      static_cast<grpc_deadline_state*>(elem->call_data);
-  deadline_state->call_stack = call_stack;
-  deadline_state->call_combiner = call_combiner;
+grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
+                                         grpc_call_stack* call_stack,
+                                         grpc_call_combiner* call_combiner,
+                                         grpc_millis deadline)
+    : call_stack(call_stack), call_combiner(call_combiner) {
   // Deadline will always be infinite on servers, so the timer will only be
   // set on clients with a finite deadline.
   if (deadline != GRPC_MILLIS_INF_FUTURE) {
@@ -196,21 +197,14 @@ void grpc_deadline_state_init(grpc_call_element* elem,
     // create a closure to start the timer, and we schedule that closure
     // to be run after call stack initialization is done.
     struct start_timer_after_init_state* state =
-        static_cast<struct start_timer_after_init_state*>(
-            gpr_zalloc(sizeof(*state)));
-    state->elem = elem;
-    state->deadline = deadline;
+        grpc_core::New<start_timer_after_init_state>(elem, deadline);
     GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
                       grpc_schedule_on_exec_ctx);
     GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
   }
 }

-void grpc_deadline_state_destroy(grpc_call_element* elem) {
-  grpc_deadline_state* deadline_state =
-      static_cast<grpc_deadline_state*>(elem->call_data);
-  cancel_timer_if_needed(deadline_state);
-}
+grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }

 void grpc_deadline_state_reset(grpc_call_element* elem,
                                grpc_millis new_deadline) {
@@ -269,16 +263,18 @@ typedef struct server_call_data {
 // Constructor for call_data.  Used for both client and server filters.
 static grpc_error* init_call_elem(grpc_call_element* elem,
                                   const grpc_call_element_args* args) {
-  grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
-                           args->deadline);
+  new (elem->call_data) grpc_deadline_state(
+      elem, args->call_stack, args->call_combiner, args->deadline);
   return GRPC_ERROR_NONE;
 }

 // Destructor for call_data.  Used for both client and server filters.
 static void destroy_call_elem(grpc_call_element* elem,
                               const grpc_call_final_info* final_info,
                               grpc_closure* ignored) {
-  grpc_deadline_state_destroy(elem);
+  grpc_deadline_state* deadline_state =
+      static_cast<grpc_deadline_state*>(elem->call_data);
+  deadline_state->~grpc_deadline_state();
 }

 // Method for starting a call op for client filter.
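
One subtlety in the hunks above: start_timer_if_needed no longer runs in the callback body but moved into ~start_timer_after_init_state(), so grpc_core::Delete(state) both frees the state and arms the timer, on whichever path releases the object. A generic sketch of that run-work-in-the-destructor idiom, with names invented for illustration:

#include <cstdio>
#include <functional>
#include <utility>

// Defers an action to destruction time, so it runs exactly once on
// whichever path destroys the object -- the trick the deadline filter
// uses to start its timer from ~start_timer_after_init_state().
class RunOnDelete {
 public:
  explicit RunOnDelete(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~RunOnDelete() { fn_(); }

 private:
  std::function<void()> fn_;
};

int main() {
  {
    RunOnDelete defer([] { std::puts("timer armed"); });
    std::puts("call stack initialization running");
  }  // scope exit invokes the destructor, after initialization is done
  return 0;
}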
22 changes: 9 additions & 13 deletions src/core/ext/filters/deadline/deadline_filter.h
@@ -22,19 +22,23 @@
 #include "src/core/lib/channel/channel_stack.h"
 #include "src/core/lib/iomgr/timer.h"

-typedef enum grpc_deadline_timer_state {
+enum grpc_deadline_timer_state {
   GRPC_DEADLINE_STATE_INITIAL,
   GRPC_DEADLINE_STATE_PENDING,
   GRPC_DEADLINE_STATE_FINISHED
-} grpc_deadline_timer_state;
+};

 // State used for filters that enforce call deadlines.
 // Must be the first field in the filter's call_data.
-typedef struct grpc_deadline_state {
+struct grpc_deadline_state {
+  grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack,
+                      grpc_call_combiner* call_combiner, grpc_millis deadline);
+  ~grpc_deadline_state();
+
   // We take a reference to the call stack for the timer callback.
   grpc_call_stack* call_stack;
   grpc_call_combiner* call_combiner;
-  grpc_deadline_timer_state timer_state;
+  grpc_deadline_timer_state timer_state = GRPC_DEADLINE_STATE_INITIAL;
   grpc_timer timer;
   grpc_closure timer_callback;
   // Closure to invoke when we receive trailing metadata.
@@ -43,21 +47,13 @@ typedef struct grpc_deadline_state {
   // The original recv_trailing_metadata_ready closure, which we chain to
   // after our own closure is invoked.
   grpc_closure* original_recv_trailing_metadata_ready;
-} grpc_deadline_state;
+};

 //
 // NOTE: All of these functions require that the first field in
 // elem->call_data is a grpc_deadline_state.
 //

-// assumes elem->call_data is zero'd
-void grpc_deadline_state_init(grpc_call_element* elem,
-                              grpc_call_stack* call_stack,
-                              grpc_call_combiner* call_combiner,
-                              grpc_millis deadline);
-
-void grpc_deadline_state_destroy(grpc_call_element* elem);
-
 // Cancels the existing timer and starts a new one with new_deadline.
 //
 // Note: It is generally safe to call this with an earlier deadline
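
The NOTE above survives the C++-ification: the remaining grpc_deadline_state_* helpers still cast elem->call_data to grpc_deadline_state*, which is only valid when it is the first field. A hypothetical filter's call_data (a sketch, not part of this commit) would therefore be laid out like this:

#include "src/core/ext/filters/deadline/deadline_filter.h"

// Hypothetical call_data for a filter reusing the deadline machinery.
// grpc_deadline_state MUST be the first field so the grpc_deadline_state_*
// helpers can treat elem->call_data as a grpc_deadline_state*.
struct my_filter_call_data {
  my_filter_call_data(grpc_call_element* elem,
                      const grpc_call_element_args& args)
      : deadline_state(elem, args.call_stack, args.call_combiner,
                       args.deadline) {}

  grpc_deadline_state deadline_state;  // must come first
  // Filter-specific state follows.
  grpc_closure on_complete;  // initialized later via GRPC_CLOSURE_INIT
  bool saw_trailing_metadata = false;
};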
