Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
shutdown-and-flushing: Improve shutdown, support/test FlushHtml
Browse files Browse the repository at this point in the history
- Improve shutting down both gracefully and quickly
- Set the flush flags on buffers when FlushHtml is on for optimized
  html responses
- Add tests for these features
- Don't call proxy_fetch->Flush() each time our body filter is called,
  because that would only make sense if we'd also set buf->flush.

Needs a support call in PSOL to work to cancel outstanding proxyfetches:

```diff
Index: proxy_fetch.cc
===================================================================
--- proxy_fetch.cc      (revision 4597)
+++ proxy_fetch.cc      (working copy)
@@ -90,6 +90,19 @@
             << " outstanding requests.";
 }

+
+void ProxyFetchFactory::CancelOutstanding() {
+  ScopedMutex lock(outstanding_proxy_fetches_mutex_.get());
+
+  for (std::set<ProxyFetch*>::iterator i = outstanding_proxy_fetches_.begin(); i !=  outstanding_proxy_fetches_.end(); i++) {
+    ProxyFetch* proxy_fetch = *i;
+    proxy_fetch->Done(false);
+  }
+
+  outstanding_proxy_fetches_.clear();
+}
+
+
 ProxyFetch* ProxyFetchFactory::CreateNewProxyFetch(
     const GoogleString& url_in, AsyncFetch* async_fetch,
     RewriteDriver* driver,
Index: proxy_fetch.h
===================================================================
--- proxy_fetch.h       (revision 4597)
+++ proxy_fetch.h       (working copy)
@@ -64,6 +64,7 @@
   explicit ProxyFetchFactory(ServerContext* server_context);
   ~ProxyFetchFactory();

+  void CancelOutstanding();
   // Convenience method that calls CreateNewProxyFetch and then StartFetch() on
   // the resulting fetch.
   void StartNewProxyFetch(
```
  • Loading branch information
oschaaf committed Mar 25, 2015
1 parent 5ea8fb9 commit 2dcaf7b
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 130 deletions.
126 changes: 89 additions & 37 deletions src/ngx_base_fetch.cc
Expand Up @@ -16,6 +16,7 @@

// Author: jefftk@google.com (Jeff Kaufman)
#include <unistd.h> //for usleep
#include <inttypes.h> // PRId64

#include "ngx_base_fetch.h"
#include "ngx_event_connection.h"
Expand All @@ -36,7 +37,29 @@ const char kFlush = 'F';
const char kDone = 'D';

NgxEventConnection* NgxBaseFetch::event_connection = NULL;
int NgxBaseFetch::active_base_fetches = 0;
// We'll set this event to uncancelable to prevent nginx from exiting before we
// are up for it. See ngx_worker_process_cycle().
ngx_event_t* NgxBaseFetch::shutdown_event = NULL;
int64 NgxBaseFetch::active_base_fetches = 0;
int64 NgxBaseFetch::request_ctx_count = 0;


// Periodically checks whether the conditions are met for ::Terminate() to be
// called:
// - A forced/quick shutdown, e.g. SIGTERM was received.
// - A graceful shutdown, e.g. SIGQUIT was recevied, after all outstanding
// work as been finished.
// When one of those conditions is met, ev->cancelable will have been set to
// false by us, and nginx will end up calling ::Terminate() next after calling
// Done(false) on any outstanding proxy fetches.
// Terminate() will clear out any pending events to make sure we release all
// associated NgxBaseFetch instances.
static void ps_base_fetch_shutdown_event_handler(ngx_event_t *ev) {
NgxBaseFetch::CheckShutdownEvent();
if (!ev->cancelable) {
ngx_add_timer(ev, 1000);
}
}

NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
Expand Down Expand Up @@ -65,40 +88,56 @@ NgxBaseFetch::~NgxBaseFetch() {
bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
CHECK(event_connection == NULL) << "event connection already set";
event_connection = new NgxEventConnection(ReadCallback);

shutdown_event = reinterpret_cast<ngx_event_t*>(
ngx_pcalloc(cycle->pool, sizeof(ngx_event_t)));
shutdown_event->handler = ps_base_fetch_shutdown_event_handler;

shutdown_event->data = cycle;
shutdown_event->log = cycle->log;
// Prevents nginx from exiting until we are up for it.
shutdown_event->cancelable = 0;
ngx_add_timer(shutdown_event, 1000);

return event_connection->Init(cycle);
}

void NgxBaseFetch::Terminate() {
if (event_connection != NULL) {
GoogleMessageHandler handler;
PosixTimer timer;
int64 timeout_us = Timer::kSecondUs * 30;
// A second should be more then enough.
int64 timeout_us = Timer::kSecondUs;
int64 end_us = timer.NowUs() + timeout_us;
static unsigned int sleep_microseconds = 100;

handler.Message(
kInfo,"NgxBaseFetch::Terminate rounding up %d active base fetches.",
kInfo,"NgxBaseFetch::Terminate start: %" PRId64 " base fetches.",
NgxBaseFetch::active_base_fetches);

// Try to continue processing and get the active base fetch count to 0
// untill the timeout expires.
// TODO(oschaaf): This needs more work.
// Drain any events after a quick or graceful shutdown to clear out
// associated NgxBaseFetch instances.
while (NgxBaseFetch::active_base_fetches > 0 && end_us > timer.NowUs()) {
event_connection->Drain();
usleep(sleep_microseconds);
}

if (NgxBaseFetch::active_base_fetches != 0) {
DCHECK_EQ(NgxBaseFetch::active_base_fetches, 0);
DCHECK_EQ(NgxBaseFetch::request_ctx_count, 0);

if (NgxBaseFetch::active_base_fetches != 0 || NgxBaseFetch::request_ctx_count) {
handler.Message(
kWarning,"NgxBaseFetch::Terminate timed out with %d active base fetches.",
NgxBaseFetch::active_base_fetches);
kWarning,"NgxBaseFetch::Terminate exit: %" PRId64 " base fetches, "
"%" PRId64 " request contexts", NgxBaseFetch::active_base_fetches,
NgxBaseFetch::request_ctx_count);
}

// Close down the named pipe.
event_connection->Shutdown();
delete event_connection;
event_connection = NULL;
}
NgxBaseFetch::CheckShutdownEvent();
}

const char* BaseFetchTypeToCStr(NgxBaseFetchType type) {
Expand Down Expand Up @@ -131,36 +170,38 @@ void NgxBaseFetch::ReadCallback(const ps_event_data& data) {

// If we ended up destructing the base fetch, or the request context is
// detached, skip this event.
if (refcount == 0 || detached) {
return;
}
ps_request_ctx_t* ctx = ps_get_request_context(r);

CHECK(data.sender == ctx->base_fetch);
CHECK(r->count > 0) << "r->count: " << r->count;

int rc;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if ((ctx->base_fetch->base_fetch_type_ != kIproLookup)
&& r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
rc = NGX_ERROR;
} else {
rc = ps_base_fetch::ps_base_fetch_handler(r);
}

if (refcount != 0 && !detached) {
ps_request_ctx_t* ctx = ps_get_request_context(r);

CHECK(data.sender == ctx->base_fetch);
CHECK(r->count > 0) << "r->count: " << r->count;

int rc;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if ((ctx->base_fetch->base_fetch_type_ != kIproLookup)
&& r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
rc = NGX_ERROR;
} else {
rc = ps_base_fetch::ps_base_fetch_handler(r);
}
#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
#endif
if (ngx_terminate) {
return;
}

ngx_connection_t* c = r->connection;
ngx_http_finalize_request(r, rc);
// See http://forum.nginx.org/read.php?2,253006,253061
ngx_http_run_posted_requests(c);
ngx_connection_t* c = r->connection;
ngx_http_finalize_request(r, rc);
// See http://forum.nginx.org/read.php?2,253006,253061
ngx_http_run_posted_requests(c);
}
NgxBaseFetch::CheckShutdownEvent();
}

void NgxBaseFetch::Lock() {
Expand Down Expand Up @@ -191,7 +232,8 @@ ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) {
}

int rc = string_piece_to_buffer_chain(
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */);
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */,
true);
if (rc != NGX_OK) {
return rc;
}
Expand Down Expand Up @@ -290,6 +332,16 @@ int NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
return r;
}

void NgxBaseFetch::CheckShutdownEvent() {
bool quit = ngx_terminate || ((ngx_quit || ngx_exiting) &&
NgxBaseFetch::request_ctx_count == 0);
if (NgxBaseFetch::shutdown_event != NULL && quit) {
NgxBaseFetch::shutdown_event->cancelable = 1;
NgxBaseFetch::shutdown_event = NULL;
}
}


void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
Expand Down
6 changes: 4 additions & 2 deletions src/ngx_base_fetch.h
Expand Up @@ -118,11 +118,13 @@ class NgxBaseFetch : public AsyncFetch {
// this to be able to handle events which nginx request context has been
// released while the event was in-flight.
void Detach() { detached_ = true; DecrementRefCount(); }
static void CheckShutdownEvent();

bool detached() { return detached_; }

ngx_http_request_t* request() { return request_; }
NgxBaseFetchType base_fetch_type() { return base_fetch_type_; }
static int64 request_ctx_count;

private:
virtual bool HandleWrite(const StringPiece& sp, MessageHandler* handler);
Expand All @@ -149,11 +151,11 @@ class NgxBaseFetch : public AsyncFetch {
// Called by Done() and Release(). Decrements our reference count, and if
// it's zero we delete ourself.
int DecrefAndDeleteIfUnreferenced();

static NgxEventConnection* event_connection;
static ngx_event_t* shutdown_event;

// Live count of NgxBaseFetch instances that are currently in use.
static int active_base_fetches;
static int64 active_base_fetches;

ngx_http_request_t* request_;
GoogleString buffer_;
Expand Down
6 changes: 4 additions & 2 deletions src/ngx_fetch.cc
Expand Up @@ -508,11 +508,13 @@ bool NgxFetch::ParseUrl() {
url_.url.len = str_url_.length();
url_.url.data = static_cast<u_char*>(ngx_palloc(pool_, url_.url.len));
if (url_.url.data == NULL) {
DCHECK(false) << "NgxFetch::ParseUrl() without data";
return false;
}
str_url_.copy(reinterpret_cast<char*>(url_.url.data), str_url_.length(), 0);

return NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
bool r = NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
DCHECK(r) << "NgxUrlAsyncFetcher::ParseUrl(&url_, pool_) failed";
return r;
}

// Issue a request after the resolver is done
Expand Down

0 comments on commit 2dcaf7b

Please sign in to comment.