Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
shutdown-and-flushing: Improve shutdown, support/test FlushHtml
Browse files Browse the repository at this point in the history
- Improve shutting down both gracefully and quickly
- Set the flush flags on buffers when FlushHtml is on for optimized
  html responses
- Add tests for these features
- Don't call proxy_fetch->Flush() each time our body filter is called,
  because that would only make sense if we'd also set buf->flush.

Needs a support call in PSOL to work to cancel outstanding proxyfetches:

```diff
Index: proxy_fetch.cc
===================================================================
--- proxy_fetch.cc      (revision 4597)
+++ proxy_fetch.cc      (working copy)
@@ -90,6 +90,19 @@
             << " outstanding requests.";
 }

+
+void ProxyFetchFactory::CancelOutstanding() {
+  ScopedMutex lock(outstanding_proxy_fetches_mutex_.get());
+
+  for (std::set<ProxyFetch*>::iterator i = outstanding_proxy_fetches_.begin(); i !=  outstanding_proxy_fetches_.end(); i++) {
+    ProxyFetch* proxy_fetch = *i;
+    proxy_fetch->Done(false);
+  }
+
+  outstanding_proxy_fetches_.clear();
+}
+
+
 ProxyFetch* ProxyFetchFactory::CreateNewProxyFetch(
     const GoogleString& url_in, AsyncFetch* async_fetch,
     RewriteDriver* driver,
Index: proxy_fetch.h
===================================================================
--- proxy_fetch.h       (revision 4597)
+++ proxy_fetch.h       (working copy)
@@ -64,6 +64,7 @@
   explicit ProxyFetchFactory(ServerContext* server_context);
   ~ProxyFetchFactory();

+  void CancelOutstanding();
   // Convenience method that calls CreateNewProxyFetch and then StartFetch() on
   // the resulting fetch.
   void StartNewProxyFetch(
```
  • Loading branch information
oschaaf committed Mar 16, 2015
1 parent 5ea8fb9 commit 14460b2
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 109 deletions.
121 changes: 86 additions & 35 deletions src/ngx_base_fetch.cc
Expand Up @@ -36,13 +36,36 @@ const char kFlush = 'F';
const char kDone = 'D';

NgxEventConnection* NgxBaseFetch::event_connection = NULL;
// We'll set this event to uncancelable to prevent nginx from exiting before we
// are up for it. See ngx_worker_process_cycle().
ngx_event_t* NgxBaseFetch::shutdown_event = NULL;
int NgxBaseFetch::active_base_fetches = 0;
int NgxBaseFetch::request_ctx_count = 0;


// Periodically checks whether the conditions are met for ::Terminate() to be
// called:
// - A forced/quick shutdown, e.g. SIGTERM was received.
// - A graceful shutdown, e.g. SIGQUIT was recevied, after all outstanding
// work as been finished.
// When one of those conditions is met, ev->cancelable will have been set to
// false by us, and nginx will end up calling ::Terminate() next after calling
// Done(false) on any outstanding proxy fetches.
// Terminate() will clear out any pending events to make sure we release all
// associated NgxBaseFetch instances.
static void ps_base_fetch_shutdown_event_handler(ngx_event_t *ev) {
NgxBaseFetch::IndicateShutdownIsOK(false /* force */);
if (!ev->cancelable) {
ngx_add_timer(ev, 1000);
}
}

NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers,
NgxBaseFetchType base_fetch_type)
NgxBaseFetchType base_fetch_type,
bool flush)
: AsyncFetch(request_ctx),
request_(r),
server_context_(server_context),
Expand All @@ -52,7 +75,8 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
base_fetch_type_(base_fetch_type),
preserve_caching_headers_(preserve_caching_headers),
detached_(false),
suppress_(false) {
suppress_(false),
flush_(flush) {
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
__sync_add_and_fetch(&NgxBaseFetch::active_base_fetches, 1);
}
Expand All @@ -65,32 +89,43 @@ NgxBaseFetch::~NgxBaseFetch() {
bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
CHECK(event_connection == NULL) << "event connection already set";
event_connection = new NgxEventConnection(ReadCallback);

shutdown_event = reinterpret_cast<ngx_event_t*>(
ngx_pcalloc(cycle->pool, sizeof(ngx_event_t)));
shutdown_event->handler = ps_base_fetch_shutdown_event_handler;

shutdown_event->data = cycle;
shutdown_event->log = cycle->log;
// Prevents nginx from exiting until we are up for it.
shutdown_event->cancelable = 0;
ngx_add_timer(shutdown_event, 1000);

return event_connection->Init(cycle);
}

void NgxBaseFetch::Terminate() {
if (event_connection != NULL) {
GoogleMessageHandler handler;
PosixTimer timer;
int64 timeout_us = Timer::kSecondUs * 30;
// A second should be more then enough.
int64 timeout_us = Timer::kSecondUs;
int64 end_us = timer.NowUs() + timeout_us;
static unsigned int sleep_microseconds = 100;

handler.Message(
kInfo,"NgxBaseFetch::Terminate rounding up %d active base fetches.",
NgxBaseFetch::active_base_fetches);

// Try to continue processing and get the active base fetch count to 0
// untill the timeout expires.
// TODO(oschaaf): This needs more work.
// Drain any events after a quick or graceful shutdown to clear out
// associated NgxBaseFetch instances.
while (NgxBaseFetch::active_base_fetches > 0 && end_us > timer.NowUs()) {
event_connection->Drain();
usleep(sleep_microseconds);
}

if (NgxBaseFetch::active_base_fetches != 0) {
handler.Message(
kWarning,"NgxBaseFetch::Terminate timed out with %d active base fetches.",
kWarning,"NgxBaseFetch::Terminate exits with %d active base fetches.",
NgxBaseFetch::active_base_fetches);
}

Expand All @@ -99,6 +134,8 @@ void NgxBaseFetch::Terminate() {
delete event_connection;
event_connection = NULL;
}

NgxBaseFetch::IndicateShutdownIsOK(true /*force*/);
}

const char* BaseFetchTypeToCStr(NgxBaseFetchType type) {
Expand Down Expand Up @@ -131,36 +168,38 @@ void NgxBaseFetch::ReadCallback(const ps_event_data& data) {

// If we ended up destructing the base fetch, or the request context is
// detached, skip this event.
if (refcount == 0 || detached) {
return;
}
ps_request_ctx_t* ctx = ps_get_request_context(r);

CHECK(data.sender == ctx->base_fetch);
CHECK(r->count > 0) << "r->count: " << r->count;

int rc;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if ((ctx->base_fetch->base_fetch_type_ != kIproLookup)
&& r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
rc = NGX_ERROR;
} else {
rc = ps_base_fetch::ps_base_fetch_handler(r);
}

if (refcount != 0 && !detached) {
ps_request_ctx_t* ctx = ps_get_request_context(r);

CHECK(data.sender == ctx->base_fetch);
CHECK(r->count > 0) << "r->count: " << r->count;

int rc;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if ((ctx->base_fetch->base_fetch_type_ != kIproLookup)
&& r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
rc = NGX_ERROR;
} else {
rc = ps_base_fetch::ps_base_fetch_handler(r);
}
#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
#endif
if (ngx_terminate) {
return;
}

ngx_connection_t* c = r->connection;
ngx_http_finalize_request(r, rc);
// See http://forum.nginx.org/read.php?2,253006,253061
ngx_http_run_posted_requests(c);
ngx_connection_t* c = r->connection;
ngx_http_finalize_request(r, rc);
// See http://forum.nginx.org/read.php?2,253006,253061
ngx_http_run_posted_requests(c);
}
NgxBaseFetch::IndicateShutdownIsOK(false /* force */);
}

void NgxBaseFetch::Lock() {
Expand Down Expand Up @@ -191,7 +230,8 @@ ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) {
}

int rc = string_piece_to_buffer_chain(
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */);
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */,
flush_);
if (rc != NGX_OK) {
return rc;
}
Expand Down Expand Up @@ -290,6 +330,17 @@ int NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
return r;
}

void NgxBaseFetch::IndicateShutdownIsOK(bool force) {
if (force || ((ngx_quit || ngx_exiting) &&
NgxBaseFetch::request_ctx_count == 0)) {
if (NgxBaseFetch::shutdown_event != NULL) {
NgxBaseFetch::shutdown_event->cancelable = 1;
NgxBaseFetch::shutdown_event = NULL;
}
}
}


void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
Expand Down
7 changes: 5 additions & 2 deletions src/ngx_base_fetch.h
Expand Up @@ -76,7 +76,7 @@ class NgxBaseFetch : public AsyncFetch {
NgxBaseFetch(ngx_http_request_t* r, NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers,
NgxBaseFetchType base_fetch_type);
NgxBaseFetchType base_fetch_type, bool flush);
virtual ~NgxBaseFetch();

// Statically initializes event_connection, require for PSOL and nginx to
Expand Down Expand Up @@ -118,11 +118,13 @@ class NgxBaseFetch : public AsyncFetch {
// this to be able to handle events which nginx request context has been
// released while the event was in-flight.
void Detach() { detached_ = true; DecrementRefCount(); }
static void IndicateShutdownIsOK(bool force);

bool detached() { return detached_; }

ngx_http_request_t* request() { return request_; }
NgxBaseFetchType base_fetch_type() { return base_fetch_type_; }
static int request_ctx_count;

private:
virtual bool HandleWrite(const StringPiece& sp, MessageHandler* handler);
Expand All @@ -149,8 +151,8 @@ class NgxBaseFetch : public AsyncFetch {
// Called by Done() and Release(). Decrements our reference count, and if
// it's zero we delete ourself.
int DecrefAndDeleteIfUnreferenced();

static NgxEventConnection* event_connection;
static ngx_event_t* shutdown_event;

// Live count of NgxBaseFetch instances that are currently in use.
static int active_base_fetches;
Expand All @@ -171,6 +173,7 @@ class NgxBaseFetch : public AsyncFetch {
// Set to true just before the nginx side releases its reference
bool detached_;
bool suppress_;
bool flush_;

DISALLOW_COPY_AND_ASSIGN(NgxBaseFetch);
};
Expand Down

0 comments on commit 14460b2

Please sign in to comment.