Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix use of Cronet refcount #12281

Merged
merged 3 commits into from
Sep 5, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 59 additions & 16 deletions src/core/ext/transport/cronet/transport/cronet_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,34 @@ struct stream_obj {

/* Mutex to protect storage */
gpr_mu mu;

/* Refcount object of the stream */
grpc_stream_refcount *refcount;
};
typedef struct stream_obj stream_obj;

#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
void grpc_cronet_stream_ref(stream_obj *s, const char *reason) {
grpc_stream_ref(s->refcount, reason);
}
void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s,
const char *reason) {
grpc_stream_unref(exec_ctx, s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
grpc_cronet_stream_unref((exec_ctx), (stream))
void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); }
void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) {
grpc_stream_unref(exec_ctx, s->refcount);
}
#endif

static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas);

Expand Down Expand Up @@ -346,13 +371,12 @@ static void remove_from_storage(struct stream_obj *s,
This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
static void execute_from_storage(stream_obj *s) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
enum e_op_result result = execute_stream_op(&exec_ctx, curr);
enum e_op_result result = execute_stream_op(exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
/* if this op is done, then remove it and free memory */
Expand All @@ -369,14 +393,15 @@ static void execute_from_storage(stream_obj *s) {
}
}
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Cronet callback
*/
static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
Expand All @@ -392,14 +417,18 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Cronet callback
*/
static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
Expand All @@ -415,29 +444,36 @@ static void on_canceled(bidirectional_stream *stream) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Cronet callback
*/
static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Cronet callback
*/
static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu);
Expand All @@ -457,7 +493,8 @@ static void on_stream_ready(bidirectional_stream *stream) {
}
}
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Expand Down Expand Up @@ -513,14 +550,15 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
execute_from_storage(s);
}

/*
Cronet callback
*/
static void on_write_completed(bidirectional_stream *stream, const char *data) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
Expand All @@ -530,14 +568,16 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Cronet callback
*/
static void on_read_completed(bidirectional_stream *stream, char *data,
int count) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
Expand All @@ -563,14 +603,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
}
} else {
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
}
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Expand Down Expand Up @@ -625,12 +666,11 @@ static void on_response_trailers_received(
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;

gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
} else {
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
execute_from_storage(s);
execute_from_storage(&exec_ctx, s);
}
grpc_exec_ctx_finish(&exec_ctx);
}

/*
Expand Down Expand Up @@ -1313,6 +1353,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;

s->refcount = refcount;
GRPC_CRONET_STREAM_REF(s, "cronet transport");
memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL;
memset(&s->state, 0, sizeof(s->state));
Expand Down Expand Up @@ -1370,7 +1413,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op);
execute_from_storage(s);
execute_from_storage(exec_ctx, s);
}

static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
Expand Down