Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Commit

Permalink
Shutdown: improve shutdown handling
Browse files Browse the repository at this point in the history
- Fix valgrind errors
- Add a test to make sure all logged output looks sane by whitelisting
  current errors/warnings.
- Stop our nginx test instances after we are done testing.
- Add tests for shutting down and reloading configuration under high
  load (depends on ab).
- Reduce the number of keepalive requests in the keepalive tests to speed
  up test runs.
- Fix exiting with open file descriptors, fix cleanup in nginx's cache
  manager/loader processes
- Attempt to finish up queued up NgxBaseFetches/requests on shutdown/reload
- Under valgrind the blocking rewrite started failing after adding a test
  for reloading configuration under high load.
  I've added it to the expected failures for valgrind, looking into this
  is up next.
- Decline in ps_resource_handler when nginx is quitting. This makes us
  more reliable on continued stress during shutdown/reload.
  • Loading branch information
oschaaf committed Feb 10, 2015
1 parent 737e939 commit d7f1c0d
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 18 deletions.
30 changes: 30 additions & 0 deletions src/ngx_base_fetch.cc
Expand Up @@ -15,6 +15,7 @@
*/

// Author: jefftk@google.com (Jeff Kaufman)
#include <unistd.h> //for usleep

#include "ngx_base_fetch.h"
#include "ngx_event_connection.h"
Expand All @@ -25,6 +26,7 @@
#include "net/instaweb/rewriter/public/rewrite_stats.h"
#include "pagespeed/kernel/base/google_message_handler.h"
#include "pagespeed/kernel/base/message_handler.h"
#include "pagespeed/kernel/base/posix_timer.h"
#include "pagespeed/kernel/http/response_headers.h"

namespace net_instaweb {
Expand All @@ -34,6 +36,7 @@ const char kFlush = 'F';
const char kDone = 'D';

NgxEventConnection* NgxBaseFetch::event_connection = NULL;
int NgxBaseFetch::active_base_fetches = 0;

NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
Expand All @@ -51,10 +54,12 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
detached_(false),
suppress_(false) {
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
__sync_add_and_fetch(&NgxBaseFetch::active_base_fetches, 1);
}

// Tears down the per-fetch mutex and then atomically removes this instance
// from the live count kept in NgxBaseFetch::active_base_fetches.
NgxBaseFetch::~NgxBaseFetch() {
  pthread_mutex_destroy(&mutex_);
  // Atomic decrement; mirrors the atomic increment done in the constructor.
  __sync_sub_and_fetch(&NgxBaseFetch::active_base_fetches, 1);
}

bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
Expand All @@ -65,6 +70,31 @@ bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {

void NgxBaseFetch::Terminate() {
if (event_connection != NULL) {
GoogleMessageHandler handler;
PosixTimer timer;
int64 timeout_us = Timer::kSecondUs * 30;
int64 end_us = timer.NowUs() + timeout_us;
static unsigned int sleep_microseconds = 100;

handler.Message(
kInfo,"NgxBaseFetch::Terminate rounding up %d active base fetches.",
NgxBaseFetch::active_base_fetches);

// Try to continue processing and get the active base fetch count to 0
// until the timeout expires.
// TODO(oschaaf): This needs more work.
while (NgxBaseFetch::active_base_fetches > 0 && end_us > timer.NowUs()) {
event_connection->Drain();
usleep(sleep_microseconds);
}

if (NgxBaseFetch::active_base_fetches != 0) {
handler.Message(
kWarning,"NgxBaseFetch::Terminate timed out with %d active base fetches.",
NgxBaseFetch::active_base_fetches);
}

// Close down the named pipe.
event_connection->Shutdown();
delete event_connection;
event_connection = NULL;
Expand Down
9 changes: 9 additions & 0 deletions src/ngx_base_fetch.h
Expand Up @@ -78,11 +78,17 @@ class NgxBaseFetch : public AsyncFetch {
PreserveCachingHeaders preserve_caching_headers,
NgxBaseFetchType base_fetch_type);
virtual ~NgxBaseFetch();

// Statically initializes event_connection, required for PSOL and nginx to
// communicate.
static bool Initialize(ngx_cycle_t* cycle);

// Attempts to finish up request processing queued up in the named pipe and
// PSOL for a fixed amount of time. If time is up, a fast and rough shutdown
// is attempted.
// Statically terminates and NULLS event_connection.
static void Terminate();

static void ReadCallback(const ps_event_data& data);

// Puts a chain in link_ptr if we have any output data buffered. Returns
Expand Down Expand Up @@ -145,6 +151,9 @@ class NgxBaseFetch : public AsyncFetch {
int DecrefAndDeleteIfUnreferenced();

static NgxEventConnection* event_connection;

// Live count of NgxBaseFetch instances that are currently in use.
static int active_base_fetches;

ngx_http_request_t* request_;
GoogleString buffer_;
Expand Down
8 changes: 6 additions & 2 deletions src/ngx_event_connection.cc
Expand Up @@ -159,10 +159,14 @@ bool NgxEventConnection::WriteEvent(char type, void* sender) {
return false;
}

// Reads and processes what is available in the pipe.
void NgxEventConnection::Drain() {
NgxEventConnection::ReadAndNotify(pipe_read_fd_);
}

void NgxEventConnection::Shutdown() {
close(pipe_write_fd_);
// Drain the pipe, process final events, and shut down.
while (NgxEventConnection::ReadAndNotify(pipe_read_fd_));
close(pipe_read_fd_);
}

} // namespace net_instaweb
2 changes: 2 additions & 0 deletions src/ngx_event_connection.h
Expand Up @@ -64,6 +64,8 @@ class NgxEventConnection {
bool WriteEvent(char type, void* sender);
// Convenience overload for clients that have a single event type.
bool WriteEvent(void* sender);
// Reads and processes what is available in the named pipe's buffer.
void Drain();
private:
static bool CreateNgxConnection(ngx_cycle_t* cycle, ngx_fd_t pipe_fd);
static void ReadEventHandler(ngx_event_t* e);
Expand Down
39 changes: 34 additions & 5 deletions src/ngx_pagespeed.cc
Expand Up @@ -85,6 +85,8 @@ extern ngx_module_t ngx_pagespeed;
// Needed for SystemRewriteDriverFactory to use shared memory.
#define PAGESPEED_SUPPORT_POSIX_SHARED_MEM

net_instaweb::NgxRewriteDriverFactory* active_driver_factory = NULL;

namespace net_instaweb {

const char* kInternalEtagName = "@psol-etag";
Expand Down Expand Up @@ -880,11 +882,12 @@ void ps_cleanup_srv_conf(void* data) {
// to be shut down when we destroy any proxy_fetch_factories. This
// will prevent any queued callbacks to destroyed proxy fetch factories
// from being executed

if (!factory_deleted && cfg_s->server_context != NULL) {
if (active_driver_factory == cfg_s->server_context->factory()) {
active_driver_factory = NULL;
}
delete cfg_s->server_context->factory();
factory_deleted = true;
NgxBaseFetch::Terminate();
}
if (cfg_s->proxy_fetch_factory != NULL) {
delete cfg_s->proxy_fetch_factory;
Expand Down Expand Up @@ -931,12 +934,20 @@ void ps_set_conf_cleanup_handler(
}

// Process-exit hook (registered through atexit() in ps_create_main_conf).
// Destroys the driver factory if it is still registered as active, terminates
// NgxBaseFetch in that case, and then always releases the process context.
void terminate_process_context() {
  const bool factory_still_registered = (active_driver_factory != NULL);
  if (factory_still_registered) {
    // Reaching this point with a live factory means we are running in the
    // cache loader/manager process, or the regular cleanup path never ran.
    delete active_driver_factory;
    active_driver_factory = NULL;
    NgxBaseFetch::Terminate();
  }

  delete process_context;
  process_context = NULL;
}

void* ps_create_main_conf(ngx_conf_t* cf) {
if (!process_context_cleanup_hooked) {
SystemRewriteDriverFactory::InitApr();
atexit(terminate_process_context);
process_context_cleanup_hooked = true;
}
Expand All @@ -953,6 +964,7 @@ void* ps_create_main_conf(ngx_conf_t* cf) {
new SystemThreadSystem(),
"" /* hostname, not used */,
-1 /* port, not used */);
active_driver_factory = cfg_m->driver_factory;
cfg_m->driver_factory->Init();
ps_set_conf_cleanup_handler(cf, ps_cleanup_main_conf, cfg_m);
return cfg_m;
Expand Down Expand Up @@ -1648,6 +1660,17 @@ ngx_int_t ps_resource_handler(ngx_http_request_t* r,
ps_srv_conf_t* cfg_s = ps_get_srv_config(r);
ps_request_ctx_t* ctx = ps_get_request_context(r);

if (ngx_terminate || ngx_exiting) {
cfg_s->server_context->message_handler()->Message(
kInfo, "ps_resource_handler declining: nginx worker is shutting down");

if (ctx == NULL) {
return NGX_DECLINED;
}
ps_release_base_fetch(ctx);
return NGX_DECLINED;
}

CHECK(!(html_rewrite && (ctx == NULL || ctx->html_rewrite == false)));

if (!html_rewrite &&
Expand Down Expand Up @@ -2967,10 +2990,18 @@ ngx_int_t ps_init_module(ngx_cycle_t* cycle) {
} else {
delete cfg_m->driver_factory;
cfg_m->driver_factory = NULL;
active_driver_factory = NULL;
}
return NGX_OK;
}

// nginx exit_process hook, wired into the ngx_pagespeed module definition.
// Terminates NgxBaseFetch and then shuts down the rewrite driver factory
// owned by the main configuration, if there is one.
//
// cycle: the nginx cycle whose pagespeed main configuration is looked up.
void ps_exit_child_process(ngx_cycle_t* cycle) {
  ps_main_conf_t* cfg_m = static_cast<ps_main_conf_t*>(
      ngx_http_cycle_get_module_main_conf(cycle, ngx_pagespeed));
  NgxBaseFetch::Terminate();
  // ps_init_module may delete the factory and leave driver_factory NULL, and
  // the main conf lookup itself can presumably yield NULL when no http
  // configuration exists. Guard both so worker exit never dereferences NULL.
  if (cfg_m != NULL && cfg_m->driver_factory != NULL) {
    cfg_m->driver_factory->ShutDown();
  }
}

// Called when nginx forks worker processes. No threads should be started
// before this.
ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
Expand All @@ -2980,7 +3011,6 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
return NGX_OK;
}

SystemRewriteDriverFactory::InitApr();
if (!NgxBaseFetch::Initialize(cycle)) {
return NGX_ERROR;
}
Expand Down Expand Up @@ -3016,7 +3046,6 @@ ngx_int_t ps_init_child_process(ngx_cycle_t* cycle) {
return NGX_ERROR;
}
cfg_m->driver_factory->StartThreads();

return NGX_OK;
}

Expand Down Expand Up @@ -3049,7 +3078,7 @@ ngx_module_t ngx_pagespeed = {
net_instaweb::ps_init_child_process,
NULL,
NULL,
NULL,
net_instaweb::ps_exit_child_process,
NULL,
NGX_MODULE_V1_PADDING
};
10 changes: 9 additions & 1 deletion src/ngx_rewrite_driver_factory.cc
Expand Up @@ -84,7 +84,8 @@ NgxRewriteDriverFactory::NgxRewriteDriverFactory(
hostname_(hostname.as_string()),
port_(port),
process_script_variables_(false),
process_script_variables_set_(false) {
process_script_variables_set_(false),
shut_down_(false) {
InitializeDefaultOptions();
default_options()->set_beacon_url("/ngx_pagespeed_beacon");
SystemRewriteOptions* system_options = dynamic_cast<SystemRewriteOptions*>(
Expand Down Expand Up @@ -188,6 +189,13 @@ ServerContext* NgxRewriteDriverFactory::NewServerContext() {
return NULL;
}

// Idempotent shutdown: the first call records that shutdown has happened and
// forwards to SystemRewriteDriverFactory::ShutDown(); later calls do nothing.
void NgxRewriteDriverFactory::ShutDown() {
  if (shut_down_) {
    return;  // Already shut down; do not forward to the base class again.
  }
  shut_down_ = true;
  SystemRewriteDriverFactory::ShutDown();
}

void NgxRewriteDriverFactory::ShutDownMessageHandlers() {
ngx_message_handler_->set_buffer(NULL);
ngx_html_parse_message_handler_->set_buffer(NULL);
Expand Down
2 changes: 2 additions & 0 deletions src/ngx_rewrite_driver_factory.h
Expand Up @@ -72,6 +72,7 @@ class NgxRewriteDriverFactory : public SystemRewriteDriverFactory {
static void InitStats(Statistics* statistics);
NgxServerContext* MakeNgxServerContext(StringPiece hostname, int port);
virtual ServerContext* NewServerContext();
virtual void ShutDown();

// Starts pagespeed threads if they've not been started already. Must be
// called after the caller has finished any forking it intends to do.
Expand Down Expand Up @@ -155,6 +156,7 @@ class NgxRewriteDriverFactory : public SystemRewriteDriverFactory {
int port_;
bool process_script_variables_;
bool process_script_variables_set_;
bool shut_down_;

DISALLOW_COPY_AND_ASSIGN(NgxRewriteDriverFactory);
};
Expand Down

0 comments on commit d7f1c0d

Please sign in to comment.